1#!/usr/bin/env python3 2# 3# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com> 4# 5# This program is free software; you can redistribute it and/or 6# modify it under the terms of the GNU Lesser General Public 7# License as published by the Free Software Foundation; either 8# version 2.1 of the License, or (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13# Lesser General Public License for more details. 14# 15# You should have received a copy of the GNU Lesser General Public 16# License along with this program; if not, write to the 17# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, 18# Boston, MA 02110-1301, USA. 19 20""" Class representing tests and test managers. """ 21 22import json 23import os 24import sys 25import re 26import copy 27import shlex 28import socketserver 29import struct 30import time 31from . import utils 32import signal 33import urllib.parse 34import subprocess 35import threading 36import queue 37import configparser 38import xml 39import random 40import shutil 41import uuid 42 43from .utils import which 44from . import reporters 45from . import loggable 46from .loggable import Loggable 47 48from collections import defaultdict 49try: 50 from lxml import etree as ET 51except ImportError: 52 import xml.etree.cElementTree as ET 53 54try: 55 import mdv 56except ImportError: 57 mdv = None 58 59from .vfb_server import get_virual_frame_buffer_server 60from .httpserver import HTTPServer 61from .utils import mkdir, Result, Colors, printc, DEFAULT_TIMEOUT, GST_SECOND, \ 62 Protocols, look_for_file_in_source_dir, get_data_file, BackTraceGenerator, \ 63 check_bugs_resolution, is_tty 64 65# The factor by which we increase the hard timeout when running inside 66# Valgrind 67GDB_TIMEOUT_FACTOR = VALGRIND_TIMEOUT_FACTOR = 20 68TIMEOUT_FACTOR = float(os.environ.get("TIMEOUT_FACTOR", 1)) 69# The error reported by valgrind when detecting errors 70VALGRIND_ERROR_CODE = 20 71 72VALIDATE_OVERRIDE_EXTENSION = ".override" 73EXITING_SIGNALS = dict([(-getattr(signal, s), s) for s in [ 74 'SIGQUIT', 'SIGILL', 'SIGABRT', 'SIGFPE', 'SIGSEGV', 'SIGBUS', 'SIGSYS', 75 'SIGTRAP', 'SIGXCPU', 'SIGXFSZ', 'SIGIOT'] if hasattr(signal, s)]) 76EXITING_SIGNALS.update({139: "SIGSEGV"}) 77EXITING_SIGNALS.update({(v, k) for k, v in EXITING_SIGNALS.items()}) 78 79 80CI_ARTIFACTS_URL = os.environ.get('CI_ARTIFACTS_URL') 81 82 83class Test(Loggable): 84 85 """ A class representing a particular test. """ 86 87 def __init__(self, application_name, classname, options, 88 reporter, duration=0, timeout=DEFAULT_TIMEOUT, 89 hard_timeout=None, extra_env_variables=None, 90 expected_issues=None, is_parallel=True, 91 workdir=None): 92 """ 93 @timeout: The timeout during which the value return by get_current_value 94 keeps being exactly equal 95 @hard_timeout: Max time the test can take in absolute 96 """ 97 Loggable.__init__(self) 98 self.timeout = timeout * TIMEOUT_FACTOR * options.timeout_factor 99 if hard_timeout: 100 self.hard_timeout = hard_timeout * TIMEOUT_FACTOR 101 self.hard_timeout *= options.timeout_factor 102 else: 103 self.hard_timeout = hard_timeout 104 self.classname = classname 105 self.options = options 106 self.application = application_name 107 self.command = [] 108 self.server_command = None 109 self.reporter = reporter 110 self.process = None 111 self.proc_env = None 112 self.thread = None 113 self.queue = None 114 self.duration = duration 115 self.stack_trace = None 116 self._uuid = None 117 if expected_issues is None: 118 self.expected_issues = [] 119 elif not isinstance(expected_issues, list): 120 self.expected_issues = [expected_issues] 121 else: 122 self.expected_issues = expected_issues 123 124 extra_env_variables = extra_env_variables or {} 125 self.extra_env_variables = extra_env_variables 126 self.optional = False 127 self.is_parallel = is_parallel 128 self.generator = None 129 # String representation of the test number in the testsuite 130 self.number = "" 131 self.workdir = workdir 132 self.allow_flakiness = False 133 self.html_log = None 134 135 self.clean() 136 137 def _generate_expected_issues(self): 138 return '' 139 140 def generate_expected_issues(self): 141 res = '%s"FIXME \'%s\' issues [REPORT A BUG ' % (" " * 4, self.classname) \ 142 + 'in https://gitlab.freedesktop.org/gstreamer/ '\ 143 + 'or use a proper bug description]": {' 144 res += """ 145 "tests": [ 146 "%s" 147 ], 148 "issues": [""" % (self.classname) 149 150 retcode = self.process.returncode if self.process else 0 151 if retcode != 0: 152 signame = EXITING_SIGNALS.get(retcode) 153 val = "'" + signame + "'" if signame else retcode 154 res += """\n { 155 '%s': %s, 156 'sometimes': True, 157 },""" % ("signame" if signame else "returncode", val) 158 159 res += self._generate_expected_issues() 160 res += "\n%s],\n%s},\n" % (" " * 8, " " * 4) 161 162 return res 163 164 def clean(self): 165 self.kill_subprocess() 166 self.message = "" 167 self.error_str = "" 168 self.time_taken = 0.0 169 self._starting_time = None 170 self.result = Result.NOT_RUN 171 self.logfile = None 172 self.out = None 173 self.extra_logfiles = set() 174 self.__env_variable = [] 175 self.kill_subprocess() 176 self.process = None 177 178 def __str__(self): 179 string = self.classname 180 if self.result != Result.NOT_RUN: 181 string += ": " + self.result 182 if self.result in [Result.FAILED, Result.TIMEOUT]: 183 string += " '%s'" % self.message 184 if not self.options.dump_on_failure: 185 if not self.options.redirect_logs and self.result != Result.PASSED: 186 string += self.get_logfile_repr() 187 else: 188 string = "\n==> %s" % string 189 190 return string 191 192 def add_env_variable(self, variable, value=None): 193 """ 194 Only usefull so that the gst-validate-launcher can print the exact 195 right command line to reproduce the tests 196 """ 197 if value is None: 198 value = os.environ.get(variable, None) 199 200 if value is None: 201 return 202 203 self.__env_variable.append(variable) 204 205 @property 206 def _env_variable(self): 207 res = "" 208 for var in set(self.__env_variable): 209 if res: 210 res += " " 211 value = self.proc_env.get(var, None) 212 if value is not None: 213 res += "%s='%s'" % (var, value) 214 215 return res 216 217 def open_logfile(self): 218 if self.out: 219 return 220 221 path = os.path.join(self.options.logsdir, 222 self.classname.replace(".", os.sep) + '.md') 223 mkdir(os.path.dirname(path)) 224 self.logfile = path 225 226 if self.options.redirect_logs == 'stdout': 227 self.out = sys.stdout 228 elif self.options.redirect_logs == 'stderr': 229 self.out = sys.stderr 230 else: 231 self.out = open(path, 'w+') 232 233 def finalize_logfiles(self): 234 if not self.options.redirect_logs: 235 self.out.flush() 236 for logfile in self.extra_logfiles: 237 self.out.write('\n\n## %s:\n\n```\n%s\n```\n' % ( 238 os.path.basename(logfile), self.get_extra_log_content(logfile)) 239 ) 240 self.out.flush() 241 self.out.close() 242 243 if self.options.html: 244 self.html_log = os.path.splitext(self.logfile)[0] + '.html' 245 import commonmark 246 parser = commonmark.Parser() 247 with open(self.logfile) as f: 248 ast = parser.parse(f.read()) 249 250 renderer = commonmark.HtmlRenderer() 251 html = renderer.render(ast) 252 with open(self.html_log, 'w') as f: 253 f.write(html) 254 255 self.out = None 256 257 def _get_file_content(self, file_name): 258 f = open(file_name, 'r+') 259 value = f.read() 260 f.close() 261 262 return value 263 264 def get_log_content(self): 265 return self._get_file_content(self.logfile) 266 267 def get_extra_log_content(self, extralog): 268 if extralog not in self.extra_logfiles: 269 return "" 270 271 return self._get_file_content(extralog) 272 273 def get_classname(self): 274 name = self.classname.split('.')[-1] 275 classname = self.classname.replace('.%s' % name, '') 276 277 return classname 278 279 def get_name(self): 280 return self.classname.split('.')[-1] 281 282 def get_uuid(self): 283 if self._uuid is None: 284 self._uuid = self.classname + str(uuid.uuid4()) 285 return self._uuid 286 287 def add_arguments(self, *args): 288 self.command += args 289 290 def build_arguments(self): 291 self.add_env_variable("LD_PRELOAD") 292 self.add_env_variable("DISPLAY") 293 294 def add_stack_trace_to_logfile(self): 295 self.debug("Adding stack trace") 296 trace_gatherer = BackTraceGenerator.get_default() 297 stack_trace = trace_gatherer.get_trace(self) 298 299 if not stack_trace: 300 return 301 302 info = "\n\n## Stack trace\n\n```\n%s\n```" % stack_trace 303 if self.options.redirect_logs: 304 print(info) 305 return 306 307 if self.options.xunit_file: 308 self.stack_trace = stack_trace 309 310 self.out.write(info) 311 self.out.flush() 312 313 def add_known_issue_information(self): 314 if self.expected_issues: 315 info = "\n\n## Already known issues\n\n``` python\n%s\n```\n\n" % ( 316 json.dumps(self.expected_issues, indent=4) 317 ) 318 else: 319 info = "" 320 321 info += "\n\n**You can mark the issues as 'known' by adding the " \ 322 + " following lines to the list of known issues**\n" \ 323 + "\n\n``` python\n%s\n```" % (self.generate_expected_issues()) 324 325 if self.options.redirect_logs: 326 print(info) 327 return 328 329 self.out.write(info) 330 331 def set_result(self, result, message="", error=""): 332 333 if not self.options.redirect_logs: 334 self.out.write("\n```\n") 335 self.out.flush() 336 337 self.debug("Setting result: %s (message: %s, error: %s)" % (result, 338 message, error)) 339 340 if result is Result.TIMEOUT: 341 if self.options.debug is True: 342 if self.options.gdb: 343 printc("Timeout, you should process <ctrl>c to get into gdb", 344 Colors.FAIL) 345 # and wait here until gdb exits 346 self.process.communicate() 347 else: 348 pname = self.command[0] 349 input("%sTimeout happened on %s you can attach gdb doing:\n $gdb %s %d%s\n" 350 "Press enter to continue" % (Colors.FAIL, self.classname, 351 pname, self.process.pid, Colors.ENDC)) 352 else: 353 self.add_stack_trace_to_logfile() 354 355 self.result = result 356 self.message = message 357 self.error_str = error 358 359 if result not in [Result.PASSED, Result.NOT_RUN]: 360 self.add_known_issue_information() 361 362 def check_results(self): 363 if self.result is Result.FAILED or self.result is Result.TIMEOUT: 364 return 365 366 self.debug("%s returncode: %s", self, self.process.returncode) 367 if self.process.returncode == 0: 368 self.set_result(Result.PASSED) 369 elif self.process.returncode in EXITING_SIGNALS: 370 self.add_stack_trace_to_logfile() 371 self.set_result(Result.FAILED, 372 "Application exited with signal %s" % ( 373 EXITING_SIGNALS[self.process.returncode])) 374 elif self.process.returncode == VALGRIND_ERROR_CODE: 375 self.set_result(Result.FAILED, "Valgrind reported errors") 376 else: 377 self.set_result(Result.FAILED, 378 "Application returned %d" % (self.process.returncode)) 379 380 def get_current_value(self): 381 """ 382 Lets subclasses implement a nicer timeout measurement method 383 They should return some value with which we will compare 384 the previous and timeout if they are egual during self.timeout 385 seconds 386 """ 387 return Result.NOT_RUN 388 389 def process_update(self): 390 """ 391 Returns True when process has finished running or has timed out. 392 """ 393 394 if self.process is None: 395 # Process has not started running yet 396 return False 397 398 self.process.poll() 399 if self.process.returncode is not None: 400 return True 401 402 val = self.get_current_value() 403 404 self.debug("Got value: %s" % val) 405 if val is Result.NOT_RUN: 406 # The get_current_value logic is not implemented... dumb 407 # timeout 408 if time.time() - self.last_change_ts > self.timeout: 409 self.set_result(Result.TIMEOUT, 410 "Application timed out: %s secs" % 411 self.timeout, 412 "timeout") 413 return True 414 return False 415 elif val is Result.FAILED: 416 return True 417 elif val is Result.KNOWN_ERROR: 418 return True 419 420 self.log("New val %s" % val) 421 422 if val == self.last_val: 423 delta = time.time() - self.last_change_ts 424 self.debug("%s: Same value for %d/%d seconds" % 425 (self, delta, self.timeout)) 426 if delta > self.timeout: 427 self.set_result(Result.TIMEOUT, 428 "Application timed out: %s secs" % 429 self.timeout, 430 "timeout") 431 return True 432 elif self.hard_timeout and time.time() - self.start_ts > self.hard_timeout: 433 self.set_result( 434 Result.TIMEOUT, "Hard timeout reached: %d secs" % self.hard_timeout) 435 return True 436 else: 437 self.last_change_ts = time.time() 438 self.last_val = val 439 440 return False 441 442 def get_subproc_env(self): 443 return os.environ.copy() 444 445 def kill_subprocess(self): 446 utils.kill_subprocess(self, self.process, DEFAULT_TIMEOUT) 447 448 def run_external_checks(self): 449 pass 450 451 def thread_wrapper(self): 452 def enable_sigint(): 453 # Restore the SIGINT handler for the child process (gdb) to ensure 454 # it can handle it. 455 signal.signal(signal.SIGINT, signal.SIG_DFL) 456 457 if self.options.gdb and os.name != "nt": 458 preexec_fn = enable_sigint 459 else: 460 preexec_fn = None 461 462 self.process = subprocess.Popen(self.command, 463 stderr=self.out, 464 stdout=self.out, 465 env=self.proc_env, 466 cwd=self.workdir, 467 preexec_fn=preexec_fn) 468 self.process.wait() 469 if self.result is not Result.TIMEOUT: 470 if self.process.returncode == 0: 471 self.run_external_checks() 472 self.queue.put(None) 473 474 def get_valgrind_suppression_file(self, subdir, name): 475 p = get_data_file(subdir, name) 476 if p: 477 return p 478 479 self.error("Could not find any %s file" % name) 480 481 def get_valgrind_suppressions(self): 482 return [self.get_valgrind_suppression_file('data', 'gstvalidate.supp')] 483 484 def use_gdb(self, command): 485 if self.hard_timeout is not None: 486 self.hard_timeout *= GDB_TIMEOUT_FACTOR 487 self.timeout *= GDB_TIMEOUT_FACTOR 488 489 if not self.options.gdb_non_stop: 490 self.timeout = sys.maxsize 491 self.hard_timeout = sys.maxsize 492 493 args = ["gdb"] 494 if self.options.gdb_non_stop: 495 args += ["-ex", "run", "-ex", "backtrace", "-ex", "quit"] 496 args += ["--args"] + command 497 return args 498 499 def use_valgrind(self, command, subenv): 500 vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind' 501 self.extra_logfiles.add(vglogsfile) 502 503 vg_args = [] 504 505 for o, v in [('trace-children', 'yes'), 506 ('tool', 'memcheck'), 507 ('leak-check', 'full'), 508 ('leak-resolution', 'high'), 509 # TODO: errors-for-leak-kinds should be set to all instead of definite 510 # and all false positives should be added to suppression 511 # files. 512 ('errors-for-leak-kinds', 'definite,indirect'), 513 ('show-leak-kinds', 'definite,indirect'), 514 ('show-possibly-lost', 'no'), 515 ('num-callers', '20'), 516 ('error-exitcode', str(VALGRIND_ERROR_CODE)), 517 ('gen-suppressions', 'all')]: 518 vg_args.append("--%s=%s" % (o, v)) 519 520 if not self.options.redirect_logs: 521 vglogsfile = os.path.splitext(self.logfile)[0] + '.valgrind' 522 self.extra_logfiles.add(vglogsfile) 523 vg_args.append("--%s=%s" % ('log-file', vglogsfile)) 524 525 for supp in self.get_valgrind_suppressions(): 526 vg_args.append("--suppressions=%s" % supp) 527 528 command = ["valgrind"] + vg_args + command 529 530 # Tune GLib's memory allocator to be more valgrind friendly 531 subenv['G_DEBUG'] = 'gc-friendly' 532 subenv['G_SLICE'] = 'always-malloc' 533 534 if self.hard_timeout is not None: 535 self.hard_timeout *= VALGRIND_TIMEOUT_FACTOR 536 self.timeout *= VALGRIND_TIMEOUT_FACTOR 537 538 # Enable 'valgrind.config' 539 self.add_validate_config(get_data_file( 540 'data', 'valgrind.config'), subenv) 541 if subenv == self.proc_env: 542 self.add_env_variable('G_DEBUG', 'gc-friendly') 543 self.add_env_variable('G_SLICE', 'always-malloc') 544 self.add_env_variable('GST_VALIDATE_CONFIG', 545 self.proc_env['GST_VALIDATE_CONFIG']) 546 547 return command 548 549 def add_validate_config(self, config, subenv=None): 550 if not subenv: 551 subenv = self.extra_env_variables 552 553 if "GST_VALIDATE_CONFIG" in subenv: 554 subenv['GST_VALIDATE_CONFIG'] = '%s%s%s' % ( 555 subenv['GST_VALIDATE_CONFIG'], os.pathsep, config) 556 else: 557 subenv['GST_VALIDATE_CONFIG'] = config 558 559 def launch_server(self): 560 return None 561 562 def get_logfile_repr(self): 563 if not self.options.redirect_logs: 564 if self.html_log: 565 log = self.html_log 566 else: 567 log = self.logfile 568 569 if CI_ARTIFACTS_URL: 570 log = CI_ARTIFACTS_URL + os.path.relpath(log, self.options.logsdir) 571 572 return "\n Log: %s" % (log) 573 574 return "" 575 576 def get_command_repr(self): 577 message = "%s %s" % (self._env_variable, ' '.join( 578 shlex.quote(arg) for arg in self.command)) 579 if self.server_command: 580 message = "%s & %s" % (self.server_command, message) 581 582 return message 583 584 def test_start(self, queue): 585 self.open_logfile() 586 587 self.server_command = self.launch_server() 588 self.queue = queue 589 self.command = [self.application] 590 self._starting_time = time.time() 591 self.build_arguments() 592 self.proc_env = self.get_subproc_env() 593 594 for var, value in list(self.extra_env_variables.items()): 595 value = self.proc_env.get(var, '') + os.pathsep + value 596 self.proc_env[var] = value.strip(os.pathsep) 597 self.add_env_variable(var, self.proc_env[var]) 598 599 if self.options.gdb: 600 self.command = self.use_gdb(self.command) 601 602 self.previous_sigint_handler = signal.getsignal(signal.SIGINT) 603 # Make the gst-validate executable ignore SIGINT while gdb is 604 # running. 605 signal.signal(signal.SIGINT, signal.SIG_IGN) 606 607 if self.options.valgrind: 608 self.command = self.use_valgrind(self.command, self.proc_env) 609 610 if not self.options.redirect_logs: 611 self.out.write("# `%s`\n\n" 612 "## Command\n\n``` bash\n%s\n```\n\n" % ( 613 self.classname, self.get_command_repr())) 614 self.out.write("## %s output\n\n``` \n\n" % os.path.basename(self.application)) 615 self.out.flush() 616 else: 617 message = "Launching: %s%s\n" \ 618 " Command: %s\n" % (Colors.ENDC, self.classname, 619 self.get_command_repr()) 620 printc(message, Colors.OKBLUE) 621 622 self.thread = threading.Thread(target=self.thread_wrapper) 623 self.thread.start() 624 625 self.last_val = 0 626 self.last_change_ts = time.time() 627 self.start_ts = time.time() 628 629 def _dump_log_file(self, logfile): 630 with open(logfile, 'r') as fin: 631 printc(self.get_logfile_repr()) 632 if mdv and utils.supports_ansi_colors(): 633 printc(mdv.main(fin.read())) 634 else: 635 for line in fin.readlines(): 636 print('> ' + line, end='') 637 638 def _dump_log_files(self): 639 self._dump_log_file(self.logfile) 640 641 def copy_logfiles(self, extra_folder="flaky_tests"): 642 path = os.path.dirname(os.path.join(self.options.logsdir, extra_folder, 643 self.classname.replace(".", os.sep))) 644 mkdir(path) 645 self.logfile = shutil.copy(self.logfile, path) 646 extra_logs = [] 647 for logfile in self.extra_logfiles: 648 extra_logs.append(shutil.copy(logfile, path)) 649 self.extra_logfiles = extra_logs 650 651 def test_end(self, retry_on_failure=False): 652 self.kill_subprocess() 653 self.thread.join() 654 self.time_taken = time.time() - self._starting_time 655 656 if self.options.gdb: 657 signal.signal(signal.SIGINT, self.previous_sigint_handler) 658 659 self.finalize_logfiles() 660 message = None 661 end = "\n" 662 if self.result != Result.PASSED: 663 if not retry_on_failure: 664 message = str(self) 665 end = "\n" 666 else: 667 if is_tty(): 668 message = "%s %s: %s%s" % (self.number, self.classname, self.result, 669 " (" + self.message + ")" if self.message else "") 670 end = "\r" 671 672 if message is not None: 673 printc(message, color=utils.get_color_for_result( 674 self.result), end=end) 675 676 if self.options.dump_on_failure: 677 if self.result is not Result.PASSED: 678 self._dump_log_files() 679 680 # Only keep around env variables we need later 681 clean_env = {} 682 for n in self.__env_variable: 683 clean_env[n] = self.proc_env.get(n, None) 684 self.proc_env = clean_env 685 686 # Don't keep around JSON report objects, they were processed 687 # in check_results already 688 self.reports = [] 689 690 return self.result 691 692 693class GstValidateTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): 694 pass 695 696 697class GstValidateListener(socketserver.BaseRequestHandler, Loggable): 698 699 def __init__(self, *args, **kwargs): 700 super().__init__(*args, **kwargs) 701 Loggable.__init__(self, "GstValidateListener") 702 703 def handle(self): 704 """Implements BaseRequestHandler handle method""" 705 test = None 706 self.logCategory = "GstValidateListener" 707 while True: 708 raw_len = self.request.recv(4) 709 if raw_len == b'': 710 return 711 msglen = struct.unpack('>I', raw_len)[0] 712 try: 713 msg = self.request.recv(msglen).decode('utf-8', 'ignore') 714 except UnicodeDecodeError as e: 715 self.error("Could not decode message: %s - %s" % (msg, e)) 716 continue 717 if msg == '': 718 return 719 720 try: 721 obj = json.loads(msg) 722 except json.decoder.JSONDecodeError: 723 self.error("Could not deserialize: %s - %s" % (msg, e)) 724 continue 725 726 if test is None: 727 # First message must contain the uuid 728 uuid = obj.get("uuid", None) 729 if uuid is None: 730 return 731 # Find test from launcher 732 for t in self.server.launcher.tests: 733 if uuid == t.get_uuid(): 734 test = t 735 break 736 if test is None: 737 self.server.launcher.error( 738 "Could not find test for UUID %s" % uuid) 739 return 740 741 obj_type = obj.get("type", '') 742 if obj_type == 'position': 743 test.set_position(obj['position'], obj['duration'], 744 obj['speed']) 745 elif obj_type == 'buffering': 746 test.set_position(obj['position'], 100) 747 elif obj_type == 'action': 748 test.add_action_execution(obj) 749 # Make sure that action is taken into account when checking if process 750 # is updating 751 test.position += 1 752 elif obj_type == 'action-done': 753 # Make sure that action end is taken into account when checking if process 754 # is updating 755 test.position += 1 756 test.actions_infos[-1]['execution-duration'] = obj['execution-duration'] 757 elif obj_type == 'report': 758 test.add_report(obj) 759 760 761class GstValidateTest(Test): 762 763 """ A class representing a particular test. """ 764 HARD_TIMEOUT_FACTOR = 5 765 fault_sig_regex = re.compile("<Caught SIGNAL: .*>") 766 767 def __init__(self, application_name, classname, 768 options, reporter, duration=0, 769 timeout=DEFAULT_TIMEOUT, scenario=None, hard_timeout=None, 770 media_descriptor=None, extra_env_variables=None, 771 expected_issues=None, workdir=None): 772 773 extra_env_variables = extra_env_variables or {} 774 775 if not hard_timeout and self.HARD_TIMEOUT_FACTOR: 776 if timeout: 777 hard_timeout = timeout * self.HARD_TIMEOUT_FACTOR 778 elif duration: 779 hard_timeout = duration * self.HARD_TIMEOUT_FACTOR 780 else: 781 hard_timeout = None 782 783 # If we are running from source, use the -debug version of the 784 # application which is using rpath instead of libtool's wrappers. It's 785 # slightly faster to start and will not confuse valgrind. 786 debug = '%s-debug' % application_name 787 p = look_for_file_in_source_dir('tools', debug) 788 if p: 789 application_name = p 790 791 self.reports = [] 792 self.position = -1 793 self.media_duration = -1 794 self.speed = 1.0 795 self.actions_infos = [] 796 self.media_descriptor = media_descriptor 797 self.server = None 798 self.criticals = [] 799 800 override_path = self.get_override_file(media_descriptor) 801 if override_path: 802 if extra_env_variables: 803 if extra_env_variables.get("GST_VALIDATE_OVERRIDE", ""): 804 extra_env_variables[ 805 "GST_VALIDATE_OVERRIDE"] += os.path.pathsep 806 807 extra_env_variables["GST_VALIDATE_OVERRIDE"] = override_path 808 809 super(GstValidateTest, self).__init__(application_name, classname, 810 options, reporter, 811 duration=duration, 812 timeout=timeout, 813 hard_timeout=hard_timeout, 814 extra_env_variables=extra_env_variables, 815 expected_issues=expected_issues, 816 workdir=workdir) 817 818 if scenario is None or scenario.name.lower() == "none": 819 self.scenario = None 820 else: 821 self.scenario = scenario 822 823 def kill_subprocess(self): 824 Test.kill_subprocess(self) 825 826 def add_report(self, report): 827 self.reports.append(report) 828 829 def set_position(self, position, duration, speed=None): 830 self.position = position 831 self.media_duration = duration 832 if speed: 833 self.speed = speed 834 835 def add_action_execution(self, action_infos): 836 self.actions_infos.append(action_infos) 837 838 def get_override_file(self, media_descriptor): 839 if media_descriptor: 840 if media_descriptor.get_path(): 841 override_path = os.path.splitext(media_descriptor.get_path())[ 842 0] + VALIDATE_OVERRIDE_EXTENSION 843 if os.path.exists(override_path): 844 return override_path 845 846 return None 847 848 def get_current_position(self): 849 return self.position 850 851 def get_current_value(self): 852 return self.position 853 854 def get_subproc_env(self): 855 subproc_env = os.environ.copy() 856 857 subproc_env["GST_VALIDATE_UUID"] = self.get_uuid() 858 859 if 'GST_DEBUG' in os.environ and not self.options.redirect_logs: 860 gstlogsfile = os.path.splitext(self.logfile)[0] + '.gstdebug' 861 self.extra_logfiles.add(gstlogsfile) 862 subproc_env["GST_DEBUG_FILE"] = gstlogsfile 863 864 if self.options.no_color: 865 subproc_env["GST_DEBUG_NO_COLOR"] = '1' 866 867 # Ensure XInitThreads is called, see bgo#731525 868 subproc_env['GST_GL_XINITTHREADS'] = '1' 869 self.add_env_variable('GST_GL_XINITTHREADS', '1') 870 871 if self.scenario is not None: 872 scenario = self.scenario.get_execution_name() 873 subproc_env["GST_VALIDATE_SCENARIO"] = scenario 874 self.add_env_variable("GST_VALIDATE_SCENARIO", 875 subproc_env["GST_VALIDATE_SCENARIO"]) 876 else: 877 try: 878 del subproc_env["GST_VALIDATE_SCENARIO"] 879 except KeyError: 880 pass 881 882 if not subproc_env.get('GST_DEBUG_DUMP_DOT_DIR'): 883 dotfilesdir = os.path.join(self.options.logsdir, 884 self.classname.replace(".", os.sep) + '.pipelines_dot_files') 885 mkdir(dotfilesdir) 886 subproc_env['GST_DEBUG_DUMP_DOT_DIR'] = dotfilesdir 887 if CI_ARTIFACTS_URL: 888 dotfilesurl = CI_ARTIFACTS_URL + os.path.relpath(dotfilesdir, 889 self.options.logsdir) 890 subproc_env['GST_VALIDATE_DEBUG_DUMP_DOT_URL'] = dotfilesurl 891 892 return subproc_env 893 894 def clean(self): 895 Test.clean(self) 896 self.reports = [] 897 self.position = -1 898 self.media_duration = -1 899 self.speed = 1.0 900 self.actions_infos = [] 901 902 def build_arguments(self): 903 super(GstValidateTest, self).build_arguments() 904 if "GST_VALIDATE" in os.environ: 905 self.add_env_variable("GST_VALIDATE", os.environ["GST_VALIDATE"]) 906 907 if "GST_VALIDATE_SCENARIOS_PATH" in os.environ: 908 self.add_env_variable("GST_VALIDATE_SCENARIOS_PATH", 909 os.environ["GST_VALIDATE_SCENARIOS_PATH"]) 910 911 self.add_env_variable("GST_VALIDATE_CONFIG") 912 self.add_env_variable("GST_VALIDATE_OVERRIDE") 913 914 def get_extra_log_content(self, extralog): 915 value = Test.get_extra_log_content(self, extralog) 916 917 return value 918 919 def report_matches_expected_issues(self, report, expected_issues): 920 for key in ['bug', 'bugs', 'sometimes']: 921 if key in expected_issues: 922 del expected_issues[key] 923 for key, value in list(report.items()): 924 if key in expected_issues: 925 if not re.findall(expected_issues[key], str(value)): 926 return False 927 expected_issues.pop(key) 928 929 return not bool(expected_issues) 930 931 def check_reported_issues(self, expected_issues): 932 ret = [] 933 expected_retcode = [0] 934 for report in self.reports: 935 found = None 936 for expected_issue in expected_issues: 937 if self.report_matches_expected_issues(report, 938 expected_issue.copy()): 939 found = expected_issue 940 break 941 942 if found is not None: 943 if not found.get('can-happen-several-times', False): 944 expected_issues.remove(found) 945 if report['level'] == 'critical': 946 if found.get('sometimes', True) and isinstance(expected_retcode, list): 947 expected_retcode.append(18) 948 else: 949 expected_retcode = [18] 950 elif report['level'] == 'critical': 951 ret.append(report) 952 953 if not ret: 954 return None, expected_issues, expected_retcode 955 956 return ret, expected_issues, expected_retcode 957 958 def check_expected_issue(self, expected_issue): 959 res = True 960 msg = '' 961 expected_symbols = expected_issue.get('stacktrace_symbols') 962 if expected_symbols: 963 trace_gatherer = BackTraceGenerator.get_default() 964 stack_trace = trace_gatherer.get_trace(self) 965 966 if stack_trace: 967 if not isinstance(expected_symbols, list): 968 expected_symbols = [expected_symbols] 969 970 not_found_symbols = [s for s in expected_symbols 971 if s not in stack_trace] 972 if not_found_symbols: 973 msg = " Expected symbols '%s' not found in stack trace " % ( 974 not_found_symbols) 975 res = False 976 else: 977 msg += " No stack trace available, could not verify symbols " 978 979 _, not_found_expected_issues, _ = self.check_reported_issues(expected_issue.get('issues', [])) 980 if not_found_expected_issues: 981 mandatory_failures = [f for f in not_found_expected_issues 982 if not f.get('sometimes', True)] 983 if mandatory_failures: 984 msg = " (Expected issues not found: %s) " % mandatory_failures 985 res = False 986 987 return msg, res 988 989 def check_expected_timeout(self, expected_timeout): 990 msg = "Expected timeout happened. " 991 result = Result.PASSED 992 message = expected_timeout.get('message') 993 if message: 994 if not re.findall(message, self.message): 995 result = Result.FAILED 996 msg = "Expected timeout message: %s got %s " % ( 997 message, self.message) 998 999 stack_msg, stack_res = self.check_expected_issue(expected_timeout) 1000 if not stack_res: 1001 result = Result.TIMEOUT 1002 msg += stack_msg 1003 1004 return result, msg 1005 1006 def check_results(self): 1007 if self.result in [Result.FAILED, self.result is Result.PASSED]: 1008 return 1009 1010 for report in self.reports: 1011 if report.get('issue-id') == 'runtime::missing-plugin': 1012 self.set_result(Result.SKIPPED, "%s\n%s" % (report['summary'], 1013 report['details'])) 1014 return 1015 1016 self.debug("%s returncode: %s", self, self.process.returncode) 1017 1018 expected_issues = copy.deepcopy(self.expected_issues) 1019 self.criticals, not_found_expected_issues, expected_returncode = self.check_reported_issues(expected_issues) 1020 expected_timeout = None 1021 expected_signal = None 1022 for i, f in enumerate(not_found_expected_issues): 1023 returncode = f.get('returncode', []) 1024 if not isinstance(returncode, list): 1025 returncode = [returncode] 1026 1027 if f.get('signame'): 1028 signames = f['signame'] 1029 if not isinstance(signames, list): 1030 signames = [signames] 1031 1032 returncode = [EXITING_SIGNALS[signame] for signame in signames] 1033 1034 if returncode: 1035 if 'sometimes' in f: 1036 returncode.append(0) 1037 expected_returncode = returncode 1038 expected_signal = f 1039 elif f.get("timeout"): 1040 expected_timeout = f 1041 1042 not_found_expected_issues = [f for f in not_found_expected_issues 1043 if not f.get('returncode') and not f.get('signame')] 1044 1045 msg = "" 1046 result = Result.PASSED 1047 if self.result == Result.TIMEOUT: 1048 with open(self.logfile) as f: 1049 signal_fault_info = self.fault_sig_regex.findall(f.read()) 1050 if signal_fault_info: 1051 result = Result.FAILED 1052 msg = signal_fault_info[0] 1053 elif expected_timeout: 1054 not_found_expected_issues.remove(expected_timeout) 1055 result, msg = self.check_expected_timeout(expected_timeout) 1056 else: 1057 return 1058 elif self.process.returncode in EXITING_SIGNALS: 1059 msg = "Application exited with signal %s" % ( 1060 EXITING_SIGNALS[self.process.returncode]) 1061 if self.process.returncode not in expected_returncode: 1062 result = Result.FAILED 1063 else: 1064 if expected_signal: 1065 stack_msg, stack_res = self.check_expected_issue( 1066 expected_signal) 1067 if not stack_res: 1068 msg += stack_msg 1069 result = Result.FAILED 1070 self.add_stack_trace_to_logfile() 1071 elif self.process.returncode == VALGRIND_ERROR_CODE: 1072 msg = "Valgrind reported errors " 1073 result = Result.FAILED 1074 elif self.process.returncode not in expected_returncode: 1075 msg = "Application returned %s " % self.process.returncode 1076 if expected_returncode != [0]: 1077 msg += "(expected %s) " % expected_returncode 1078 result = Result.FAILED 1079 1080 if self.criticals: 1081 msg += "(critical errors: [%s]) " % ', '.join([c['summary'] 1082 for c in self.criticals]) 1083 result = Result.FAILED 1084 1085 if not_found_expected_issues: 1086 mandatory_failures = [f for f in not_found_expected_issues 1087 if not f.get('sometimes', True)] 1088 1089 if mandatory_failures: 1090 msg += " (Expected errors not found: %s) " % mandatory_failures 1091 result = Result.FAILED 1092 elif self.expected_issues: 1093 msg += ' %s(Expected errors occured: %s)%s' % (Colors.OKBLUE, 1094 self.expected_issues, 1095 Colors.ENDC) 1096 1097 self.set_result(result, msg.strip()) 1098 1099 def _generate_expected_issues(self): 1100 res = "" 1101 self.criticals = self.criticals or [] 1102 if self.result == Result.TIMEOUT: 1103 res += """ { 1104 'timeout': True, 1105 'sometimes': True, 1106 },""" 1107 1108 for report in self.criticals: 1109 res += "\n%s{" % (" " * 12) 1110 1111 for key, value in report.items(): 1112 if key == "type": 1113 continue 1114 if value is None: 1115 continue 1116 res += '\n%s%s"%s": "%s",' % ( 1117 " " * 16, "# " if key == "details" else "", 1118 key, value.replace('\n', '\\n')) 1119 1120 res += "\n%s}," % (" " * 12) 1121 1122 return res 1123 1124 def get_valgrind_suppressions(self): 1125 result = super(GstValidateTest, self).get_valgrind_suppressions() 1126 result.extend(utils.get_gst_build_valgrind_suppressions()) 1127 gst_sup = self.get_valgrind_suppression_file('common', 'gst.supp') 1128 if gst_sup: 1129 result.append(gst_sup) 1130 1131 return result 1132 1133 1134class GstValidateEncodingTestInterface(object): 1135 DURATION_TOLERANCE = GST_SECOND / 4 1136 1137 def __init__(self, combination, media_descriptor, duration_tolerance=None): 1138 super(GstValidateEncodingTestInterface, self).__init__() 1139 1140 self.media_descriptor = media_descriptor 1141 self.combination = combination 1142 self.dest_file = "" 1143 1144 self._duration_tolerance = duration_tolerance 1145 if duration_tolerance is None: 1146 self._duration_tolerance = self.DURATION_TOLERANCE 1147 1148 def get_current_size(self): 1149 try: 1150 size = os.stat(urllib.parse.urlparse(self.dest_file).path).st_size 1151 except OSError: 1152 return None 1153 1154 self.debug("Size: %s" % size) 1155 return size 1156 1157 def _get_profile_full(self, muxer, venc, aenc, video_restriction=None, 1158 audio_restriction=None, audio_presence=0, 1159 video_presence=0): 1160 ret = "" 1161 if muxer: 1162 ret += muxer 1163 ret += ":" 1164 if venc: 1165 if video_restriction is not None: 1166 ret = ret + video_restriction + '->' 1167 ret += venc 1168 if video_presence: 1169 ret = ret + '|' + str(video_presence) 1170 if aenc: 1171 ret += ":" 1172 if audio_restriction is not None: 1173 ret = ret + audio_restriction + '->' 1174 ret += aenc 1175 if audio_presence: 1176 ret = ret + '|' + str(audio_presence) 1177 1178 return ret.replace("::", ":") 1179 1180 def get_profile(self, video_restriction=None, audio_restriction=None): 1181 vcaps = self.combination.get_video_caps() 1182 acaps = self.combination.get_audio_caps() 1183 if self.media_descriptor is not None: 1184 if self.media_descriptor.get_num_tracks("video") == 0: 1185 vcaps = None 1186 1187 if self.media_descriptor.get_num_tracks("audio") == 0: 1188 acaps = None 1189 1190 return self._get_profile_full(self.combination.get_muxer_caps(), 1191 vcaps, acaps, 1192 video_restriction=video_restriction, 1193 audio_restriction=audio_restriction) 1194 1195 def _clean_caps(self, caps): 1196 """ 1197 Returns a list of key=value or structure name, without "(types)" or ";" or "," 1198 """ 1199 return re.sub(r"\(.+?\)\s*| |;", '', caps).split(',') 1200 1201 # pylint: disable=E1101 1202 def _has_caps_type_variant(self, c, ccaps): 1203 """ 1204 Handle situations where we can have application/ogg or video/ogg or 1205 audio/ogg 1206 """ 1207 has_variant = False 1208 media_type = re.findall("application/|video/|audio/", c) 1209 if media_type: 1210 media_type = media_type[0].replace('/', '') 1211 possible_mtypes = ["application", "video", "audio"] 1212 possible_mtypes.remove(media_type) 1213 for tmptype in possible_mtypes: 1214 possible_c_variant = c.replace(media_type, tmptype) 1215 if possible_c_variant in ccaps: 1216 self.info( 1217 "Found %s in %s, good enough!", possible_c_variant, ccaps) 1218 has_variant = True 1219 1220 return has_variant 1221 1222 # pylint: disable=E1101 1223 def run_iqa_test(self, reference_file_uri): 1224 """ 1225 Runs IQA test if @reference_file_path exists 1226 @test: The test to run tests on 1227 """ 1228 if not GstValidateBaseTestManager.has_feature('iqa'): 1229 self.debug('Iqa element not present, not running extra test.') 1230 return 1231 1232 pipeline_desc = """ 1233 uridecodebin uri=%s ! 1234 iqa name=iqa do-dssim=true dssim-error-threshold=1.0 ! fakesink 1235 uridecodebin uri=%s ! iqa. 1236 """ % (reference_file_uri, self.dest_file) 1237 pipeline_desc = pipeline_desc.replace("\n", "") 1238 1239 command = [GstValidateBaseTestManager.COMMAND] + \ 1240 shlex.split(pipeline_desc) 1241 msg = "## Running IQA tests on results of: " \ 1242 + "%s\n### Command: \n```\n%s\n```\n" % ( 1243 self.classname, ' '.join(command)) 1244 if not self.options.redirect_logs: 1245 self.out.write(msg) 1246 self.out.flush() 1247 else: 1248 printc(msg, Colors.OKBLUE) 1249 1250 self.process = subprocess.Popen(command, 1251 stderr=self.out, 1252 stdout=self.out, 1253 env=self.proc_env, 1254 cwd=self.workdir) 1255 self.process.wait() 1256 1257 def check_encoded_file(self): 1258 result_descriptor = GstValidateMediaDescriptor.new_from_uri( 1259 self.dest_file) 1260 if result_descriptor is None: 1261 return (Result.FAILED, "Could not discover encoded file %s" 1262 % self.dest_file) 1263 1264 duration = result_descriptor.get_duration() 1265 orig_duration = self.media_descriptor.get_duration() 1266 tolerance = self._duration_tolerance 1267 1268 if orig_duration - tolerance >= duration <= orig_duration + tolerance: 1269 os.remove(result_descriptor.get_path()) 1270 return (Result.FAILED, "Duration of encoded file is " 1271 " wrong (%s instead of %s)" % 1272 (utils.TIME_ARGS(duration), 1273 utils.TIME_ARGS(orig_duration))) 1274 else: 1275 all_tracks_caps = result_descriptor.get_tracks_caps() 1276 container_caps = result_descriptor.get_caps() 1277 if container_caps: 1278 all_tracks_caps.insert(0, ("container", container_caps)) 1279 1280 for track_type, caps in all_tracks_caps: 1281 ccaps = self._clean_caps(caps) 1282 wanted_caps = self.combination.get_caps(track_type) 1283 cwanted_caps = self._clean_caps(wanted_caps) 1284 1285 if wanted_caps is None: 1286 os.remove(result_descriptor.get_path()) 1287 return (Result.FAILED, 1288 "Found a track of type %s in the encoded files" 1289 " but none where wanted in the encoded profile: %s" 1290 % (track_type, self.combination)) 1291 1292 for c in cwanted_caps: 1293 if c not in ccaps: 1294 if not self._has_caps_type_variant(c, ccaps): 1295 os.remove(result_descriptor.get_path()) 1296 return (Result.FAILED, 1297 "Field: %s (from %s) not in caps of the outputed file %s" 1298 % (wanted_caps, c, ccaps)) 1299 1300 os.remove(result_descriptor.get_path()) 1301 return (Result.PASSED, "") 1302 1303 1304class TestsManager(Loggable): 1305 1306 """ A class responsible for managing tests. """ 1307 1308 name = "base" 1309 loading_testsuite = None 1310 1311 def __init__(self): 1312 1313 Loggable.__init__(self) 1314 1315 self.tests = [] 1316 self.unwanted_tests = [] 1317 self.options = None 1318 self.args = None 1319 self.reporter = None 1320 self.wanted_tests_patterns = [] 1321 self.blacklisted_tests_patterns = [] 1322 self._generators = [] 1323 self.check_testslist = True 1324 self.all_tests = None 1325 self.expected_issues = {} 1326 self.blacklisted_tests = [] 1327 1328 def init(self): 1329 return True 1330 1331 def list_tests(self): 1332 return sorted(list(self.tests), key=lambda x: x.classname) 1333 1334 def find_tests(self, classname): 1335 regex = re.compile(classname) 1336 return [test for test in self.list_tests() if regex.findall(test.classname)] 1337 1338 def add_expected_issues(self, expected_issues): 1339 for bugid, failure_def in list(expected_issues.items()): 1340 tests_regexes = [] 1341 for test_name_regex in failure_def['tests']: 1342 regex = re.compile(test_name_regex) 1343 tests_regexes.append(regex) 1344 for test in self.tests: 1345 if regex.findall(test.classname): 1346 if failure_def.get('allow_flakiness'): 1347 test.allow_flakiness = True 1348 self.debug("%s allow flakyness" % (test.classname)) 1349 else: 1350 for issue in failure_def['issues']: 1351 issue['bug'] = bugid 1352 test.expected_issues.extend(failure_def['issues']) 1353 self.debug("%s added expected issues from %s" % ( 1354 test.classname, bugid)) 1355 failure_def['tests'] = tests_regexes 1356 1357 self.expected_issues.update(expected_issues) 1358 1359 def add_test(self, test): 1360 if test.generator is None: 1361 test.classname = self.loading_testsuite + '.' + test.classname 1362 1363 for bugid, failure_def in list(self.expected_issues.items()): 1364 failure_def['bug'] = bugid 1365 for regex in failure_def['tests']: 1366 if regex.findall(test.classname): 1367 if failure_def.get('allow_flakiness'): 1368 test.allow_flakiness = True 1369 self.debug("%s allow flakyness" % (test.classname)) 1370 else: 1371 for issue in failure_def['issues']: 1372 issue['bug'] = bugid 1373 test.expected_issues.extend(failure_def['issues']) 1374 self.debug("%s added expected issues from %s" % ( 1375 test.classname, bugid)) 1376 1377 if self._is_test_wanted(test): 1378 if test not in self.tests: 1379 self.tests.append(test) 1380 else: 1381 if test not in self.tests: 1382 self.unwanted_tests.append(test) 1383 1384 def get_tests(self): 1385 return self.tests 1386 1387 def populate_testsuite(self): 1388 pass 1389 1390 def add_generators(self, generators): 1391 """ 1392 @generators: A list of, or one single #TestsGenerator to be used to generate tests 1393 """ 1394 if not isinstance(generators, list): 1395 generators = [generators] 1396 self._generators.extend(generators) 1397 for generator in generators: 1398 generator.testsuite = self.loading_testsuite 1399 1400 self._generators = list(set(self._generators)) 1401 1402 def get_generators(self): 1403 return self._generators 1404 1405 def _add_blacklist(self, blacklisted_tests): 1406 if not isinstance(blacklisted_tests, list): 1407 blacklisted_tests = [blacklisted_tests] 1408 1409 for patterns in blacklisted_tests: 1410 for pattern in patterns.split(","): 1411 self.blacklisted_tests_patterns.append(re.compile(pattern)) 1412 1413 def set_default_blacklist(self, default_blacklist): 1414 for test_regex, reason in default_blacklist: 1415 if not test_regex.startswith(self.loading_testsuite + '.'): 1416 test_regex = self.loading_testsuite + '.' + test_regex 1417 self.blacklisted_tests.append((test_regex, reason)) 1418 self._add_blacklist(test_regex) 1419 1420 def add_options(self, parser): 1421 """ Add more arguments. """ 1422 pass 1423 1424 def set_settings(self, options, args, reporter): 1425 """ Set properties after options parsing. """ 1426 self.options = options 1427 self.args = args 1428 self.reporter = reporter 1429 1430 self.populate_testsuite() 1431 1432 if self.options.valgrind: 1433 self.print_valgrind_bugs() 1434 1435 if options.wanted_tests: 1436 for patterns in options.wanted_tests: 1437 for pattern in patterns.split(","): 1438 self.wanted_tests_patterns.append(re.compile(pattern)) 1439 1440 if options.blacklisted_tests: 1441 for patterns in options.blacklisted_tests: 1442 self._add_blacklist(patterns) 1443 1444 def check_blacklists(self): 1445 if self.options.check_bugs_status: 1446 if not check_bugs_resolution(self.blacklisted_tests): 1447 return False 1448 1449 return True 1450 1451 def log_blacklists(self): 1452 if self.blacklisted_tests: 1453 self.info("Currently 'hardcoded' %s blacklisted tests:" % 1454 self.name) 1455 1456 for name, bug in self.blacklisted_tests: 1457 if not self.options.check_bugs_status: 1458 self.info(" + %s --> bug: %s" % (name, bug)) 1459 1460 def check_expected_issues(self): 1461 if not self.expected_issues or not self.options.check_bugs_status: 1462 return True 1463 1464 bugs_definitions = defaultdict(list) 1465 for bug, failure_def in list(self.expected_issues.items()): 1466 tests_names = '|'.join( 1467 [regex.pattern for regex in failure_def['tests']]) 1468 bugs_definitions[tests_names].extend([bug]) 1469 1470 return check_bugs_resolution(bugs_definitions.items()) 1471 1472 def _check_blacklisted(self, test): 1473 for pattern in self.blacklisted_tests_patterns: 1474 if pattern.findall(test.classname): 1475 self.info("%s is blacklisted by %s", test.classname, pattern) 1476 return True 1477 1478 return False 1479 1480 def _check_whitelisted(self, test): 1481 for pattern in self.wanted_tests_patterns: 1482 if pattern.findall(test.classname): 1483 if self._check_blacklisted(test): 1484 # If explicitly white listed that specific test 1485 # bypass the blacklisting 1486 if pattern.pattern != test.classname: 1487 return False 1488 return True 1489 return False 1490 1491 def _check_duration(self, test): 1492 if test.duration > 0 and int(self.options.long_limit) < int(test.duration): 1493 self.info("Not activating %s as its duration (%d) is superior" 1494 " than the long limit (%d)" % (test, test.duration, 1495 int(self.options.long_limit))) 1496 return False 1497 1498 return True 1499 1500 def _is_test_wanted(self, test): 1501 if self._check_whitelisted(test): 1502 if not self._check_duration(test): 1503 return False 1504 return True 1505 1506 if self._check_blacklisted(test): 1507 return False 1508 1509 if not self._check_duration(test): 1510 return False 1511 1512 if not self.wanted_tests_patterns: 1513 return True 1514 1515 return False 1516 1517 def needs_http_server(self): 1518 return False 1519 1520 def print_valgrind_bugs(self): 1521 pass 1522 1523 1524class TestsGenerator(Loggable): 1525 1526 def __init__(self, name, test_manager, tests=[]): 1527 Loggable.__init__(self) 1528 self.name = name 1529 self.test_manager = test_manager 1530 self.testsuite = None 1531 self._tests = {} 1532 for test in tests: 1533 self._tests[test.classname] = test 1534 1535 def generate_tests(self, *kwargs): 1536 """ 1537 Method that generates tests 1538 """ 1539 return list(self._tests.values()) 1540 1541 def add_test(self, test): 1542 test.generator = self 1543 test.classname = self.testsuite + '.' + test.classname 1544 self._tests[test.classname] = test 1545 1546 1547class GstValidateTestsGenerator(TestsGenerator): 1548 1549 def populate_tests(self, uri_minfo_special_scenarios, scenarios): 1550 pass 1551 1552 @staticmethod 1553 def get_fakesink_for_media_type(media_type, needs_clock=False): 1554 if media_type == "video": 1555 if needs_clock: 1556 return 'fakevideosink qos=true max-lateness=20000000' 1557 1558 return "fakevideosink sync=false" 1559 1560 if needs_clock: 1561 return "fakesink sync=true" 1562 1563 return "fakesink" 1564 1565 def generate_tests(self, uri_minfo_special_scenarios, scenarios): 1566 self.populate_tests(uri_minfo_special_scenarios, scenarios) 1567 return super(GstValidateTestsGenerator, self).generate_tests() 1568 1569 1570class _TestsLauncher(Loggable): 1571 1572 def __init__(self): 1573 1574 Loggable.__init__(self) 1575 1576 self.options = None 1577 self.testers = [] 1578 self.tests = [] 1579 self.reporter = None 1580 self._list_testers() 1581 self.all_tests = None 1582 self.wanted_tests_patterns = [] 1583 1584 self.queue = queue.Queue() 1585 self.jobs = [] 1586 self.total_num_tests = 0 1587 self.server = None 1588 self.httpsrv = None 1589 self.vfb_server = None 1590 1591 def _list_app_dirs(self): 1592 app_dirs = [] 1593 env_dirs = os.environ["GST_VALIDATE_APPS_DIR"] 1594 if env_dirs is not None: 1595 for dir_ in env_dirs.split(":"): 1596 app_dirs.append(dir_) 1597 1598 return app_dirs 1599 1600 def _exec_app(self, app_dir, env): 1601 try: 1602 files = os.listdir(app_dir) 1603 except OSError as e: 1604 self.debug("Could not list %s: %s" % (app_dir, e)) 1605 files = [] 1606 for f in files: 1607 if f.endswith(".py"): 1608 exec(compile(open(os.path.join(app_dir, f)).read(), 1609 os.path.join(app_dir, f), 'exec'), env) 1610 1611 def _exec_apps(self, env): 1612 app_dirs = self._list_app_dirs() 1613 for app_dir in app_dirs: 1614 self._exec_app(app_dir, env) 1615 1616 def _list_testers(self): 1617 env = globals().copy() 1618 self._exec_apps(env) 1619 1620 testers = [i() for i in utils.get_subclasses(TestsManager, env)] 1621 for tester in testers: 1622 if tester.init() is True: 1623 self.testers.append(tester) 1624 else: 1625 self.warning("Can not init tester: %s -- PATH is %s" 1626 % (tester.name, os.environ["PATH"])) 1627 1628 def add_options(self, parser): 1629 for tester in self.testers: 1630 tester.add_options(parser) 1631 1632 def _load_testsuite(self, testsuites): 1633 exceptions = [] 1634 for testsuite in testsuites: 1635 try: 1636 sys.path.insert(0, os.path.dirname(testsuite)) 1637 return (__import__(os.path.basename(testsuite).replace(".py", "")), None) 1638 except Exception as e: 1639 exceptions.append("Could not load %s: %s" % (testsuite, e)) 1640 continue 1641 finally: 1642 sys.path.remove(os.path.dirname(testsuite)) 1643 1644 return (None, exceptions) 1645 1646 def _load_testsuites(self): 1647 testsuites = set() 1648 for testsuite in self.options.testsuites: 1649 if testsuite.endswith('.py') and os.path.exists(testsuite): 1650 testsuite = os.path.abspath(os.path.expanduser(testsuite)) 1651 loaded_module = self._load_testsuite([testsuite]) 1652 else: 1653 possible_testsuites_paths = [os.path.join(d, testsuite + ".py") 1654 for d in self.options.testsuites_dirs] 1655 loaded_module = self._load_testsuite(possible_testsuites_paths) 1656 1657 module = loaded_module[0] 1658 if not loaded_module[0]: 1659 if "." in testsuite: 1660 self.options.testsuites.append(testsuite.split('.')[0]) 1661 self.info("%s looks like a test name, trying that" % 1662 testsuite) 1663 self.options.wanted_tests.append(testsuite) 1664 else: 1665 printc("Could not load testsuite: %s, reasons: %s" % ( 1666 testsuite, loaded_module[1]), Colors.FAIL) 1667 continue 1668 1669 testsuites.add(module) 1670 if not hasattr(module, "TEST_MANAGER"): 1671 module.TEST_MANAGER = [tester.name for tester in self.testers] 1672 elif not isinstance(module.TEST_MANAGER, list): 1673 module.TEST_MANAGER = [module.TEST_MANAGER] 1674 1675 self.options.testsuites = list(testsuites) 1676 1677 def _setup_testsuites(self): 1678 for testsuite in self.options.testsuites: 1679 loaded = False 1680 wanted_test_manager = None 1681 # TEST_MANAGER has been set in _load_testsuites() 1682 assert hasattr(testsuite, "TEST_MANAGER") 1683 wanted_test_manager = testsuite.TEST_MANAGER 1684 if not isinstance(wanted_test_manager, list): 1685 wanted_test_manager = [wanted_test_manager] 1686 1687 for tester in self.testers: 1688 if wanted_test_manager is not None and \ 1689 tester.name not in wanted_test_manager: 1690 continue 1691 1692 prev_testsuite_name = TestsManager.loading_testsuite 1693 if self.options.user_paths: 1694 TestsManager.loading_testsuite = tester.name 1695 tester.register_defaults() 1696 loaded = True 1697 else: 1698 TestsManager.loading_testsuite = testsuite.__name__ 1699 if testsuite.setup_tests(tester, self.options): 1700 loaded = True 1701 if prev_testsuite_name: 1702 TestsManager.loading_testsuite = prev_testsuite_name 1703 1704 if not loaded: 1705 printc("Could not load testsuite: %s" 1706 " maybe because of missing TestManager" 1707 % (testsuite), Colors.FAIL) 1708 return False 1709 1710 def _load_config(self, options): 1711 printc("Loading config files is DEPRECATED" 1712 " you should use the new testsuite format now",) 1713 1714 for tester in self.testers: 1715 tester.options = options 1716 globals()[tester.name] = tester 1717 globals()["options"] = options 1718 c__file__ = __file__ 1719 globals()["__file__"] = self.options.config 1720 exec(compile(open(self.options.config).read(), 1721 self.options.config, 'exec'), globals()) 1722 globals()["__file__"] = c__file__ 1723 1724 def set_settings(self, options, args): 1725 if options.xunit_file: 1726 self.reporter = reporters.XunitReporter(options) 1727 else: 1728 self.reporter = reporters.Reporter(options) 1729 1730 self.options = options 1731 wanted_testers = None 1732 for tester in self.testers: 1733 if tester.name in args: 1734 wanted_testers = tester.name 1735 1736 if wanted_testers: 1737 testers = self.testers 1738 self.testers = [] 1739 for tester in testers: 1740 if tester.name in args: 1741 self.testers.append(tester) 1742 args.remove(tester.name) 1743 1744 if options.config: 1745 self._load_config(options) 1746 1747 self._load_testsuites() 1748 if not self.options.testsuites: 1749 printc("Not testsuite loaded!", Colors.FAIL) 1750 return False 1751 1752 for tester in self.testers: 1753 tester.set_settings(options, args, self.reporter) 1754 1755 if not options.config and options.testsuites: 1756 if self._setup_testsuites() is False: 1757 return False 1758 1759 if self.options.check_bugs_status: 1760 printc("-> Checking bugs resolution... ", end='') 1761 1762 for tester in self.testers: 1763 if not tester.check_blacklists(): 1764 return False 1765 1766 tester.log_blacklists() 1767 1768 if not tester.check_expected_issues(): 1769 return False 1770 1771 if self.options.check_bugs_status: 1772 printc("OK", Colors.OKGREEN) 1773 1774 if self.needs_http_server() or options.httponly is True: 1775 self.httpsrv = HTTPServer(options) 1776 self.httpsrv.start() 1777 1778 if options.no_display: 1779 self.vfb_server = get_virual_frame_buffer_server(options) 1780 res = self.vfb_server.start() 1781 if res[0] is False: 1782 printc("Could not start virtual frame server: %s" % res[1], 1783 Colors.FAIL) 1784 return False 1785 os.environ["DISPLAY"] = self.vfb_server.display_id 1786 1787 return True 1788 1789 def _check_tester_has_other_testsuite(self, testsuite, tester): 1790 if tester.name != testsuite.TEST_MANAGER[0]: 1791 return True 1792 1793 for t in self.options.testsuites: 1794 if t != testsuite: 1795 for other_testmanager in t.TEST_MANAGER: 1796 if other_testmanager == tester.name: 1797 return True 1798 1799 return False 1800 1801 def _check_defined_tests(self, tester, tests): 1802 if self.options.blacklisted_tests or self.options.wanted_tests: 1803 return 1804 1805 tests_names = [test.classname for test in tests] 1806 testlist_changed = False 1807 for testsuite in self.options.testsuites: 1808 if not self._check_tester_has_other_testsuite(testsuite, tester) \ 1809 and tester.check_testslist: 1810 try: 1811 testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist", 1812 'r+') 1813 1814 know_tests = testlist_file.read().split("\n") 1815 testlist_file.close() 1816 1817 testlist_file = open(os.path.splitext(testsuite.__file__)[0] + ".testslist", 1818 'w') 1819 except IOError: 1820 continue 1821 1822 optional_out = [] 1823 for test in know_tests: 1824 if test and test.strip('~') not in tests_names: 1825 if not test.startswith('~'): 1826 testlist_changed = True 1827 printc("Test %s Not in testsuite %s anymore" 1828 % (test, testsuite.__file__), Colors.FAIL) 1829 else: 1830 optional_out.append((test, None)) 1831 1832 tests_names = sorted([(test.classname, test) for test in tests] + optional_out, 1833 key=lambda x: x[0].strip('~')) 1834 1835 for tname, test in tests_names: 1836 if test and test.optional: 1837 tname = '~' + tname 1838 testlist_file.write("%s\n" % (tname)) 1839 if tname and tname not in know_tests: 1840 printc("Test %s is NEW in testsuite %s" 1841 % (tname, testsuite.__file__), 1842 Colors.FAIL if self.options.fail_on_testlist_change else Colors.OKGREEN) 1843 testlist_changed = True 1844 1845 testlist_file.close() 1846 break 1847 1848 return testlist_changed 1849 1850 def list_tests(self): 1851 for tester in self.testers: 1852 if not self._tester_needed(tester): 1853 continue 1854 1855 tests = tester.list_tests() 1856 if self._check_defined_tests(tester, tests) and \ 1857 self.options.fail_on_testlist_change: 1858 raise RuntimeError("Unexpected new test in testsuite.") 1859 1860 self.tests.extend(tests) 1861 self.tests.sort(key=lambda test: test.classname) 1862 return self.tests 1863 1864 def _tester_needed(self, tester): 1865 for testsuite in self.options.testsuites: 1866 if tester.name in testsuite.TEST_MANAGER: 1867 return True 1868 return False 1869 1870 def server_wrapper(self, ready): 1871 self.server = GstValidateTCPServer( 1872 ('localhost', 0), GstValidateListener) 1873 self.server.socket.settimeout(None) 1874 self.server.launcher = self 1875 self.serverport = self.server.socket.getsockname()[1] 1876 self.info("%s server port: %s" % (self, self.serverport)) 1877 ready.set() 1878 1879 self.server.serve_forever(poll_interval=0.05) 1880 1881 def _start_server(self): 1882 self.info("Starting TCP Server") 1883 ready = threading.Event() 1884 self.server_thread = threading.Thread(target=self.server_wrapper, 1885 kwargs={'ready': ready}) 1886 self.server_thread.start() 1887 ready.wait() 1888 os.environ["GST_VALIDATE_SERVER"] = "tcp://localhost:%s" % self.serverport 1889 1890 def _stop_server(self): 1891 if self.server: 1892 self.server.shutdown() 1893 self.server_thread.join() 1894 self.server.server_close() 1895 self.server = None 1896 1897 def test_wait(self): 1898 while True: 1899 # Check process every second for timeout 1900 try: 1901 self.queue.get(timeout=1) 1902 except queue.Empty: 1903 pass 1904 1905 for test in self.jobs: 1906 if test.process_update(): 1907 self.jobs.remove(test) 1908 return test 1909 1910 def tests_wait(self): 1911 try: 1912 test = self.test_wait() 1913 test.check_results() 1914 except KeyboardInterrupt: 1915 for test in self.jobs: 1916 test.kill_subprocess() 1917 raise 1918 1919 return test 1920 1921 def start_new_job(self, tests_left): 1922 try: 1923 test = tests_left.pop(0) 1924 except IndexError: 1925 return False 1926 1927 test.test_start(self.queue) 1928 1929 self.jobs.append(test) 1930 1931 return True 1932 1933 def _run_tests(self, running_tests=None, all_alone=False, retry_on_failures=False): 1934 if not self.all_tests: 1935 self.all_tests = self.list_tests() 1936 1937 if not running_tests: 1938 running_tests = self.tests 1939 1940 self.total_num_tests = len(self.all_tests) 1941 printc("\nRunning %d tests..." % self.total_num_tests, color=Colors.HEADER) 1942 1943 self.reporter.init_timer() 1944 alone_tests = [] 1945 tests = [] 1946 for test in running_tests: 1947 if test.is_parallel and not all_alone: 1948 tests.append(test) 1949 else: 1950 alone_tests.append(test) 1951 1952 max_num_jobs = min(self.options.num_jobs, len(tests)) 1953 jobs_running = 0 1954 1955 # if order of test execution doesn't matter, shuffle 1956 # the order to optimize cpu usage 1957 if self.options.shuffle: 1958 random.shuffle(tests) 1959 random.shuffle(alone_tests) 1960 1961 current_test_num = 1 1962 to_retry = [] 1963 for num_jobs, tests in [(max_num_jobs, tests), (1, alone_tests)]: 1964 tests_left = list(tests) 1965 for i in range(num_jobs): 1966 if not self.start_new_job(tests_left): 1967 break 1968 jobs_running += 1 1969 1970 while jobs_running != 0: 1971 test = self.tests_wait() 1972 jobs_running -= 1 1973 test.number = "[%d / %d] " % (current_test_num, 1974 self.total_num_tests) 1975 current_test_num += 1 1976 res = test.test_end(retry_on_failure=retry_on_failures) 1977 to_report = True 1978 if res != Result.PASSED: 1979 if self.options.forever or self.options.fatal_error: 1980 return False 1981 1982 if retry_on_failures: 1983 if not self.options.redirect_logs and test.allow_flakiness: 1984 test.copy_logfiles() 1985 printc(test) 1986 to_retry.append(test) 1987 1988 # Not adding to final report if flakiness is tolerated 1989 to_report = not test.allow_flakiness 1990 if to_report: 1991 self.reporter.after_test(test) 1992 if retry_on_failures: 1993 test.clean() 1994 if self.start_new_job(tests_left): 1995 jobs_running += 1 1996 1997 if to_retry: 1998 printc("--> Rerunning the following tests to see if they are flaky:", Colors.WARNING) 1999 for test in to_retry: 2000 printc(' * %s' % test.classname) 2001 printc('') 2002 return self._run_tests(to_retry, all_alone=True, retry_on_failures=False) 2003 2004 return True 2005 2006 def clean_tests(self, stop_server=False): 2007 for test in self.tests: 2008 test.clean() 2009 if stop_server: 2010 self._stop_server() 2011 2012 def run_tests(self): 2013 r = 0 2014 try: 2015 self._start_server() 2016 if self.options.forever: 2017 r = 1 2018 while True: 2019 printc("-> Iteration %d" % r, end='\r') 2020 2021 if not self._run_tests(): 2022 break 2023 r += 1 2024 self.clean_tests() 2025 msg = "-> Iteration %d... %sOK%s" % (r, Colors.OKGREEN, Colors.ENDC) 2026 printc(msg, end="\r") 2027 2028 return False 2029 elif self.options.n_runs: 2030 res = True 2031 for r in range(self.options.n_runs): 2032 printc("-> Iteration %d" % r, end='\r') 2033 if not self._run_tests(): 2034 res = False 2035 printc("ERROR", Colors.FAIL, end="\r") 2036 else: 2037 printc("OK", Colors.OKGREEN, end="\r") 2038 self.clean_tests() 2039 2040 return res 2041 else: 2042 return self._run_tests(retry_on_failures=self.options.retry_on_failures) 2043 finally: 2044 if self.options.forever: 2045 printc("\n-> Ran %d times" % r) 2046 if self.httpsrv: 2047 self.httpsrv.stop() 2048 if self.vfb_server: 2049 self.vfb_server.stop() 2050 self.clean_tests(True) 2051 2052 def final_report(self): 2053 return self.reporter.final_report() 2054 2055 def needs_http_server(self): 2056 for tester in self.testers: 2057 if tester.needs_http_server(): 2058 return True 2059 2060 2061class NamedDic(object): 2062 2063 def __init__(self, props): 2064 if props: 2065 for name, value in props.items(): 2066 setattr(self, name, value) 2067 2068 2069class Scenario(object): 2070 2071 def __init__(self, name, props, path=None): 2072 self.name = name 2073 self.path = path 2074 2075 for prop, value in props: 2076 setattr(self, prop.replace("-", "_"), value) 2077 2078 def get_execution_name(self): 2079 if self.path is not None: 2080 return self.path 2081 else: 2082 return self.name 2083 2084 def seeks(self): 2085 if hasattr(self, "seek"): 2086 return bool(self.seek) 2087 2088 return False 2089 2090 def needs_clock_sync(self): 2091 if hasattr(self, "need_clock_sync"): 2092 return bool(self.need_clock_sync) 2093 2094 return False 2095 2096 def needs_live_content(self): 2097 # Scenarios that can only be used on live content 2098 if hasattr(self, "live_content_required"): 2099 return bool(self.live_content_required) 2100 return False 2101 2102 def compatible_with_live_content(self): 2103 # if a live content is required it's implicitely compatible with 2104 # live content 2105 if self.needs_live_content(): 2106 return True 2107 if hasattr(self, "live_content_compatible"): 2108 return bool(self.live_content_compatible) 2109 return False 2110 2111 def get_min_media_duration(self): 2112 if hasattr(self, "min_media_duration"): 2113 return float(self.min_media_duration) 2114 2115 return 0 2116 2117 def does_reverse_playback(self): 2118 if hasattr(self, "reverse_playback"): 2119 return bool(self.reverse_playback) 2120 2121 return False 2122 2123 def get_duration(self): 2124 try: 2125 return float(getattr(self, "duration")) 2126 except AttributeError: 2127 return 0 2128 2129 def get_min_tracks(self, track_type): 2130 try: 2131 return int(getattr(self, "min_%s_track" % track_type)) 2132 except AttributeError: 2133 return 0 2134 2135 def __repr__(self): 2136 return "<Scenario %s>" % self.name 2137 2138 2139class ScenarioManager(Loggable): 2140 _instance = None 2141 all_scenarios = [] 2142 2143 FILE_EXTENSION = "scenario" 2144 2145 def __new__(cls, *args, **kwargs): 2146 if not cls._instance: 2147 cls._instance = super(ScenarioManager, cls).__new__( 2148 cls, *args, **kwargs) 2149 cls._instance.config = None 2150 cls._instance.discovered = False 2151 Loggable.__init__(cls._instance) 2152 2153 return cls._instance 2154 2155 def find_special_scenarios(self, mfile): 2156 scenarios = [] 2157 mfile_bname = os.path.basename(mfile) 2158 2159 for f in os.listdir(os.path.dirname(mfile)): 2160 if re.findall("%s\..*\.%s$" % (re.escape(mfile_bname), self.FILE_EXTENSION), f): 2161 scenarios.append(os.path.join(os.path.dirname(mfile), f)) 2162 2163 if scenarios: 2164 scenarios = self.discover_scenarios(scenarios, mfile) 2165 2166 return scenarios 2167 2168 def discover_scenarios(self, scenario_paths=[], mfile=None): 2169 """ 2170 Discover scenarios specified in scenario_paths or the default ones 2171 if nothing specified there 2172 """ 2173 scenarios = [] 2174 scenario_defs = os.path.join(self.config.main_dir, "scenarios.def") 2175 logs = open(os.path.join(self.config.logsdir, 2176 "scenarios_discovery.log"), 'w') 2177 2178 try: 2179 command = [GstValidateBaseTestManager.COMMAND, 2180 "--scenarios-defs-output-file", scenario_defs] 2181 command.extend(scenario_paths) 2182 subprocess.check_call(command, stdout=logs, stderr=logs) 2183 except subprocess.CalledProcessError as e: 2184 self.error(e) 2185 pass 2186 2187 config = configparser.RawConfigParser() 2188 f = open(scenario_defs) 2189 config.readfp(f) 2190 2191 for section in config.sections(): 2192 if scenario_paths: 2193 for scenario_path in scenario_paths: 2194 if section in os.path.splitext(os.path.basename(scenario_path))[0]: 2195 if mfile is None: 2196 name = section 2197 path = scenario_path 2198 else: 2199 # The real name of the scenario is: 2200 # filename.REALNAME.scenario 2201 name = scenario_path.replace(mfile + ".", "").replace( 2202 "." + self.FILE_EXTENSION, "") 2203 path = scenario_path 2204 else: 2205 name = section 2206 path = None 2207 2208 props = config.items(section) 2209 scenarios.append(Scenario(name, props, path)) 2210 2211 if not scenario_paths: 2212 self.discovered = True 2213 self.all_scenarios.extend(scenarios) 2214 2215 return scenarios 2216 2217 def get_scenario(self, name): 2218 if name is not None and os.path.isabs(name) and name.endswith(self.FILE_EXTENSION): 2219 scenarios = self.discover_scenarios([name]) 2220 2221 if scenarios: 2222 return scenarios[0] 2223 2224 if self.discovered is False: 2225 self.discover_scenarios() 2226 2227 if name is None: 2228 return self.all_scenarios 2229 2230 try: 2231 return [scenario for scenario in self.all_scenarios if scenario.name == name][0] 2232 except IndexError: 2233 self.warning("Scenario: %s not found" % name) 2234 return None 2235 2236 2237class GstValidateBaseTestManager(TestsManager): 2238 scenarios_manager = ScenarioManager() 2239 features_cache = {} 2240 2241 def __init__(self): 2242 super(GstValidateBaseTestManager, self).__init__() 2243 self._scenarios = [] 2244 self._encoding_formats = [] 2245 2246 @classmethod 2247 def update_commands(cls, extra_paths=None): 2248 for varname, cmd in {'': 'gst-validate', 2249 'TRANSCODING_': 'gst-validate-transcoding', 2250 'MEDIA_CHECK_': 'gst-validate-media-check', 2251 'RTSP_SERVER_': 'gst-validate-rtsp-server', 2252 'INSPECT_': 'gst-inspect'}.items(): 2253 setattr(cls, varname + 'COMMAND', which(cmd + '-1.0', extra_paths)) 2254 2255 @classmethod 2256 def has_feature(cls, featurename): 2257 try: 2258 return cls.features_cache[featurename] 2259 except KeyError: 2260 pass 2261 2262 try: 2263 subprocess.check_output([cls.INSPECT_COMMAND, featurename]) 2264 res = True 2265 except subprocess.CalledProcessError: 2266 res = False 2267 2268 cls.features_cache[featurename] = res 2269 return res 2270 2271 def add_scenarios(self, scenarios): 2272 """ 2273 @scenarios A list or a unic scenario name(s) to be run on the tests. 2274 They are just the default scenarios, and then depending on 2275 the TestsGenerator to be used you can have more fine grained 2276 control on what to be run on each serie of tests. 2277 """ 2278 if isinstance(scenarios, list): 2279 self._scenarios.extend(scenarios) 2280 else: 2281 self._scenarios.append(scenarios) 2282 2283 self._scenarios = list(set(self._scenarios)) 2284 2285 def set_scenarios(self, scenarios): 2286 """ 2287 Override the scenarios 2288 """ 2289 self._scenarios = [] 2290 self.add_scenarios(scenarios) 2291 2292 def get_scenarios(self): 2293 return self._scenarios 2294 2295 def add_encoding_formats(self, encoding_formats): 2296 """ 2297 :param encoding_formats: A list or one single #MediaFormatCombinations describing wanted output 2298 formats for transcoding test. 2299 They are just the default encoding formats, and then depending on 2300 the TestsGenerator to be used you can have more fine grained 2301 control on what to be run on each serie of tests. 2302 """ 2303 if isinstance(encoding_formats, list): 2304 self._encoding_formats.extend(encoding_formats) 2305 else: 2306 self._encoding_formats.append(encoding_formats) 2307 2308 self._encoding_formats = list(set(self._encoding_formats)) 2309 2310 def get_encoding_formats(self): 2311 return self._encoding_formats 2312 2313 2314GstValidateBaseTestManager.update_commands() 2315 2316 2317class MediaDescriptor(Loggable): 2318 2319 def __init__(self): 2320 Loggable.__init__(self) 2321 2322 def get_path(self): 2323 raise NotImplemented 2324 2325 def has_frames(self): 2326 return False 2327 2328 def get_media_filepath(self): 2329 raise NotImplemented 2330 2331 def skip_parsers(self): 2332 return False 2333 2334 def get_caps(self): 2335 raise NotImplemented 2336 2337 def get_uri(self): 2338 raise NotImplemented 2339 2340 def get_duration(self): 2341 raise NotImplemented 2342 2343 def get_protocol(self): 2344 raise NotImplemented 2345 2346 def is_seekable(self): 2347 raise NotImplemented 2348 2349 def is_live(self): 2350 raise NotImplemented 2351 2352 def is_image(self): 2353 raise NotImplemented 2354 2355 def get_num_tracks(self, track_type): 2356 raise NotImplemented 2357 2358 def can_play_reverse(self): 2359 raise NotImplemented 2360 2361 def prerrols(self): 2362 return True 2363 2364 def is_compatible(self, scenario): 2365 if scenario is None: 2366 return True 2367 2368 if scenario.seeks() and (not self.is_seekable() or self.is_image()): 2369 self.debug("Do not run %s as %s does not support seeking", 2370 scenario, self.get_uri()) 2371 return False 2372 2373 if self.is_image() and scenario.needs_clock_sync(): 2374 self.debug("Do not run %s as %s is an image", 2375 scenario, self.get_uri()) 2376 return False 2377 2378 if not self.can_play_reverse() and scenario.does_reverse_playback(): 2379 return False 2380 2381 if not self.is_live() and scenario.needs_live_content(): 2382 self.debug("Do not run %s as %s is not a live content", 2383 scenario, self.get_uri()) 2384 return False 2385 2386 if self.is_live() and not scenario.compatible_with_live_content(): 2387 self.debug("Do not run %s as %s is a live content", 2388 scenario, self.get_uri()) 2389 return False 2390 2391 if not self.prerrols() and getattr(scenario, 'needs_preroll', False): 2392 return False 2393 2394 if self.get_duration() and self.get_duration() / GST_SECOND < scenario.get_min_media_duration(): 2395 self.debug( 2396 "Do not run %s as %s is too short (%i < min media duation : %i", 2397 scenario, self.get_uri(), 2398 self.get_duration() / GST_SECOND, 2399 scenario.get_min_media_duration()) 2400 return False 2401 2402 for track_type in ['audio', 'subtitle', 'video']: 2403 if self.get_num_tracks(track_type) < scenario.get_min_tracks(track_type): 2404 self.debug("%s -- %s | At least %s %s track needed < %s" 2405 % (scenario, self.get_uri(), track_type, 2406 scenario.get_min_tracks(track_type), 2407 self.get_num_tracks(track_type))) 2408 return False 2409 2410 return True 2411 2412 2413class GstValidateMediaDescriptor(MediaDescriptor): 2414 # Some extension file for discovering results 2415 MEDIA_INFO_EXT = "media_info" 2416 PUSH_MEDIA_INFO_EXT = "media_info.push" 2417 STREAM_INFO_EXT = "stream_info" 2418 2419 def __init__(self, xml_path): 2420 super(GstValidateMediaDescriptor, self).__init__() 2421 2422 self._xml_path = xml_path 2423 try: 2424 media_xml = ET.parse(xml_path).getroot() 2425 except xml.etree.ElementTree.ParseError: 2426 printc("Could not parse %s" % xml_path, 2427 Colors.FAIL) 2428 raise 2429 2430 self._extract_data(media_xml) 2431 2432 self.set_protocol(urllib.parse.urlparse( 2433 urllib.parse.urlparse(self.get_uri()).scheme).scheme) 2434 2435 def skip_parsers(self): 2436 return self._skip_parsers 2437 2438 def has_frames(self): 2439 return self._has_frames 2440 2441 def _extract_data(self, media_xml): 2442 # Extract the information we need from the xml 2443 self._caps = media_xml.findall("streams")[0].attrib["caps"] 2444 self._track_caps = [] 2445 try: 2446 streams = media_xml.findall("streams")[0].findall("stream") 2447 except IndexError: 2448 pass 2449 else: 2450 for stream in streams: 2451 self._track_caps.append( 2452 (stream.attrib["type"], stream.attrib["caps"])) 2453 self._uri = media_xml.attrib["uri"] 2454 self._skip_parsers = bool(int(media_xml.attrib.get('skip-parsers', 0))) 2455 self._has_frames = bool(int(media_xml.attrib["frame-detection"])) 2456 self._duration = int(media_xml.attrib["duration"]) 2457 self._protocol = media_xml.get("protocol", None) 2458 self._is_seekable = media_xml.attrib["seekable"].lower() == "true" 2459 self._is_live = media_xml.get("live", "false").lower() == "true" 2460 self._is_image = False 2461 for stream in media_xml.findall("streams")[0].findall("stream"): 2462 if stream.attrib["type"] == "image": 2463 self._is_image = True 2464 self._track_types = [] 2465 for stream in media_xml.findall("streams")[0].findall("stream"): 2466 self._track_types.append(stream.attrib["type"]) 2467 2468 @staticmethod 2469 def new_from_uri(uri, verbose=False, include_frames=False, is_push=False): 2470 """ 2471 include_frames = 0 # Never 2472 include_frames = 1 # always 2473 include_frames = 2 # if previous file included them 2474 2475 """ 2476 media_path = utils.url2path(uri) 2477 2478 ext = GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT if is_push else \ 2479 GstValidateMediaDescriptor.MEDIA_INFO_EXT 2480 descriptor_path = "%s.%s" % (media_path, ext) 2481 args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ") 2482 args.append(uri) 2483 if include_frames == 2: 2484 try: 2485 media_xml = ET.parse(descriptor_path).getroot() 2486 2487 include_frames = bool(int(media_xml.attrib["frame-detection"])) 2488 if bool(int(media_xml.attrib.get("skip-parsers", 0))): 2489 args.append("--skip-parsers") 2490 except FileNotFoundError: 2491 pass 2492 else: 2493 include_frames = bool(include_frames) 2494 2495 args.extend(["--output-file", descriptor_path]) 2496 if include_frames: 2497 args.extend(["--full"]) 2498 2499 if verbose: 2500 printc("Generating media info for %s\n" 2501 " Command: '%s'" % (media_path, ' '.join(args)), 2502 Colors.OKBLUE) 2503 2504 try: 2505 subprocess.check_output(args, stderr=open(os.devnull)) 2506 except subprocess.CalledProcessError as e: 2507 if verbose: 2508 printc("Result: Failed", Colors.FAIL) 2509 else: 2510 loggable.warning("GstValidateMediaDescriptor", 2511 "Exception: %s" % e) 2512 return None 2513 2514 if verbose: 2515 printc("Result: Passed", Colors.OKGREEN) 2516 2517 try: 2518 return GstValidateMediaDescriptor(descriptor_path) 2519 except (IOError, xml.etree.ElementTree.ParseError): 2520 return None 2521 2522 def get_path(self): 2523 return self._xml_path 2524 2525 def need_clock_sync(self): 2526 return Protocols.needs_clock_sync(self.get_protocol()) 2527 2528 def get_media_filepath(self): 2529 if self.get_protocol() == Protocols.FILE: 2530 return self._xml_path.replace("." + self.MEDIA_INFO_EXT, "") 2531 elif self.get_protocol() == Protocols.PUSHFILE: 2532 return self._xml_path.replace("." + self.PUSH_MEDIA_INFO_EXT, "") 2533 else: 2534 return self._xml_path.replace("." + self.STREAM_INFO_EXT, "") 2535 2536 def get_caps(self): 2537 return self._caps 2538 2539 def get_tracks_caps(self): 2540 return self._track_caps 2541 2542 def get_uri(self): 2543 return self._uri 2544 2545 def get_duration(self): 2546 return self._duration 2547 2548 def set_protocol(self, protocol): 2549 if self._xml_path.endswith(GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT): 2550 self._protocol = Protocols.PUSHFILE 2551 else: 2552 self._protocol = protocol 2553 2554 def get_protocol(self): 2555 return self._protocol 2556 2557 def is_seekable(self): 2558 return self._is_seekable 2559 2560 def is_live(self): 2561 return self._is_live 2562 2563 def can_play_reverse(self): 2564 return True 2565 2566 def is_image(self): 2567 return self._is_image 2568 2569 def get_num_tracks(self, track_type): 2570 n = 0 2571 for t in self._track_types: 2572 if t == track_type: 2573 n += 1 2574 2575 return n 2576 2577 def get_clean_name(self): 2578 name = os.path.basename(self.get_path()) 2579 name = re.sub("\.stream_info|\.media_info", "", name) 2580 2581 return name.replace('.', "_") 2582 2583 2584class MediaFormatCombination(object): 2585 FORMATS = {"aac": "audio/mpeg,mpegversion=4", # Audio 2586 "ac3": "audio/x-ac3", 2587 "vorbis": "audio/x-vorbis", 2588 "mp3": "audio/mpeg,mpegversion=1,layer=3", 2589 "opus": "audio/x-opus", 2590 "rawaudio": "audio/x-raw", 2591 2592 # Video 2593 "h264": "video/x-h264", 2594 "h265": "video/x-h265", 2595 "vp8": "video/x-vp8", 2596 "vp9": "video/x-vp9", 2597 "theora": "video/x-theora", 2598 "prores": "video/x-prores", 2599 "jpeg": "image/jpeg", 2600 2601 # Containers 2602 "webm": "video/webm", 2603 "ogg": "application/ogg", 2604 "mkv": "video/x-matroska", 2605 "mp4": "video/quicktime,variant=iso;", 2606 "quicktime": "video/quicktime;"} 2607 2608 def __str__(self): 2609 return "%s and %s in %s" % (self.audio, self.video, self.container) 2610 2611 def __init__(self, container, audio, video): 2612 """ 2613 Describes a media format to be used for transcoding tests. 2614 2615 :param container: A string defining the container format to be used, must bin in self.FORMATS 2616 :param audio: A string defining the audio format to be used, must bin in self.FORMATS 2617 :param video: A string defining the video format to be used, must bin in self.FORMATS 2618 """ 2619 self.container = container 2620 self.audio = audio 2621 self.video = video 2622 2623 def get_caps(self, track_type): 2624 try: 2625 return self.FORMATS[self.__dict__[track_type]] 2626 except KeyError: 2627 return None 2628 2629 def get_audio_caps(self): 2630 return self.get_caps("audio") 2631 2632 def get_video_caps(self): 2633 return self.get_caps("video") 2634 2635 def get_muxer_caps(self): 2636 return self.get_caps("container") 2637