1#!/usr/bin/env python3
2#
3# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4#
5# This program is free software; you can redistribute it and/or
6# modify it under the terms of the GNU Lesser General Public
7# License as published by the Free Software Foundation; either
8# version 2.1 of the License, or (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13# Lesser General Public License for more details.
14#
15# You should have received a copy of the GNU Lesser General Public
16# License along with this program; if not, write to the
17# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18# Boston, MA 02110-1301, USA.
19
20""" Class representing tests and test managers. """
21
22import json
23import os
24import sys
25import re
26import copy
27import shlex
28import socketserver
29import struct
30import time
31from . import utils
32import signal
33import urllib.parse
34import subprocess
35import threading
36import queue
37import configparser
38import xml
39import random
40import shutil
41import uuid
42
43from .utils import which
44from . import reporters
45from . import loggable
46from .loggable import Loggable
47
48from collections import defaultdict
49try:
50    from lxml import etree as ET
51except ImportError:
52    import xml.etree.cElementTree as ET
53
54try:
55    import mdv
56except ImportError:
57    mdv = None
58
59from .vfb_server import get_virual_frame_buffer_server
60from .httpserver import HTTPServer
61from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \
62    Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \
63    check_bugs_resolution, is_tty
64
65# The factor by which we increase the hard timeout when running inside
66# Valgrind
67GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR = 20
68TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1))
69# The error reported by valgrind when detecting errors
70VALGRIND_ERROR_CODE = 20
71
72VALIDATE_OVERRIDE_EXTENSION = ".override"
73EXITING_SIGNALS = dict([(-getattr(signal, s), s) for s in [
74    'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS',
75    'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT'] if hasattr(signal, s)])
76EXITING_SIGNALS.update({139: "SIGSEGV"})
77EXITING_SIGNALS.update({(v, k) for k, v in EXITING_SIGNALS.items()})
78
79
80CI_ARTIFACTS_URL = os.environ.get('CI_ARTIFACTS_URL')
81
82
83class Test(Loggable):
84
85    """ A class representing a particular test. """
86
87    def __init__(self, application_name, classname, options,
88                 reporter, duration=0, timeout=DEFAULT_TIMEOUT,
89                 hard_timeout=None, extra_env_variables=None,
90                 expected_issues=None, is_parallel=True,
91                 workdir=None):
92        """
93        @timeout: The timeout during which the value return by get_current_value
94                  keeps being exactly equal
95        @hard_timeout: Max time the test can take in absolute
96        """
97        Loggable.__init__(self)
98        self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor
99        if hard_timeout:
100            self.hard_timeout = hard_timeout * TIMEOUT_FACTOR
101            self.hard_timeout *= options.timeout_factor
102        else:
103            self.hard_timeout = hard_timeout
104        self.classname = classname
105        self.options = options
106        self.application = application_name
107        self.command = []
108        self.server_command = None
109        self.reporter = reporter
110        self.process = None
111        self.proc_env = None
112        self.thread = None
113        self.queue = None
114        self.duration = duration
115        self.stack_trace = None
116        self._uuid = None
117        if expected_issues is None:
118            self.expected_issues = []
119        elif not isinstance(expected_issues, list):
120            self.expected_issues = [expected_issues]
121        else:
122            self.expected_issues = expected_issues
123
124        extra_env_variables = extra_env_variables or {}
125        self.extra_env_variables = extra_env_variables
126        self.optional = False
127        self.is_parallel = is_parallel
128        self.generator = None
129        # String representation of the test number in the testsuite
130        self.number = ""
131        self.workdir = workdir
132        self.allow_flakiness = False
133        self.html_log = None
134
135        self.clean()
136
137    def _generate_expected_issues(self):
138        return ''
139
140    def generate_expected_issues(self):
141        res = '%s"FIXME \'%s\' issues [REPORT A BUG ' % (" " * 4, self.classname) \
142            + 'in https://gitlab.freedesktop.org/gstreamer/ '\
143            + 'or use a proper bug description]": {'
144        res += """
145        "tests": [
146            "%s"
147        ],
148        "issues": [""" % (self.classname)
149
150        retcode = self.process.returncode if self.process else 0
151        if retcode != 0:
152            signame = EXITING_SIGNALS.get(retcode)
153            val = "'" + signame + "'" if signame else retcode
154            res += """\n            {
155                '%s': %s,
156                'sometimes': True,
157            },""" % ("signame" if signame else "returncode", val)
158
159        res += self._generate_expected_issues()
160        res += "\n%s],\n%s},\n" % (" " * 8, " " * 4)
161
162        return res
163
164    def clean(self):
165        self.kill_subprocess()
166        self.message = ""
167        self.error_str = ""
168        self.time_taken = 0.0
169        self._starting_time = None
170        self.result = Result.NOT_RUN
171        self.logfile = None
172        self.out = None
173        self.extra_logfiles = set()
174        self.__env_variable = []
175        self.kill_subprocess()
176        self.process = None
177
178    def __str__(self):
179        string = self.classname
180        if self.result != Result.NOT_RUN:
181            string += ": " + self.result
182            if self.result in [Result.FAILED, Result.TIMEOUT]:
183                string += " '%s'" % self.message
184                if not self.options.dump_on_failure:
185                    if not self.options.redirect_logs and self.result != Result.PASSED:
186                        string += self.get_logfile_repr()
187                else:
188                    string = "\n==> %s" % string
189
190        return string
191
192    def add_env_variable(self, variable, value=None):
193        """
194        Only usefull so that the gst-validate-launcher can print the exact
195        right command line to reproduce the tests
196        """
197        if value is None:
198            value = os.environ.get(variable, None)
199
200        if value is None:
201            return
202
203        self.__env_variable.append(variable)
204
205    @property
206    def _env_variable(self):
207        res = ""
208        for var in set(self.__env_variable):
209            if res:
210                res += " "
211            value = self.proc_env.get(var, None)
212            if value is not None:
213                res += "%s='%s'" % (var, value)
214
215        return res
216
217    def open_logfile(self):
218        if self.out:
219            return
220
221        path = os.path.join(self.options.logsdir,
222                            self.classname.replace(".", os.sep) + '.md')
223        mkdir(os.path.dirname(path))
224        self.logfile = path
225
226        if self.options.redirect_logs == 'stdout':
227            self.out = sys.stdout
228        elif self.options.redirect_logs == 'stderr':
229            self.out = sys.stderr
230        else:
231            self.out = open(path, 'w+')
232
233    def finalize_logfiles(self):
234        if not self.options.redirect_logs:
235            self.out.flush()
236            for logfile in self.extra_logfiles:
237                self.out.write('\n\n## %s:\n\n```\n%s\n```\n' % (
238                    os.path.basename(logfile), self.get_extra_log_content(logfile))
239                )
240            self.out.flush()
241            self.out.close()
242
243        if self.options.html:
244            self.html_log = os.path.splitext(self.logfile)[0] + '.html'
245            import commonmark
246            parser = commonmark.Parser()
247            with open(self.logfile) as f:
248                ast = parser.parse(f.read())
249
250            renderer = commonmark.HtmlRenderer()
251            html = renderer.render(ast)
252            with open(self.html_log, 'w') as f:
253                f.write(html)
254
255        self.out = None
256
257    def _get_file_content(self, file_name):
258        f = open(file_name, 'r+')
259        value = f.read()
260        f.close()
261
262        return value
263
264    def get_log_content(self):
265        return self._get_file_content(self.logfile)
266
267    def get_extra_log_content(self, extralog):
268        if extralog not in self.extra_logfiles:
269            return ""
270
271        return self._get_file_content(extralog)
272
273    def get_classname(self):
274        name = self.classname.split('.')[-1]
275        classname = self.classname.replace('.%s' % name, '')
276
277        return classname
278
279    def get_name(self):
280        return self.classname.split('.')[-1]
281
282    def get_uuid(self):
283        if self._uuid is None:
284            self._uuid = self.classname + str(uuid.uuid4())
285        return self._uuid
286
287    def add_arguments(self, *args):
288        self.command += args
289
290    def build_arguments(self):
291        self.add_env_variable("LD_PRELOAD")
292        self.add_env_variable("DISPLAY")
293
294    def add_stack_trace_to_logfile(self):
295        self.debug("Adding stack trace")
296        trace_gatherer = BackTraceGenerator.get_default()
297        stack_trace = trace_gatherer.get_trace(self)
298
299        if not stack_trace:
300            return
301
302        info = "\n\n## Stack trace\n\n```\n%s\n```" % stack_trace
303        if self.options.redirect_logs:
304            print(info)
305            return
306
307        if self.options.xunit_file:
308            self.stack_trace = stack_trace
309
310        self.out.write(info)
311        self.out.flush()
312
313    def add_known_issue_information(self):
314        if self.expected_issues:
315            info = "\n\n## Already known issues\n\n``` python\n%s\n```\n\n" % (
316                json.dumps(self.expected_issues, indent=4)
317            )
318        else:
319            info = ""
320
321        info += "\n\n**You can mark the issues as 'known' by adding the " \
322            + " following lines to the list of known issues**\n" \
323            + "\n\n``` python\n%s\n```" % (self.generate_expected_issues())
324
325        if self.options.redirect_logs:
326            print(info)
327            return
328
329        self.out.write(info)
330
331    def set_result(self, result, message="", error=""):
332
333        if not self.options.redirect_logs:
334            self.out.write("\n```\n")
335            self.out.flush()
336
337        self.debug("Setting result: %s (message: %s, error: %s)" % (result,
338                                                                    message, error))
339
340        if result is Result.TIMEOUT:
341            if self.options.debug is True:
342                if self.options.gdb:
343                    printc("Timeout, you should process <ctrl>c to get into gdb",
344                           Colors.FAIL)
345                    # and wait here until gdb exits
346                    self.process.communicate()
347                else:
348                    pname = self.command[0]
349                    input("%sTimeout happened  on %s you can attach gdb doing:\n $gdb %s %d%s\n"
350                          "Press enter to continue" % (Colors.FAIL, self.classname,
351                          pname, self.process.pid, Colors.ENDC))
352            else:
353                self.add_stack_trace_to_logfile()
354
355        self.result = result
356        self.message = message
357        self.error_str = error
358
359        if result not in [Result.PASSED, Result.NOT_RUN]:
360            self.add_known_issue_information()
361
362    def check_results(self):
363        if self.result is Result.FAILED or self.result is Result.TIMEOUT:
364            return
365
366        self.debug("%s returncode: %s", self, self.process.returncode)
367        if self.process.returncode == 0:
368            self.set_result(Result.PASSED)
369        elif self.process.returncode in EXITING_SIGNALS:
370            self.add_stack_trace_to_logfile()
371            self.set_result(Result.FAILED,
372                            "Application exited with signal %s" % (
373                                EXITING_SIGNALS[self.process.returncode]))
374        elif self.process.returncode == VALGRIND_ERROR_CODE:
375            self.set_result(Result.FAILED, "Valgrind reported errors")
376        else:
377            self.set_result(Result.FAILED,
378                            "Application returned %d" % (self.process.returncode))
379
380    def get_current_value(self):
381        """
382        Lets subclasses implement a nicer timeout measurement method
383        They should return some value with which we will compare
384        the previous and timeout if they are egual during self.timeout
385        seconds
386        """
387        return Result.NOT_RUN
388
389    def process_update(self):
390        """
391        Returns True when process has finished running or has timed out.
392        """
393
394        if self.process is None:
395            # Process has not started running yet
396            return False
397
398        self.process.poll()
399        if self.process.returncode is not None:
400            return True
401
402        val = self.get_current_value()
403
404        self.debug("Got value: %s" % val)
405        if val is Result.NOT_RUN:
406            # The get_current_value logic is not implemented... dumb
407            # timeout
408            if time.time() - self.last_change_ts > self.timeout:
409                self.set_result(Result.TIMEOUT,
410                                "Application timed out: %s secs" %
411                                self.timeout,
412                                "timeout")
413                return True
414            return False
415        elif val is Result.FAILED:
416            return True
417        elif val is Result.KNOWN_ERROR:
418            return True
419
420        self.log("New val %s" % val)
421
422        if val == self.last_val:
423            delta = time.time() - self.last_change_ts
424            self.debug("%s: Same value for %d/%d seconds" %
425                       (self, delta, self.timeout))
426            if delta > self.timeout:
427                self.set_result(Result.TIMEOUT,
428                                "Application timed out: %s secs" %
429                                self.timeout,
430                                "timeout")
431                return True
432        elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout:
433            self.set_result(
434                Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout)
435            return True
436        else:
437            self.last_change_ts = time.time()
438            self.last_val = val
439
440        return False
441
442    def get_subproc_env(self):
443        return os.environ.copy()
444
445    def kill_subprocess(self):
446        utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT)
447
448    def run_external_checks(self):
449        pass
450
451    def thread_wrapper(self):
452        def enable_sigint():
453            # Restore the SIGINT handler for the child process (gdb) to ensure
454            # it can handle it.
455            signal.signal(signal.SIGINT, signal.SIG_DFL)
456
457        if self.options.gdb and os.name != "nt":
458            preexec_fn = enable_sigint
459        else:
460            preexec_fn = None
461
462        self.process = subprocess.Popen(self.command,
463                                        stderr=self.out,
464                                        stdout=self.out,
465                                        env=self.proc_env,
466                                        cwd=self.workdir,
467                                        preexec_fn=preexec_fn)
468        self.process.wait()
469        if self.result is not Result.TIMEOUT:
470            if self.process.returncode == 0:
471                self.run_external_checks()
472            self.queue.put(None)
473
474    def get_valgrind_suppression_file(self, subdir, name):
475        p = get_data_file(subdir, name)
476        if p:
477            return p
478
479        self.error("Could not find any %s file" % name)
480
481    def get_valgrind_suppressions(self):
482        return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')]
483
484    def use_gdb(self, command):
485        if self.hard_timeout is not None:
486            self.hard_timeout *= GDB_TIMEOUT_FACTOR
487        self.timeout *= GDB_TIMEOUT_FACTOR
488
489        if not self.options.gdb_non_stop:
490            self.timeout = sys.maxsize
491            self.hard_timeout = sys.maxsize
492
493        args = ["gdb"]
494        if self.options.gdb_non_stop:
495            args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"]
496        args += ["--args"] + command
497        return args
498
499    def use_valgrind(self, command, subenv):
500        vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
501        self.extra_logfiles.add(vglogsfile)
502
503        vg_args = []
504
505        for o, v in [('trace-children', 'yes'),
506                     ('tool', 'memcheck'),
507                     ('leak-check', 'full'),
508                     ('leak-resolution', 'high'),
509                     # TODO: errors-for-leak-kinds should be set to all instead of definite
510                     # and all false positives should be added to suppression
511                     # files.
512                     ('errors-for-leak-kinds', 'definite,indirect'),
513                     ('show-leak-kinds', 'definite,indirect'),
514                     ('show-possibly-lost', 'no'),
515                     ('num-callers', '20'),
516                     ('error-exitcode', str(VALGRIND_ERROR_CODE)),
517                     ('gen-suppressions', 'all')]:
518            vg_args.append("--%s=%s" % (o, v))
519
520        if not self.options.redirect_logs:
521            vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind'
522            self.extra_logfiles.add(vglogsfile)
523            vg_args.append("--%s=%s" % ('log-file', vglogsfile))
524
525        for supp in self.get_valgrind_suppressions():
526            vg_args.append("--suppressions=%s" % supp)
527
528        command = ["valgrind"] + vg_args + command
529
530        # Tune GLib's memory allocator to be more valgrind friendly
531        subenv['G_DEBUG'] = 'gc-friendly'
532        subenv['G_SLICE'] = 'always-malloc'
533
534        if self.hard_timeout is not None:
535            self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR
536        self.timeout *= VALGRIND_TIMEOUT_FACTOR
537
538        # Enable 'valgrind.config'
539        self.add_validate_config(get_data_file(
540            'data', 'valgrind.config'), subenv)
541        if subenv == self.proc_env:
542            self.add_env_variable('G_DEBUG', 'gc-friendly')
543            self.add_env_variable('G_SLICE', 'always-malloc')
544            self.add_env_variable('GST_VALIDATE_CONFIG',
545                                  self.proc_env['GST_VALIDATE_CONFIG'])
546
547        return command
548
549    def add_validate_config(self, config, subenv=None):
550        if not subenv:
551            subenv = self.extra_env_variables
552
553        if "GST_VALIDATE_CONFIG" in subenv:
554            subenv['GST_VALIDATE_CONFIG'] = '%s%s%s' % (
555                subenv['GST_VALIDATE_CONFIG'], os.pathsep, config)
556        else:
557            subenv['GST_VALIDATE_CONFIG'] = config
558
559    def launch_server(self):
560        return None
561
562    def get_logfile_repr(self):
563        if not self.options.redirect_logs:
564            if self.html_log:
565                log = self.html_log
566            else:
567                log = self.logfile
568
569            if CI_ARTIFACTS_URL:
570                log = CI_ARTIFACTS_URL + os.path.relpath(log, self.options.logsdir)
571
572            return "\n    Log: %s" % (log)
573
574        return ""
575
576    def get_command_repr(self):
577        message = "%s %s" % (self._env_variable, ' '.join(
578            shlex.quote(arg) for arg in self.command))
579        if self.server_command:
580            message = "%s & %s" % (self.server_command, message)
581
582        return message
583
584    def test_start(self, queue):
585        self.open_logfile()
586
587        self.server_command = self.launch_server()
588        self.queue = queue
589        self.command = [self.application]
590        self._starting_time = time.time()
591        self.build_arguments()
592        self.proc_env = self.get_subproc_env()
593
594        for var, value in list(self.extra_env_variables.items()):
595            value = self.proc_env.get(var, '') + os.pathsep + value
596            self.proc_env[var] = value.strip(os.pathsep)
597            self.add_env_variable(var, self.proc_env[var])
598
599        if self.options.gdb:
600            self.command = self.use_gdb(self.command)
601
602            self.previous_sigint_handler = signal.getsignal(signal.SIGINT)
603            # Make the gst-validate executable ignore SIGINT while gdb is
604            # running.
605            signal.signal(signal.SIGINT, signal.SIG_IGN)
606
607        if self.options.valgrind:
608            self.command = self.use_valgrind(self.command, self.proc_env)
609
610        if not self.options.redirect_logs:
611            self.out.write("# `%s`\n\n"
612                           "## Command\n\n``` bash\n%s\n```\n\n" % (
613                               self.classname, self.get_command_repr()))
614            self.out.write("## %s output\n\n``` \n\n" % os.path.basename(self.application))
615            self.out.flush()
616        else:
617            message = "Launching: %s%s\n" \
618                "    Command: %s\n" % (Colors.ENDC, self.classname,
619                                       self.get_command_repr())
620            printc(message, Colors.OKBLUE)
621
622        self.thread = threading.Thread(target=self.thread_wrapper)
623        self.thread.start()
624
625        self.last_val = 0
626        self.last_change_ts = time.time()
627        self.start_ts = time.time()
628
629    def _dump_log_file(self, logfile):
630        with open(logfile, 'r') as fin:
631            printc(self.get_logfile_repr())
632            if mdv and utils.supports_ansi_colors():
633                printc(mdv.main(fin.read()))
634            else:
635                for line in fin.readlines():
636                    print('> ' + line, end='')
637
638    def _dump_log_files(self):
639        self._dump_log_file(self.logfile)
640
641    def copy_logfiles(self, extra_folder="flaky_tests"):
642        path = os.path.dirname(os.path.join(self.options.logsdir, extra_folder,
643                            self.classname.replace(".", os.sep)))
644        mkdir(path)
645        self.logfile = shutil.copy(self.logfile, path)
646        extra_logs = []
647        for logfile in self.extra_logfiles:
648            extra_logs.append(shutil.copy(logfile, path))
649        self.extra_logfiles = extra_logs
650
651    def test_end(self, retry_on_failure=False):
652        self.kill_subprocess()
653        self.thread.join()
654        self.time_taken = time.time() - self._starting_time
655
656        if self.options.gdb:
657            signal.signal(signal.SIGINT, self.previous_sigint_handler)
658
659        self.finalize_logfiles()
660        message = None
661        end = "\n"
662        if self.result != Result.PASSED:
663            if not retry_on_failure:
664                message = str(self)
665                end = "\n"
666        else:
667            if is_tty():
668                message = "%s %s: %s%s" % (self.number, self.classname, self.result,
669                                        " (" + self.message + ")" if self.message else "")
670                end = "\r"
671
672        if message is not None:
673            printc(message, color=utils.get_color_for_result(
674                self.result), end=end)
675
676        if self.options.dump_on_failure:
677            if self.result is not Result.PASSED:
678                self._dump_log_files()
679
680        # Only keep around env variables we need later
681        clean_env = {}
682        for n in self.__env_variable:
683            clean_env[n] = self.proc_env.get(n, None)
684        self.proc_env = clean_env
685
686        # Don't keep around JSON report objects, they were processed
687        # in check_results already
688        self.reports = []
689
690        return self.result
691
692
693class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
694    pass
695
696
697class GstValidateListener(socketserver.BaseRequestHandler, Loggable):
698
699    def __init__(self, *args, **kwargs):
700        super().__init__(*args, **kwargs)
701        Loggable.__init__(self, "GstValidateListener")
702
703    def handle(self):
704        """Implements BaseRequestHandler handle method"""
705        test = None
706        self.logCategory = "GstValidateListener"
707        while True:
708            raw_len = self.request.recv(4)
709            if raw_len == b'':
710                return
711            msglen = struct.unpack('>I', raw_len)[0]
712            try:
713                msg = self.request.recv(msglen).decode('utf-8', 'ignore')
714            except UnicodeDecodeError as e:
715                self.error("Could not decode message: %s - %s" % (msg, e))
716                continue
717            if msg == '':
718                return
719
720            try:
721                obj = json.loads(msg)
722            except json.decoder.JSONDecodeError:
723                self.error("Could not deserialize: %s - %s" % (msg, e))
724                continue
725
726            if test is None:
727                # First message must contain the uuid
728                uuid = obj.get("uuid", None)
729                if uuid is None:
730                    return
731                # Find test from launcher
732                for t in self.server.launcher.tests:
733                    if uuid == t.get_uuid():
734                        test = t
735                        break
736                if test is None:
737                    self.server.launcher.error(
738                        "Could not find test for UUID %s" % uuid)
739                    return
740
741            obj_type = obj.get("type", '')
742            if obj_type == 'position':
743                test.set_position(obj['position'], obj['duration'],
744                                  obj['speed'])
745            elif obj_type == 'buffering':
746                test.set_position(obj['position'], 100)
747            elif obj_type == 'action':
748                test.add_action_execution(obj)
749                # Make sure that action is taken into account when checking if process
750                # is updating
751                test.position += 1
752            elif obj_type == 'action-done':
753                # Make sure that action end is taken into account when checking if process
754                # is updating
755                test.position += 1
756                test.actions_infos[-1]['execution-duration'] = obj['execution-duration']
757            elif obj_type == 'report':
758                test.add_report(obj)
759
760
761class GstValidateTest(Test):
762
763    """ A class representing a particular test. """
764    HARD_TIMEOUT_FACTOR = 5
765    fault_sig_regex = re.compile("<Caught SIGNAL: .*>")
766
767    def __init__(self, application_name, classname,
768                 options, reporter, duration=0,
769                 timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None,
770                 media_descriptor=None, extra_env_variables=None,
771                 expected_issues=None, workdir=None):
772
773        extra_env_variables = extra_env_variables or {}
774
775        if not hard_timeout and self.HARD_TIMEOUT_FACTOR:
776            if timeout:
777                hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR
778            elif duration:
779                hard_timeout = duration * self.HARD_TIMEOUT_FACTOR
780            else:
781                hard_timeout = None
782
783        # If we are running from source, use the -debug version of the
784        # application which is using rpath instead of libtool's wrappers. It's
785        # slightly faster to start and will not confuse valgrind.
786        debug = '%s-debug' % application_name
787        p = look_for_file_in_source_dir('tools', debug)
788        if p:
789            application_name = p
790
791        self.reports = []
792        self.position = -1
793        self.media_duration = -1
794        self.speed = 1.0
795        self.actions_infos = []
796        self.media_descriptor = media_descriptor
797        self.server = None
798        self.criticals = []
799
800        override_path = self.get_override_file(media_descriptor)
801        if override_path:
802            if extra_env_variables:
803                if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""):
804                    extra_env_variables[
805                        "GST_VALIDATE_OVERRIDE"] += os.path.pathsep
806
807            extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path
808
809        super(GstValidateTest, self).__init__(application_name, classname,
810                                              options, reporter,
811                                              duration=duration,
812                                              timeout=timeout,
813                                              hard_timeout=hard_timeout,
814                                              extra_env_variables=extra_env_variables,
815                                              expected_issues=expected_issues,
816                                              workdir=workdir)
817
818        if scenario is None or scenario.name.lower() == "none":
819            self.scenario = None
820        else:
821            self.scenario = scenario
822
823    def kill_subprocess(self):
824        Test.kill_subprocess(self)
825
826    def add_report(self, report):
827        self.reports.append(report)
828
829    def set_position(self, position, duration, speed=None):
830        self.position = position
831        self.media_duration = duration
832        if speed:
833            self.speed = speed
834
835    def add_action_execution(self, action_infos):
836        self.actions_infos.append(action_infos)
837
838    def get_override_file(self, media_descriptor):
839        if media_descriptor:
840            if media_descriptor.get_path():
841                override_path = os.path.splitext(media_descriptor.get_path())[
842                    0] + VALIDATE_OVERRIDE_EXTENSION
843                if os.path.exists(override_path):
844                    return override_path
845
846        return None
847
848    def get_current_position(self):
849        return self.position
850
851    def get_current_value(self):
852        return self.position
853
854    def get_subproc_env(self):
855        subproc_env = os.environ.copy()
856
857        subproc_env["GST_VALIDATE_UUID"] = self.get_uuid()
858
859        if 'GST_DEBUG' in os.environ and not self.options.redirect_logs:
860            gstlogsfile = os.path.splitext(self.logfile)[0] + '.gstdebug'
861            self.extra_logfiles.add(gstlogsfile)
862            subproc_env["GST_DEBUG_FILE"] = gstlogsfile
863
864        if self.options.no_color:
865            subproc_env["GST_DEBUG_NO_COLOR"] = '1'
866
867        # Ensure XInitThreads is called, see bgo#731525
868        subproc_env['GST_GL_XINITTHREADS'] = '1'
869        self.add_env_variable('GST_GL_XINITTHREADS', '1')
870
871        if self.scenario is not None:
872            scenario = self.scenario.get_execution_name()
873            subproc_env["GST_VALIDATE_SCENARIO"] = scenario
874            self.add_env_variable("GST_VALIDATE_SCENARIO",
875                                  subproc_env["GST_VALIDATE_SCENARIO"])
876        else:
877            try:
878                del subproc_env["GST_VALIDATE_SCENARIO"]
879            except KeyError:
880                pass
881
882        if not subproc_env.get('GST_DEBUG_DUMP_DOT_DIR'):
883            dotfilesdir = os.path.join(self.options.logsdir,
884                                self.classname.replace(".", os.sep) + '.pipelines_dot_files')
885            mkdir(dotfilesdir)
886            subproc_env['GST_DEBUG_DUMP_DOT_DIR'] = dotfilesdir
887            if CI_ARTIFACTS_URL:
888                dotfilesurl = CI_ARTIFACTS_URL + os.path.relpath(dotfilesdir,
889                                                                 self.options.logsdir)
890                subproc_env['GST_VALIDATE_DEBUG_DUMP_DOT_URL'] = dotfilesurl
891
892        return subproc_env
893
894    def clean(self):
895        Test.clean(self)
896        self.reports = []
897        self.position = -1
898        self.media_duration = -1
899        self.speed = 1.0
900        self.actions_infos = []
901
902    def build_arguments(self):
903        super(GstValidateTest, self).build_arguments()
904        if "GST_VALIDATE" in os.environ:
905            self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"])
906
907        if "GST_VALIDATE_SCENARIOS_PATH" in os.environ:
908            self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH",
909                                  os.environ["GST_VALIDATE_SCENARIOS_PATH"])
910
911        self.add_env_variable("GST_VALIDATE_CONFIG")
912        self.add_env_variable("GST_VALIDATE_OVERRIDE")
913
914    def get_extra_log_content(self, extralog):
915        value = Test.get_extra_log_content(self, extralog)
916
917        return value
918
919    def report_matches_expected_issues(self, report, expected_issues):
920        for key in ['bug', 'bugs', 'sometimes']:
921            if key in expected_issues:
922                del expected_issues[key]
923        for key, value in list(report.items()):
924            if key in expected_issues:
925                if not re.findall(expected_issues[key], str(value)):
926                    return False
927                expected_issues.pop(key)
928
929        return not bool(expected_issues)
930
931    def check_reported_issues(self, expected_issues):
932        ret = []
933        expected_retcode = [0]
934        for report in self.reports:
935            found = None
936            for expected_issue in expected_issues:
937                if self.report_matches_expected_issues(report,
938                                                       expected_issue.copy()):
939                    found = expected_issue
940                    break
941
942            if found is not None:
943                if not found.get('can-happen-several-times', False):
944                    expected_issues.remove(found)
945                if report['level'] == 'critical':
946                    if found.get('sometimes', True) and isinstance(expected_retcode, list):
947                        expected_retcode.append(18)
948                    else:
949                        expected_retcode = [18]
950            elif report['level'] == 'critical':
951                ret.append(report)
952
953        if not ret:
954            return None, expected_issues, expected_retcode
955
956        return ret, expected_issues, expected_retcode
957
958    def check_expected_issue(self, expected_issue):
959        res = True
960        msg = ''
961        expected_symbols = expected_issue.get('stacktrace_symbols')
962        if expected_symbols:
963            trace_gatherer = BackTraceGenerator.get_default()
964            stack_trace = trace_gatherer.get_trace(self)
965
966            if stack_trace:
967                if not isinstance(expected_symbols, list):
968                    expected_symbols = [expected_symbols]
969
970                not_found_symbols = [s for s in expected_symbols
971                                     if s not in stack_trace]
972                if not_found_symbols:
973                    msg = " Expected symbols '%s' not found in stack trace " % (
974                        not_found_symbols)
975                    res = False
976            else:
977                msg += " No stack trace available, could not verify symbols "
978
979        _, not_found_expected_issues, _ = self.check_reported_issues(expected_issue.get('issues', []))
980        if not_found_expected_issues:
981            mandatory_failures = [f for f in not_found_expected_issues
982                                  if not f.get('sometimes', True)]
983            if mandatory_failures:
984                msg = " (Expected issues not found: %s) " % mandatory_failures
985                res = False
986
987        return msg, res
988
989    def check_expected_timeout(self, expected_timeout):
990        msg = "Expected timeout happened. "
991        result = Result.PASSED
992        message = expected_timeout.get('message')
993        if message:
994            if not re.findall(message, self.message):
995                result = Result.FAILED
996                msg = "Expected timeout message: %s got %s " % (
997                    message, self.message)
998
999        stack_msg, stack_res = self.check_expected_issue(expected_timeout)
1000        if not stack_res:
1001            result = Result.TIMEOUT
1002            msg += stack_msg
1003
1004        return result, msg
1005
1006    def check_results(self):
1007        if self.result in [Result.FAILED, self.result is Result.PASSED]:
1008            return
1009
1010        for report in self.reports:
1011            if report.get('issue-id') == 'runtime::missing-plugin':
1012                self.set_result(Result.SKIPPED, "%s\n%s" % (report['summary'],
1013                                                            report['details']))
1014                return
1015
1016        self.debug("%s returncode: %s", self, self.process.returncode)
1017
1018        expected_issues = copy.deepcopy(self.expected_issues)
1019        self.criticals, not_found_expected_issues, expected_returncode = self.check_reported_issues(expected_issues)
1020        expected_timeout = None
1021        expected_signal = None
1022        for i, f in enumerate(not_found_expected_issues):
1023            returncode = f.get('returncode', [])
1024            if not isinstance(returncode, list):
1025                returncode = [returncode]
1026
1027            if f.get('signame'):
1028                signames = f['signame']
1029                if not isinstance(signames, list):
1030                    signames = [signames]
1031
1032                returncode = [EXITING_SIGNALS[signame] for signame in signames]
1033
1034            if returncode:
1035                if 'sometimes' in f:
1036                    returncode.append(0)
1037                expected_returncode = returncode
1038                expected_signal = f
1039            elif f.get("timeout"):
1040                expected_timeout = f
1041
1042        not_found_expected_issues = [f for f in not_found_expected_issues
1043                                     if not f.get('returncode') and not f.get('signame')]
1044
1045        msg = ""
1046        result = Result.PASSED
1047        if self.result == Result.TIMEOUT:
1048            with open(self.logfile) as f:
1049                signal_fault_info = self.fault_sig_regex.findall(f.read())
1050                if signal_fault_info:
1051                    result = Result.FAILED
1052                    msg = signal_fault_info[0]
1053                elif expected_timeout:
1054                    not_found_expected_issues.remove(expected_timeout)
1055                    result, msg = self.check_expected_timeout(expected_timeout)
1056                else:
1057                    return
1058        elif self.process.returncode in EXITING_SIGNALS:
1059            msg = "Application exited with signal %s" % (
1060                EXITING_SIGNALS[self.process.returncode])
1061            if self.process.returncode not in expected_returncode:
1062                result = Result.FAILED
1063            else:
1064                if expected_signal:
1065                    stack_msg, stack_res = self.check_expected_issue(
1066                        expected_signal)
1067                    if not stack_res:
1068                        msg += stack_msg
1069                        result = Result.FAILED
1070            self.add_stack_trace_to_logfile()
1071        elif self.process.returncode == VALGRIND_ERROR_CODE:
1072            msg = "Valgrind reported errors "
1073            result = Result.FAILED
1074        elif self.process.returncode not in expected_returncode:
1075            msg = "Application returned %s " % self.process.returncode
1076            if expected_returncode != [0]:
1077                msg += "(expected %s) " % expected_returncode
1078            result = Result.FAILED
1079
1080        if self.criticals:
1081            msg += "(critical errors: [%s]) " % ', '.join([c['summary']
1082                                                           for c in self.criticals])
1083            result = Result.FAILED
1084
1085        if not_found_expected_issues:
1086            mandatory_failures = [f for f in not_found_expected_issues
1087                                  if not f.get('sometimes', True)]
1088
1089            if mandatory_failures:
1090                msg += " (Expected errors not found: %s) " % mandatory_failures
1091                result = Result.FAILED
1092        elif self.expected_issues:
1093            msg += ' %s(Expected errors occured: %s)%s' % (Colors.OKBLUE,
1094                                                           self.expected_issues,
1095                                                           Colors.ENDC)
1096
1097        self.set_result(result, msg.strip())
1098
1099    def _generate_expected_issues(self):
1100        res = ""
1101        self.criticals = self.criticals or []
1102        if self.result == Result.TIMEOUT:
1103            res += """            {
1104                'timeout': True,
1105                'sometimes': True,
1106            },"""
1107
1108        for report in self.criticals:
1109            res += "\n%s{" % (" " * 12)
1110
1111            for key, value in report.items():
1112                if key == "type":
1113                    continue
1114                if value is None:
1115                    continue
1116                res += '\n%s%s"%s": "%s",' % (
1117                    " " * 16, "# " if key == "details" else "",
1118                    key, value.replace('\n', '\\n'))
1119
1120            res += "\n%s}," % (" " * 12)
1121
1122        return res
1123
1124    def get_valgrind_suppressions(self):
1125        result = super(GstValidateTest, self).get_valgrind_suppressions()
1126        result.extend(utils.get_gst_build_valgrind_suppressions())
1127        gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp')
1128        if gst_sup:
1129            result.append(gst_sup)
1130
1131        return result
1132
1133
1134class GstValidateEncodingTestInterface(object):
1135    DURATION_TOLERANCE = GST_SECOND / 4
1136
1137    def __init__(self, combination, media_descriptor, duration_tolerance=None):
1138        super(GstValidateEncodingTestInterface, self).__init__()
1139
1140        self.media_descriptor = media_descriptor
1141        self.combination = combination
1142        self.dest_file = ""
1143
1144        self._duration_tolerance = duration_tolerance
1145        if duration_tolerance is None:
1146            self._duration_tolerance = self.DURATION_TOLERANCE
1147
1148    def get_current_size(self):
1149        try:
1150            size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size
1151        except OSError:
1152            return None
1153
1154        self.debug("Size: %s" % size)
1155        return size
1156
1157    def _get_profile_full(self, muxer, venc, aenc, video_restriction=None,
1158                          audio_restriction=None, audio_presence=0,
1159                          video_presence=0):
1160        ret = ""
1161        if muxer:
1162            ret += muxer
1163        ret += ":"
1164        if venc:
1165            if video_restriction is not None:
1166                ret = ret + video_restriction + '->'
1167            ret += venc
1168            if video_presence:
1169                ret = ret + '|' + str(video_presence)
1170        if aenc:
1171            ret += ":"
1172            if audio_restriction is not None:
1173                ret = ret + audio_restriction + '->'
1174            ret += aenc
1175            if audio_presence:
1176                ret = ret + '|' + str(audio_presence)
1177
1178        return ret.replace("::", ":")
1179
1180    def get_profile(self, video_restriction=None, audio_restriction=None):
1181        vcaps = self.combination.get_video_caps()
1182        acaps = self.combination.get_audio_caps()
1183        if self.media_descriptor is not None:
1184            if self.media_descriptor.get_num_tracks("video") == 0:
1185                vcaps = None
1186
1187            if self.media_descriptor.get_num_tracks("audio") == 0:
1188                acaps = None
1189
1190        return self._get_profile_full(self.combination.get_muxer_caps(),
1191                                      vcaps, acaps,
1192                                      video_restriction=video_restriction,
1193                                      audio_restriction=audio_restriction)
1194
1195    def _clean_caps(self, caps):
1196        """
1197        Returns a list of key=value or structure name, without "(types)" or ";" or ","
1198        """
1199        return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',')
1200
1201    # pylint: disable=E1101
1202    def _has_caps_type_variant(self, c, ccaps):
1203        """
1204        Handle situations where we can have application/ogg or video/ogg or
1205        audio/ogg
1206        """
1207        has_variant = False
1208        media_type = re.findall("application/|video/|audio/", c)
1209        if media_type:
1210            media_type = media_type[0].replace('/', '')
1211            possible_mtypes = ["application", "video", "audio"]
1212            possible_mtypes.remove(media_type)
1213            for tmptype in possible_mtypes:
1214                possible_c_variant = c.replace(media_type, tmptype)
1215                if possible_c_variant in ccaps:
1216                    self.info(
1217                        "Found %s in %s, good enough!", possible_c_variant, ccaps)
1218                    has_variant = True
1219
1220        return has_variant
1221
1222    # pylint: disable=E1101
1223    def run_iqa_test(self, reference_file_uri):
1224        """
1225        Runs IQA test if @reference_file_path exists
1226        @test: The test to run tests on
1227        """
1228        if not GstValidateBaseTestManager.has_feature('iqa'):
1229            self.debug('Iqa element not present, not running extra test.')
1230            return
1231
1232        pipeline_desc = """
1233            uridecodebin uri=%s !
1234                iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink
1235            uridecodebin uri=%s ! iqa.
1236        """ % (reference_file_uri, self.dest_file)
1237        pipeline_desc = pipeline_desc.replace("\n", "")
1238
1239        command = [GstValidateBaseTestManager.COMMAND] + \
1240            shlex.split(pipeline_desc)
1241        msg = "## Running IQA tests on results of: " \
1242            + "%s\n### Command: \n```\n%s\n```\n" % (
1243                self.classname, ' '.join(command))
1244        if not self.options.redirect_logs:
1245            self.out.write(msg)
1246            self.out.flush()
1247        else:
1248            printc(msg, Colors.OKBLUE)
1249
1250        self.process = subprocess.Popen(command,
1251                                        stderr=self.out,
1252                                        stdout=self.out,
1253                                        env=self.proc_env,
1254                                        cwd=self.workdir)
1255        self.process.wait()
1256
1257    def check_encoded_file(self):
1258        result_descriptor = GstValidateMediaDescriptor.new_from_uri(
1259            self.dest_file)
1260        if result_descriptor is None:
1261            return (Result.FAILED, "Could not discover encoded file %s"
1262                    % self.dest_file)
1263
1264        duration = result_descriptor.get_duration()
1265        orig_duration = self.media_descriptor.get_duration()
1266        tolerance = self._duration_tolerance
1267
1268        if orig_duration - tolerance >= duration <= orig_duration + tolerance:
1269            os.remove(result_descriptor.get_path())
1270            return (Result.FAILED, "Duration of encoded file is "
1271                    " wrong (%s instead of %s)" %
1272                    (utils.TIME_ARGS(duration),
1273                     utils.TIME_ARGS(orig_duration)))
1274        else:
1275            all_tracks_caps = result_descriptor.get_tracks_caps()
1276            container_caps = result_descriptor.get_caps()
1277            if container_caps:
1278                all_tracks_caps.insert(0, ("container", container_caps))
1279
1280            for track_type, caps in all_tracks_caps:
1281                ccaps = self._clean_caps(caps)
1282                wanted_caps = self.combination.get_caps(track_type)
1283                cwanted_caps = self._clean_caps(wanted_caps)
1284
1285                if wanted_caps is None:
1286                    os.remove(result_descriptor.get_path())
1287                    return (Result.FAILED,
1288                            "Found a track of type %s in the encoded files"
1289                            " but none where wanted in the encoded profile: %s"
1290                            % (track_type, self.combination))
1291
1292                for c in cwanted_caps:
1293                    if c not in ccaps:
1294                        if not self._has_caps_type_variant(c, ccaps):
1295                            os.remove(result_descriptor.get_path())
1296                            return (Result.FAILED,
1297                                    "Field: %s  (from %s) not in caps of the outputed file %s"
1298                                    % (wanted_caps, c, ccaps))
1299
1300            os.remove(result_descriptor.get_path())
1301            return (Result.PASSED, "")
1302
1303
1304class TestsManager(Loggable):
1305
1306    """ A class responsible for managing tests. """
1307
1308    name = "base"
1309    loading_testsuite = None
1310
1311    def __init__(self):
1312
1313        Loggable.__init__(self)
1314
1315        self.tests = []
1316        self.unwanted_tests = []
1317        self.options = None
1318        self.args = None
1319        self.reporter = None
1320        self.wanted_tests_patterns = []
1321        self.blacklisted_tests_patterns = []
1322        self._generators = []
1323        self.check_testslist = True
1324        self.all_tests = None
1325        self.expected_issues = {}
1326        self.blacklisted_tests = []
1327
1328    def init(self):
1329        return True
1330
1331    def list_tests(self):
1332        return sorted(list(self.tests), key=lambda x: x.classname)
1333
1334    def find_tests(self, classname):
1335        regex = re.compile(classname)
1336        return [test for test in self.list_tests() if regex.findall(test.classname)]
1337
1338    def add_expected_issues(self, expected_issues):
1339        for bugid, failure_def in list(expected_issues.items()):
1340            tests_regexes = []
1341            for test_name_regex in failure_def['tests']:
1342                regex = re.compile(test_name_regex)
1343                tests_regexes.append(regex)
1344                for test in self.tests:
1345                    if regex.findall(test.classname):
1346                        if failure_def.get('allow_flakiness'):
1347                            test.allow_flakiness = True
1348                            self.debug("%s allow flakyness" % (test.classname))
1349                        else:
1350                            for issue in failure_def['issues']:
1351                                issue['bug'] = bugid
1352                            test.expected_issues.extend(failure_def['issues'])
1353                            self.debug("%s added expected issues from %s" % (
1354                                test.classname, bugid))
1355            failure_def['tests'] = tests_regexes
1356
1357        self.expected_issues.update(expected_issues)
1358
1359    def add_test(self, test):
1360        if test.generator is None:
1361            test.classname = self.loading_testsuite + '.' + test.classname
1362
1363        for bugid, failure_def in list(self.expected_issues.items()):
1364            failure_def['bug'] = bugid
1365            for regex in failure_def['tests']:
1366                if regex.findall(test.classname):
1367                    if failure_def.get('allow_flakiness'):
1368                        test.allow_flakiness = True
1369                        self.debug("%s allow flakyness" % (test.classname))
1370                    else:
1371                        for issue in failure_def['issues']:
1372                            issue['bug'] = bugid
1373                        test.expected_issues.extend(failure_def['issues'])
1374                        self.debug("%s added expected issues from %s" % (
1375                            test.classname, bugid))
1376
1377        if self._is_test_wanted(test):
1378            if test not in self.tests:
1379                self.tests.append(test)
1380        else:
1381            if test not in self.tests:
1382                self.unwanted_tests.append(test)
1383
1384    def get_tests(self):
1385        return self.tests
1386
1387    def populate_testsuite(self):
1388        pass
1389
1390    def add_generators(self, generators):
1391        """
1392        @generators: A list of, or one single #TestsGenerator to be used to generate tests
1393        """
1394        if not isinstance(generators, list):
1395            generators = [generators]
1396        self._generators.extend(generators)
1397        for generator in generators:
1398            generator.testsuite = self.loading_testsuite
1399
1400        self._generators = list(set(self._generators))
1401
1402    def get_generators(self):
1403        return self._generators
1404
1405    def _add_blacklist(self, blacklisted_tests):
1406        if not isinstance(blacklisted_tests, list):
1407            blacklisted_tests = [blacklisted_tests]
1408
1409        for patterns in blacklisted_tests:
1410            for pattern in patterns.split(","):
1411                self.blacklisted_tests_patterns.append(re.compile(pattern))
1412
1413    def set_default_blacklist(self, default_blacklist):
1414        for test_regex, reason in default_blacklist:
1415            if not test_regex.startswith(self.loading_testsuite + '.'):
1416                test_regex = self.loading_testsuite + '.' + test_regex
1417            self.blacklisted_tests.append((test_regex, reason))
1418            self._add_blacklist(test_regex)
1419
1420    def add_options(self, parser):
1421        """ Add more arguments. """
1422        pass
1423
1424    def set_settings(self, options, args, reporter):
1425        """ Set properties after options parsing. """
1426        self.options = options
1427        self.args = args
1428        self.reporter = reporter
1429
1430        self.populate_testsuite()
1431
1432        if self.options.valgrind:
1433            self.print_valgrind_bugs()
1434
1435        if options.wanted_tests:
1436            for patterns in options.wanted_tests:
1437                for pattern in patterns.split(","):
1438                    self.wanted_tests_patterns.append(re.compile(pattern))
1439
1440        if options.blacklisted_tests:
1441            for patterns in options.blacklisted_tests:
1442                self._add_blacklist(patterns)
1443
1444    def check_blacklists(self):
1445        if self.options.check_bugs_status:
1446            if not check_bugs_resolution(self.blacklisted_tests):
1447                return False
1448
1449        return True
1450
1451    def log_blacklists(self):
1452        if self.blacklisted_tests:
1453            self.info("Currently 'hardcoded' %s blacklisted tests:" %
1454                      self.name)
1455
1456        for name, bug in self.blacklisted_tests:
1457            if not self.options.check_bugs_status:
1458                self.info("  + %s --> bug: %s" % (name, bug))
1459
1460    def check_expected_issues(self):
1461        if not self.expected_issues or not self.options.check_bugs_status:
1462            return True
1463
1464        bugs_definitions = defaultdict(list)
1465        for bug, failure_def in list(self.expected_issues.items()):
1466            tests_names = '|'.join(
1467                [regex.pattern for regex in failure_def['tests']])
1468            bugs_definitions[tests_names].extend([bug])
1469
1470        return check_bugs_resolution(bugs_definitions.items())
1471
1472    def _check_blacklisted(self, test):
1473        for pattern in self.blacklisted_tests_patterns:
1474            if pattern.findall(test.classname):
1475                self.info("%s is blacklisted by %s", test.classname, pattern)
1476                return True
1477
1478        return False
1479
1480    def _check_whitelisted(self, test):
1481        for pattern in self.wanted_tests_patterns:
1482            if pattern.findall(test.classname):
1483                if self._check_blacklisted(test):
1484                    # If explicitly white listed that specific test
1485                    # bypass the blacklisting
1486                    if pattern.pattern != test.classname:
1487                        return False
1488                return True
1489        return False
1490
1491    def _check_duration(self, test):
1492        if test.duration > 0 and int(self.options.long_limit) < int(test.duration):
1493            self.info("Not activating %s as its duration (%d) is superior"
1494                      " than the long limit (%d)" % (test, test.duration,
1495                                                     int(self.options.long_limit)))
1496            return False
1497
1498        return True
1499
1500    def _is_test_wanted(self, test):
1501        if self._check_whitelisted(test):
1502            if not self._check_duration(test):
1503                return False
1504            return True
1505
1506        if self._check_blacklisted(test):
1507            return False
1508
1509        if not self._check_duration(test):
1510            return False
1511
1512        if not self.wanted_tests_patterns:
1513            return True
1514
1515        return False
1516
1517    def needs_http_server(self):
1518        return False
1519
1520    def print_valgrind_bugs(self):
1521        pass
1522
1523
1524class TestsGenerator(Loggable):
1525
1526    def __init__(self, name, test_manager, tests=[]):
1527        Loggable.__init__(self)
1528        self.name = name
1529        self.test_manager = test_manager
1530        self.testsuite = None
1531        self._tests = {}
1532        for test in tests:
1533            self._tests[test.classname] = test
1534
1535    def generate_tests(self, *kwargs):
1536        """
1537        Method that generates tests
1538        """
1539        return list(self._tests.values())
1540
1541    def add_test(self, test):
1542        test.generator = self
1543        test.classname = self.testsuite + '.' + test.classname
1544        self._tests[test.classname] = test
1545
1546
1547class GstValidateTestsGenerator(TestsGenerator):
1548
1549    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
1550        pass
1551
1552    @staticmethod
1553    def get_fakesink_for_media_type(media_type, needs_clock=False):
1554        if media_type == "video":
1555            if needs_clock:
1556                return 'fakevideosink qos=true max-lateness=20000000'
1557
1558            return "fakevideosink sync=false"
1559
1560        if needs_clock:
1561            return "fakesink sync=true"
1562
1563        return "fakesink"
1564
1565    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
1566        self.populate_tests(uri_minfo_special_scenarios, scenarios)
1567        return super(GstValidateTestsGenerator, self).generate_tests()
1568
1569
1570class _TestsLauncher(Loggable):
1571
1572    def __init__(self):
1573
1574        Loggable.__init__(self)
1575
1576        self.options = None
1577        self.testers = []
1578        self.tests = []
1579        self.reporter = None
1580        self._list_testers()
1581        self.all_tests = None
1582        self.wanted_tests_patterns = []
1583
1584        self.queue = queue.Queue()
1585        self.jobs = []
1586        self.total_num_tests = 0
1587        self.server = None
1588        self.httpsrv = None
1589        self.vfb_server = None
1590
1591    def _list_app_dirs(self):
1592        app_dirs = []
1593        env_dirs = os.environ["GST_VALIDATE_APPS_DIR"]
1594        if env_dirs is not None:
1595            for dir_ in env_dirs.split(":"):
1596                app_dirs.append(dir_)
1597
1598        return app_dirs
1599
1600    def _exec_app(self, app_dir, env):
1601        try:
1602            files = os.listdir(app_dir)
1603        except OSError as e:
1604            self.debug("Could not list %s: %s" % (app_dir, e))
1605            files = []
1606        for f in files:
1607            if f.endswith(".py"):
1608                exec(compile(open(os.path.join(app_dir, f)).read(),
1609                             os.path.join(app_dir, f), 'exec'), env)
1610
1611    def _exec_apps(self, env):
1612        app_dirs = self._list_app_dirs()
1613        for app_dir in app_dirs:
1614            self._exec_app(app_dir, env)
1615
1616    def _list_testers(self):
1617        env = globals().copy()
1618        self._exec_apps(env)
1619
1620        testers = [i() for i in utils.get_subclasses(TestsManager, env)]
1621        for tester in testers:
1622            if tester.init() is True:
1623                self.testers.append(tester)
1624            else:
1625                self.warning("Can not init tester: %s -- PATH is %s"
1626                             % (tester.name, os.environ["PATH"]))
1627
1628    def add_options(self, parser):
1629        for tester in self.testers:
1630            tester.add_options(parser)
1631
1632    def _load_testsuite(self, testsuites):
1633        exceptions = []
1634        for testsuite in testsuites:
1635            try:
1636                sys.path.insert(0, os.path.dirname(testsuite))
1637                return (__import__(os.path.basename(testsuite).replace(".py", "")), None)
1638            except Exception as e:
1639                exceptions.append("Could not load %s: %s" % (testsuite, e))
1640                continue
1641            finally:
1642                sys.path.remove(os.path.dirname(testsuite))
1643
1644        return (None, exceptions)
1645
1646    def _load_testsuites(self):
1647        testsuites = set()
1648        for testsuite in self.options.testsuites:
1649            if testsuite.endswith('.py') and os.path.exists(testsuite):
1650                testsuite = os.path.abspath(os.path.expanduser(testsuite))
1651                loaded_module = self._load_testsuite([testsuite])
1652            else:
1653                possible_testsuites_paths = [os.path.join(d, testsuite + ".py")
1654                                             for d in self.options.testsuites_dirs]
1655                loaded_module = self._load_testsuite(possible_testsuites_paths)
1656
1657            module = loaded_module[0]
1658            if not loaded_module[0]:
1659                if "." in testsuite:
1660                    self.options.testsuites.append(testsuite.split('.')[0])
1661                    self.info("%s looks like a test name, trying that" %
1662                              testsuite)
1663                    self.options.wanted_tests.append(testsuite)
1664                else:
1665                    printc("Could not load testsuite: %s, reasons: %s" % (
1666                        testsuite, loaded_module[1]), Colors.FAIL)
1667                continue
1668
1669            testsuites.add(module)
1670            if not hasattr(module, "TEST_MANAGER"):
1671                module.TEST_MANAGER = [tester.name for tester in self.testers]
1672            elif not isinstance(module.TEST_MANAGER, list):
1673                module.TEST_MANAGER = [module.TEST_MANAGER]
1674
1675        self.options.testsuites = list(testsuites)
1676
1677    def _setup_testsuites(self):
1678        for testsuite in self.options.testsuites:
1679            loaded = False
1680            wanted_test_manager = None
1681            # TEST_MANAGER has been set in _load_testsuites()
1682            assert hasattr(testsuite, "TEST_MANAGER")
1683            wanted_test_manager = testsuite.TEST_MANAGER
1684            if not isinstance(wanted_test_manager, list):
1685                wanted_test_manager = [wanted_test_manager]
1686
1687            for tester in self.testers:
1688                if wanted_test_manager is not None and \
1689                        tester.name not in wanted_test_manager:
1690                    continue
1691
1692                prev_testsuite_name = TestsManager.loading_testsuite
1693                if self.options.user_paths:
1694                    TestsManager.loading_testsuite = tester.name
1695                    tester.register_defaults()
1696                    loaded = True
1697                else:
1698                    TestsManager.loading_testsuite = testsuite.__name__
1699                    if testsuite.setup_tests(tester, self.options):
1700                        loaded = True
1701                if prev_testsuite_name:
1702                    TestsManager.loading_testsuite = prev_testsuite_name
1703
1704            if not loaded:
1705                printc("Could not load testsuite: %s"
1706                       " maybe because of missing TestManager"
1707                       % (testsuite), Colors.FAIL)
1708                return False
1709
1710    def _load_config(self, options):
1711        printc("Loading config files is DEPRECATED"
1712               " you should use the new testsuite format now",)
1713
1714        for tester in self.testers:
1715            tester.options = options
1716            globals()[tester.name] = tester
1717        globals()["options"] = options
1718        c__file__ = __file__
1719        globals()["__file__"] = self.options.config
1720        exec(compile(open(self.options.config).read(),
1721                     self.options.config, 'exec'), globals())
1722        globals()["__file__"] = c__file__
1723
1724    def set_settings(self, options, args):
1725        if options.xunit_file:
1726            self.reporter = reporters.XunitReporter(options)
1727        else:
1728            self.reporter = reporters.Reporter(options)
1729
1730        self.options = options
1731        wanted_testers = None
1732        for tester in self.testers:
1733            if tester.name in args:
1734                wanted_testers = tester.name
1735
1736        if wanted_testers:
1737            testers = self.testers
1738            self.testers = []
1739            for tester in testers:
1740                if tester.name in args:
1741                    self.testers.append(tester)
1742                    args.remove(tester.name)
1743
1744        if options.config:
1745            self._load_config(options)
1746
1747        self._load_testsuites()
1748        if not self.options.testsuites:
1749            printc("Not testsuite loaded!", Colors.FAIL)
1750            return False
1751
1752        for tester in self.testers:
1753            tester.set_settings(options, args, self.reporter)
1754
1755        if not options.config and options.testsuites:
1756            if self._setup_testsuites() is False:
1757                return False
1758
1759        if self.options.check_bugs_status:
1760            printc("-> Checking bugs resolution... ", end='')
1761
1762        for tester in self.testers:
1763            if not tester.check_blacklists():
1764                return False
1765
1766            tester.log_blacklists()
1767
1768            if not tester.check_expected_issues():
1769                return False
1770
1771        if self.options.check_bugs_status:
1772            printc("OK", Colors.OKGREEN)
1773
1774        if self.needs_http_server() or options.httponly is True:
1775            self.httpsrv = HTTPServer(options)
1776            self.httpsrv.start()
1777
1778        if options.no_display:
1779            self.vfb_server = get_virual_frame_buffer_server(options)
1780            res = self.vfb_server.start()
1781            if res[0] is False:
1782                printc("Could not start virtual frame server: %s" % res[1],
1783                       Colors.FAIL)
1784                return False
1785            os.environ["DISPLAY"] = self.vfb_server.display_id
1786
1787        return True
1788
1789    def _check_tester_has_other_testsuite(self, testsuite, tester):
1790        if tester.name != testsuite.TEST_MANAGER[0]:
1791            return True
1792
1793        for t in self.options.testsuites:
1794            if t != testsuite:
1795                for other_testmanager in t.TEST_MANAGER:
1796                    if other_testmanager == tester.name:
1797                        return True
1798
1799        return False
1800
1801    def _check_defined_tests(self, tester, tests):
1802        if self.options.blacklisted_tests or self.options.wanted_tests:
1803            return
1804
1805        tests_names = [test.classname for test in tests]
1806        testlist_changed = False
1807        for testsuite in self.options.testsuites:
1808            if not self._check_tester_has_other_testsuite(testsuite, tester) \
1809                    and tester.check_testslist:
1810                try:
1811                    testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1812                                         'r+')
1813
1814                    know_tests = testlist_file.read().split("\n")
1815                    testlist_file.close()
1816
1817                    testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist",
1818                                         'w')
1819                except IOError:
1820                    continue
1821
1822                optional_out = []
1823                for test in know_tests:
1824                    if test and test.strip('~') not in tests_names:
1825                        if not test.startswith('~'):
1826                            testlist_changed = True
1827                            printc("Test %s Not in testsuite %s anymore"
1828                                   % (test, testsuite.__file__), Colors.FAIL)
1829                        else:
1830                            optional_out.append((test, None))
1831
1832                tests_names = sorted([(test.classname, test) for test in tests] + optional_out,
1833                                     key=lambda x: x[0].strip('~'))
1834
1835                for tname, test in tests_names:
1836                    if test and test.optional:
1837                        tname = '~' + tname
1838                    testlist_file.write("%s\n" % (tname))
1839                    if tname and tname not in know_tests:
1840                        printc("Test %s is NEW in testsuite %s"
1841                               % (tname, testsuite.__file__),
1842                               Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN)
1843                        testlist_changed = True
1844
1845                testlist_file.close()
1846                break
1847
1848        return testlist_changed
1849
1850    def list_tests(self):
1851        for tester in self.testers:
1852            if not self._tester_needed(tester):
1853                continue
1854
1855            tests = tester.list_tests()
1856            if self._check_defined_tests(tester, tests) and \
1857                    self.options.fail_on_testlist_change:
1858                raise RuntimeError("Unexpected new test in testsuite.")
1859
1860            self.tests.extend(tests)
1861        self.tests.sort(key=lambda test: test.classname)
1862        return self.tests
1863
1864    def _tester_needed(self, tester):
1865        for testsuite in self.options.testsuites:
1866            if tester.name in testsuite.TEST_MANAGER:
1867                return True
1868        return False
1869
1870    def server_wrapper(self, ready):
1871        self.server = GstValidateTCPServer(
1872            ('localhost', 0), GstValidateListener)
1873        self.server.socket.settimeout(None)
1874        self.server.launcher = self
1875        self.serverport = self.server.socket.getsockname()[1]
1876        self.info("%s server port: %s" % (self, self.serverport))
1877        ready.set()
1878
1879        self.server.serve_forever(poll_interval=0.05)
1880
1881    def _start_server(self):
1882        self.info("Starting TCP Server")
1883        ready = threading.Event()
1884        self.server_thread = threading.Thread(target=self.server_wrapper,
1885                                              kwargs={'ready': ready})
1886        self.server_thread.start()
1887        ready.wait()
1888        os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport
1889
1890    def _stop_server(self):
1891        if self.server:
1892            self.server.shutdown()
1893            self.server_thread.join()
1894            self.server.server_close()
1895            self.server = None
1896
1897    def test_wait(self):
1898        while True:
1899            # Check process every second for timeout
1900            try:
1901                self.queue.get(timeout=1)
1902            except queue.Empty:
1903                pass
1904
1905            for test in self.jobs:
1906                if test.process_update():
1907                    self.jobs.remove(test)
1908                    return test
1909
1910    def tests_wait(self):
1911        try:
1912            test = self.test_wait()
1913            test.check_results()
1914        except KeyboardInterrupt:
1915            for test in self.jobs:
1916                test.kill_subprocess()
1917            raise
1918
1919        return test
1920
1921    def start_new_job(self, tests_left):
1922        try:
1923            test = tests_left.pop(0)
1924        except IndexError:
1925            return False
1926
1927        test.test_start(self.queue)
1928
1929        self.jobs.append(test)
1930
1931        return True
1932
1933    def _run_tests(self, running_tests=None, all_alone=False, retry_on_failures=False):
1934        if not self.all_tests:
1935            self.all_tests = self.list_tests()
1936
1937        if not running_tests:
1938            running_tests = self.tests
1939
1940        self.total_num_tests = len(self.all_tests)
1941        printc("\nRunning %d tests..." % self.total_num_tests, color=Colors.HEADER)
1942
1943        self.reporter.init_timer()
1944        alone_tests = []
1945        tests = []
1946        for test in running_tests:
1947            if test.is_parallel and not all_alone:
1948                tests.append(test)
1949            else:
1950                alone_tests.append(test)
1951
1952        max_num_jobs = min(self.options.num_jobs, len(tests))
1953        jobs_running = 0
1954
1955        # if order of test execution doesn't matter, shuffle
1956        # the order to optimize cpu usage
1957        if self.options.shuffle:
1958            random.shuffle(tests)
1959            random.shuffle(alone_tests)
1960
1961        current_test_num = 1
1962        to_retry = []
1963        for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]:
1964            tests_left = list(tests)
1965            for i in range(num_jobs):
1966                if not self.start_new_job(tests_left):
1967                    break
1968                jobs_running += 1
1969
1970            while jobs_running != 0:
1971                test = self.tests_wait()
1972                jobs_running -= 1
1973                test.number = "[%d / %d] " % (current_test_num,
1974                                              self.total_num_tests)
1975                current_test_num += 1
1976                res = test.test_end(retry_on_failure=retry_on_failures)
1977                to_report = True
1978                if res != Result.PASSED:
1979                    if self.options.forever or self.options.fatal_error:
1980                        return False
1981
1982                    if retry_on_failures:
1983                        if not self.options.redirect_logs and test.allow_flakiness:
1984                            test.copy_logfiles()
1985                        printc(test)
1986                        to_retry.append(test)
1987
1988                        # Not adding to final report if flakiness is tolerated
1989                        to_report = not test.allow_flakiness
1990                if to_report:
1991                    self.reporter.after_test(test)
1992                if retry_on_failures:
1993                    test.clean()
1994                if self.start_new_job(tests_left):
1995                    jobs_running += 1
1996
1997        if to_retry:
1998            printc("--> Rerunning the following tests to see if they are flaky:", Colors.WARNING)
1999            for test in to_retry:
2000                printc('  * %s' % test.classname)
2001            printc('')
2002            return self._run_tests(to_retry, all_alone=True, retry_on_failures=False)
2003
2004        return True
2005
2006    def clean_tests(self, stop_server=False):
2007        for test in self.tests:
2008            test.clean()
2009        if stop_server:
2010            self._stop_server()
2011
2012    def run_tests(self):
2013        r = 0
2014        try:
2015            self._start_server()
2016            if self.options.forever:
2017                r = 1
2018                while True:
2019                    printc("-> Iteration %d" % r, end='\r')
2020
2021                    if not self._run_tests():
2022                        break
2023                    r += 1
2024                    self.clean_tests()
2025                    msg = "-> Iteration %d... %sOK%s" % (r, Colors.OKGREEN, Colors.ENDC)
2026                    printc(msg, end="\r")
2027
2028                return False
2029            elif self.options.n_runs:
2030                res = True
2031                for r in range(self.options.n_runs):
2032                    printc("-> Iteration %d" % r, end='\r')
2033                    if not self._run_tests():
2034                        res = False
2035                        printc("ERROR", Colors.FAIL, end="\r")
2036                    else:
2037                        printc("OK", Colors.OKGREEN, end="\r")
2038                    self.clean_tests()
2039
2040                return res
2041            else:
2042                return self._run_tests(retry_on_failures=self.options.retry_on_failures)
2043        finally:
2044            if self.options.forever:
2045                printc("\n-> Ran %d times" % r)
2046            if self.httpsrv:
2047                self.httpsrv.stop()
2048            if self.vfb_server:
2049                self.vfb_server.stop()
2050            self.clean_tests(True)
2051
2052    def final_report(self):
2053        return self.reporter.final_report()
2054
2055    def needs_http_server(self):
2056        for tester in self.testers:
2057            if tester.needs_http_server():
2058                return True
2059
2060
2061class NamedDic(object):
2062
2063    def __init__(self, props):
2064        if props:
2065            for name, value in props.items():
2066                setattr(self, name, value)
2067
2068
2069class Scenario(object):
2070
2071    def __init__(self, name, props, path=None):
2072        self.name = name
2073        self.path = path
2074
2075        for prop, value in props:
2076            setattr(self, prop.replace("-", "_"), value)
2077
2078    def get_execution_name(self):
2079        if self.path is not None:
2080            return self.path
2081        else:
2082            return self.name
2083
2084    def seeks(self):
2085        if hasattr(self, "seek"):
2086            return bool(self.seek)
2087
2088        return False
2089
2090    def needs_clock_sync(self):
2091        if hasattr(self, "need_clock_sync"):
2092            return bool(self.need_clock_sync)
2093
2094        return False
2095
2096    def needs_live_content(self):
2097        # Scenarios that can only be used on live content
2098        if hasattr(self, "live_content_required"):
2099            return bool(self.live_content_required)
2100        return False
2101
2102    def compatible_with_live_content(self):
2103        # if a live content is required it's implicitely compatible with
2104        # live content
2105        if self.needs_live_content():
2106            return True
2107        if hasattr(self, "live_content_compatible"):
2108            return bool(self.live_content_compatible)
2109        return False
2110
2111    def get_min_media_duration(self):
2112        if hasattr(self, "min_media_duration"):
2113            return float(self.min_media_duration)
2114
2115        return 0
2116
2117    def does_reverse_playback(self):
2118        if hasattr(self, "reverse_playback"):
2119            return bool(self.reverse_playback)
2120
2121        return False
2122
2123    def get_duration(self):
2124        try:
2125            return float(getattr(self, "duration"))
2126        except AttributeError:
2127            return 0
2128
2129    def get_min_tracks(self, track_type):
2130        try:
2131            return int(getattr(self, "min_%s_track" % track_type))
2132        except AttributeError:
2133            return 0
2134
2135    def __repr__(self):
2136        return "<Scenario %s>" % self.name
2137
2138
2139class ScenarioManager(Loggable):
2140    _instance = None
2141    all_scenarios = []
2142
2143    FILE_EXTENSION = "scenario"
2144
2145    def __new__(cls, *args, **kwargs):
2146        if not cls._instance:
2147            cls._instance = super(ScenarioManager, cls).__new__(
2148                cls, *args, **kwargs)
2149            cls._instance.config = None
2150            cls._instance.discovered = False
2151            Loggable.__init__(cls._instance)
2152
2153        return cls._instance
2154
2155    def find_special_scenarios(self, mfile):
2156        scenarios = []
2157        mfile_bname = os.path.basename(mfile)
2158
2159        for f in os.listdir(os.path.dirname(mfile)):
2160            if re.findall("%s\..*\.%s$" % (re.escape(mfile_bname), self.FILE_EXTENSION), f):
2161                scenarios.append(os.path.join(os.path.dirname(mfile), f))
2162
2163        if scenarios:
2164            scenarios = self.discover_scenarios(scenarios, mfile)
2165
2166        return scenarios
2167
2168    def discover_scenarios(self, scenario_paths=[], mfile=None):
2169        """
2170        Discover scenarios specified in scenario_paths or the default ones
2171        if nothing specified there
2172        """
2173        scenarios = []
2174        scenario_defs = os.path.join(self.config.main_dir, "scenarios.def")
2175        logs = open(os.path.join(self.config.logsdir,
2176                                 "scenarios_discovery.log"), 'w')
2177
2178        try:
2179            command = [GstValidateBaseTestManager.COMMAND,
2180                       "--scenarios-defs-output-file", scenario_defs]
2181            command.extend(scenario_paths)
2182            subprocess.check_call(command, stdout=logs, stderr=logs)
2183        except subprocess.CalledProcessError as e:
2184            self.error(e)
2185            pass
2186
2187        config = configparser.RawConfigParser()
2188        f = open(scenario_defs)
2189        config.readfp(f)
2190
2191        for section in config.sections():
2192            if scenario_paths:
2193                for scenario_path in scenario_paths:
2194                    if section in os.path.splitext(os.path.basename(scenario_path))[0]:
2195                        if mfile is None:
2196                            name = section
2197                            path = scenario_path
2198                        else:
2199                            # The real name of the scenario is:
2200                            # filename.REALNAME.scenario
2201                            name = scenario_path.replace(mfile + ".", "").replace(
2202                                "." + self.FILE_EXTENSION, "")
2203                            path = scenario_path
2204            else:
2205                name = section
2206                path = None
2207
2208            props = config.items(section)
2209            scenarios.append(Scenario(name, props, path))
2210
2211        if not scenario_paths:
2212            self.discovered = True
2213            self.all_scenarios.extend(scenarios)
2214
2215        return scenarios
2216
2217    def get_scenario(self, name):
2218        if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION):
2219            scenarios = self.discover_scenarios([name])
2220
2221            if scenarios:
2222                return scenarios[0]
2223
2224        if self.discovered is False:
2225            self.discover_scenarios()
2226
2227        if name is None:
2228            return self.all_scenarios
2229
2230        try:
2231            return [scenario for scenario in self.all_scenarios if scenario.name == name][0]
2232        except IndexError:
2233            self.warning("Scenario: %s not found" % name)
2234            return None
2235
2236
2237class GstValidateBaseTestManager(TestsManager):
2238    scenarios_manager = ScenarioManager()
2239    features_cache = {}
2240
2241    def __init__(self):
2242        super(GstValidateBaseTestManager, self).__init__()
2243        self._scenarios = []
2244        self._encoding_formats = []
2245
2246    @classmethod
2247    def update_commands(cls, extra_paths=None):
2248        for varname, cmd in {'': 'gst-validate',
2249                             'TRANSCODING_': 'gst-validate-transcoding',
2250                             'MEDIA_CHECK_': 'gst-validate-media-check',
2251                             'RTSP_SERVER_': 'gst-validate-rtsp-server',
2252                             'INSPECT_': 'gst-inspect'}.items():
2253            setattr(cls, varname + 'COMMAND', which(cmd + '-1.0', extra_paths))
2254
2255    @classmethod
2256    def has_feature(cls, featurename):
2257        try:
2258            return cls.features_cache[featurename]
2259        except KeyError:
2260            pass
2261
2262        try:
2263            subprocess.check_output([cls.INSPECT_COMMAND, featurename])
2264            res = True
2265        except subprocess.CalledProcessError:
2266            res = False
2267
2268        cls.features_cache[featurename] = res
2269        return res
2270
2271    def add_scenarios(self, scenarios):
2272        """
2273        @scenarios A list or a unic scenario name(s) to be run on the tests.
2274                    They are just the default scenarios, and then depending on
2275                    the TestsGenerator to be used you can have more fine grained
2276                    control on what to be run on each serie of tests.
2277        """
2278        if isinstance(scenarios, list):
2279            self._scenarios.extend(scenarios)
2280        else:
2281            self._scenarios.append(scenarios)
2282
2283        self._scenarios = list(set(self._scenarios))
2284
2285    def set_scenarios(self, scenarios):
2286        """
2287        Override the scenarios
2288        """
2289        self._scenarios = []
2290        self.add_scenarios(scenarios)
2291
2292    def get_scenarios(self):
2293        return self._scenarios
2294
2295    def add_encoding_formats(self, encoding_formats):
2296        """
2297        :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output
2298                           formats for transcoding test.
2299                           They are just the default encoding formats, and then depending on
2300                           the TestsGenerator to be used you can have more fine grained
2301                           control on what to be run on each serie of tests.
2302        """
2303        if isinstance(encoding_formats, list):
2304            self._encoding_formats.extend(encoding_formats)
2305        else:
2306            self._encoding_formats.append(encoding_formats)
2307
2308        self._encoding_formats = list(set(self._encoding_formats))
2309
2310    def get_encoding_formats(self):
2311        return self._encoding_formats
2312
2313
2314GstValidateBaseTestManager.update_commands()
2315
2316
2317class MediaDescriptor(Loggable):
2318
2319    def __init__(self):
2320        Loggable.__init__(self)
2321
2322    def get_path(self):
2323        raise NotImplemented
2324
2325    def has_frames(self):
2326        return False
2327
2328    def get_media_filepath(self):
2329        raise NotImplemented
2330
2331    def skip_parsers(self):
2332        return False
2333
2334    def get_caps(self):
2335        raise NotImplemented
2336
2337    def get_uri(self):
2338        raise NotImplemented
2339
2340    def get_duration(self):
2341        raise NotImplemented
2342
2343    def get_protocol(self):
2344        raise NotImplemented
2345
2346    def is_seekable(self):
2347        raise NotImplemented
2348
2349    def is_live(self):
2350        raise NotImplemented
2351
2352    def is_image(self):
2353        raise NotImplemented
2354
2355    def get_num_tracks(self, track_type):
2356        raise NotImplemented
2357
2358    def can_play_reverse(self):
2359        raise NotImplemented
2360
2361    def prerrols(self):
2362        return True
2363
2364    def is_compatible(self, scenario):
2365        if scenario is None:
2366            return True
2367
2368        if scenario.seeks() and (not self.is_seekable() or self.is_image()):
2369            self.debug("Do not run %s as %s does not support seeking",
2370                       scenario, self.get_uri())
2371            return False
2372
2373        if self.is_image() and scenario.needs_clock_sync():
2374            self.debug("Do not run %s as %s is an image",
2375                       scenario, self.get_uri())
2376            return False
2377
2378        if not self.can_play_reverse() and scenario.does_reverse_playback():
2379            return False
2380
2381        if not self.is_live() and scenario.needs_live_content():
2382            self.debug("Do not run %s as %s is not a live content",
2383                       scenario, self.get_uri())
2384            return False
2385
2386        if self.is_live() and not scenario.compatible_with_live_content():
2387            self.debug("Do not run %s as %s is a live content",
2388                       scenario, self.get_uri())
2389            return False
2390
2391        if not self.prerrols() and getattr(scenario, 'needs_preroll', False):
2392            return False
2393
2394        if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration():
2395            self.debug(
2396                "Do not run %s as %s is too short (%i < min media duation : %i",
2397                scenario, self.get_uri(),
2398                self.get_duration() / GST_SECOND,
2399                scenario.get_min_media_duration())
2400            return False
2401
2402        for track_type in ['audio', 'subtitle', 'video']:
2403            if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type):
2404                self.debug("%s -- %s | At least %s %s track needed  < %s"
2405                           % (scenario, self.get_uri(), track_type,
2406                              scenario.get_min_tracks(track_type),
2407                              self.get_num_tracks(track_type)))
2408                return False
2409
2410        return True
2411
2412
2413class GstValidateMediaDescriptor(MediaDescriptor):
2414    # Some extension file for discovering results
2415    MEDIA_INFO_EXT = "media_info"
2416    PUSH_MEDIA_INFO_EXT = "media_info.push"
2417    STREAM_INFO_EXT = "stream_info"
2418
2419    def __init__(self, xml_path):
2420        super(GstValidateMediaDescriptor, self).__init__()
2421
2422        self._xml_path = xml_path
2423        try:
2424            media_xml = ET.parse(xml_path).getroot()
2425        except xml.etree.ElementTree.ParseError:
2426            printc("Could not parse %s" % xml_path,
2427                   Colors.FAIL)
2428            raise
2429
2430        self._extract_data(media_xml)
2431
2432        self.set_protocol(urllib.parse.urlparse(
2433            urllib.parse.urlparse(self.get_uri()).scheme).scheme)
2434
2435    def skip_parsers(self):
2436        return self._skip_parsers
2437
2438    def has_frames(self):
2439        return self._has_frames
2440
2441    def _extract_data(self, media_xml):
2442        # Extract the information we need from the xml
2443        self._caps = media_xml.findall("streams")[0].attrib["caps"]
2444        self._track_caps = []
2445        try:
2446            streams = media_xml.findall("streams")[0].findall("stream")
2447        except IndexError:
2448            pass
2449        else:
2450            for stream in streams:
2451                self._track_caps.append(
2452                    (stream.attrib["type"], stream.attrib["caps"]))
2453        self._uri = media_xml.attrib["uri"]
2454        self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0)))
2455        self._has_frames = bool(int(media_xml.attrib["frame-detection"]))
2456        self._duration = int(media_xml.attrib["duration"])
2457        self._protocol = media_xml.get("protocol", None)
2458        self._is_seekable = media_xml.attrib["seekable"].lower() == "true"
2459        self._is_live = media_xml.get("live", "false").lower() == "true"
2460        self._is_image = False
2461        for stream in media_xml.findall("streams")[0].findall("stream"):
2462            if stream.attrib["type"] == "image":
2463                self._is_image = True
2464        self._track_types = []
2465        for stream in media_xml.findall("streams")[0].findall("stream"):
2466            self._track_types.append(stream.attrib["type"])
2467
2468    @staticmethod
2469    def new_from_uri(uri, verbose=False, include_frames=False, is_push=False):
2470        """
2471            include_frames = 0 # Never
2472            include_frames = 1 # always
2473            include_frames = 2 # if previous file included them
2474
2475        """
2476        media_path = utils.url2path(uri)
2477
2478        ext = GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT if is_push else \
2479            GstValidateMediaDescriptor.MEDIA_INFO_EXT
2480        descriptor_path = "%s.%s" % (media_path, ext)
2481        args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
2482        args.append(uri)
2483        if include_frames == 2:
2484            try:
2485                media_xml = ET.parse(descriptor_path).getroot()
2486
2487                include_frames = bool(int(media_xml.attrib["frame-detection"]))
2488                if bool(int(media_xml.attrib.get("skip-parsers", 0))):
2489                    args.append("--skip-parsers")
2490            except FileNotFoundError:
2491                pass
2492        else:
2493            include_frames = bool(include_frames)
2494
2495        args.extend(["--output-file", descriptor_path])
2496        if include_frames:
2497            args.extend(["--full"])
2498
2499        if verbose:
2500            printc("Generating media info for %s\n"
2501                   "    Command: '%s'" % (media_path, ' '.join(args)),
2502                   Colors.OKBLUE)
2503
2504        try:
2505            subprocess.check_output(args, stderr=open(os.devnull))
2506        except subprocess.CalledProcessError as e:
2507            if verbose:
2508                printc("Result: Failed", Colors.FAIL)
2509            else:
2510                loggable.warning("GstValidateMediaDescriptor",
2511                                 "Exception: %s" % e)
2512            return None
2513
2514        if verbose:
2515            printc("Result: Passed", Colors.OKGREEN)
2516
2517        try:
2518            return GstValidateMediaDescriptor(descriptor_path)
2519        except (IOError, xml.etree.ElementTree.ParseError):
2520            return None
2521
2522    def get_path(self):
2523        return self._xml_path
2524
2525    def need_clock_sync(self):
2526        return Protocols.needs_clock_sync(self.get_protocol())
2527
2528    def get_media_filepath(self):
2529        if self.get_protocol() == Protocols.FILE:
2530            return self._xml_path.replace("." + self.MEDIA_INFO_EXT, "")
2531        elif self.get_protocol() == Protocols.PUSHFILE:
2532            return self._xml_path.replace("." + self.PUSH_MEDIA_INFO_EXT, "")
2533        else:
2534            return self._xml_path.replace("." + self.STREAM_INFO_EXT, "")
2535
2536    def get_caps(self):
2537        return self._caps
2538
2539    def get_tracks_caps(self):
2540        return self._track_caps
2541
2542    def get_uri(self):
2543        return self._uri
2544
2545    def get_duration(self):
2546        return self._duration
2547
2548    def set_protocol(self, protocol):
2549        if self._xml_path.endswith(GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
2550            self._protocol = Protocols.PUSHFILE
2551        else:
2552            self._protocol = protocol
2553
2554    def get_protocol(self):
2555        return self._protocol
2556
2557    def is_seekable(self):
2558        return self._is_seekable
2559
2560    def is_live(self):
2561        return self._is_live
2562
2563    def can_play_reverse(self):
2564        return True
2565
2566    def is_image(self):
2567        return self._is_image
2568
2569    def get_num_tracks(self, track_type):
2570        n = 0
2571        for t in self._track_types:
2572            if t == track_type:
2573                n += 1
2574
2575        return n
2576
2577    def get_clean_name(self):
2578        name = os.path.basename(self.get_path())
2579        name = re.sub("\.stream_info|\.media_info", "", name)
2580
2581        return name.replace('.', "_")
2582
2583
2584class MediaFormatCombination(object):
2585    FORMATS = {"aac": "audio/mpeg,mpegversion=4",  # Audio
2586               "ac3": "audio/x-ac3",
2587               "vorbis": "audio/x-vorbis",
2588               "mp3": "audio/mpeg,mpegversion=1,layer=3",
2589               "opus": "audio/x-opus",
2590               "rawaudio": "audio/x-raw",
2591
2592               # Video
2593               "h264": "video/x-h264",
2594               "h265": "video/x-h265",
2595               "vp8": "video/x-vp8",
2596               "vp9": "video/x-vp9",
2597               "theora": "video/x-theora",
2598               "prores": "video/x-prores",
2599               "jpeg": "image/jpeg",
2600
2601               # Containers
2602               "webm": "video/webm",
2603               "ogg": "application/ogg",
2604               "mkv": "video/x-matroska",
2605               "mp4": "video/quicktime,variant=iso;",
2606               "quicktime": "video/quicktime;"}
2607
2608    def __str__(self):
2609        return "%s and %s in %s" % (self.audio, self.video, self.container)
2610
2611    def __init__(self, container, audio, video):
2612        """
2613        Describes a media format to be used for transcoding tests.
2614
2615        :param container: A string defining the container format to be used, must bin in self.FORMATS
2616        :param audio: A string defining the audio format to be used, must bin in self.FORMATS
2617        :param video: A string defining the video format to be used, must bin in self.FORMATS
2618        """
2619        self.container = container
2620        self.audio = audio
2621        self.video = video
2622
2623    def get_caps(self, track_type):
2624        try:
2625            return self.FORMATS[self.__dict__[track_type]]
2626        except KeyError:
2627            return None
2628
2629    def get_audio_caps(self):
2630        return self.get_caps("audio")
2631
2632    def get_video_caps(self):
2633        return self.get_caps("video")
2634
2635    def get_muxer_caps(self):
2636        return self.get_caps("container")
2637