1#!/usr/bin/env python3 2# 3# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com> 4# 5# This program is free software; you can redistribute it and/or 6# modify it under the terms of the GNU Lesser General Public 7# License as published by the Free Software Foundation; either 8# version 2.1 of the License, or (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13# Lesser General Public License for more details. 14# 15# You should have received a copy of the GNU Lesser General Public 16# License along with this program; if not, write to the 17# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, 18# Boston, MA 02110-1301, USA. 19import argparse 20import os 21import copy 22import sys 23import time 24import urllib.parse 25import shlex 26import socket 27import subprocess 28import configparser 29import json 30from launcher.loggable import Loggable 31 32from launcher.baseclasses import GstValidateTest, Test, \ 33 ScenarioManager, NamedDic, GstValidateTestsGenerator, \ 34 GstValidateMediaDescriptor, GstValidateEncodingTestInterface, \ 35 GstValidateBaseTestManager, MediaDescriptor, MediaFormatCombination 36 37from launcher.utils import path2url, url2path, DEFAULT_TIMEOUT, which, \ 38 GST_SECOND, Result, Protocols, mkdir, printc, Colors, get_data_file, \ 39 kill_subprocess, format_config_template 40 41# 42# Private global variables # 43# 44 45# definitions of commands to use 46parser = argparse.ArgumentParser(add_help=False) 47parser.add_argument("--validate-tools-path", dest="validate_tools_path", 48 default="", 49 help="defines the paths to look for GstValidate tools.") 50options, args = parser.parse_known_args() 51 52GstValidateBaseTestManager.update_commands(options.validate_tools_path) 53AUDIO_ONLY_FILE_TRANSCODING_RATIO = 5 54 55# 56# API to be used to create testsuites # 57# 58 59""" 60Some info about protocols and how to handle them 61""" 62GST_VALIDATE_CAPS_TO_PROTOCOL = [("application/x-hls", Protocols.HLS), 63 ("application/dash+xml", Protocols.DASH)] 64 65 66class GstValidateMediaCheckTestsGenerator(GstValidateTestsGenerator): 67 68 def __init__(self, test_manager): 69 GstValidateTestsGenerator.__init__(self, "media_check", test_manager) 70 71 def populate_tests(self, uri_minfo_special_scenarios, scenarios): 72 for uri, mediainfo, special_scenarios in uri_minfo_special_scenarios: 73 protocol = mediainfo.media_descriptor.get_protocol() 74 timeout = DEFAULT_TIMEOUT 75 76 classname = "%s.media_check.%s" % (protocol, 77 os.path.basename(url2path(uri)).replace(".", "_")) 78 self.add_test(GstValidateMediaCheckTest(classname, 79 self.test_manager.options, 80 self.test_manager.reporter, 81 mediainfo.media_descriptor, 82 uri, 83 mediainfo.path, 84 timeout=timeout)) 85 86 87class GstValidateTranscodingTestsGenerator(GstValidateTestsGenerator): 88 89 def __init__(self, test_manager): 90 GstValidateTestsGenerator.__init__(self, "transcode", test_manager) 91 92 def populate_tests(self, uri_minfo_special_scenarios, scenarios): 93 for uri, mediainfo, special_scenarios in uri_minfo_special_scenarios: 94 if mediainfo.media_descriptor.is_image(): 95 continue 96 97 protocol = mediainfo.media_descriptor.get_protocol() 98 if protocol == Protocols.RTSP: 99 continue 100 101 for comb in self.test_manager.get_encoding_formats(): 102 classname = "%s.transcode.to_%s.%s" % (mediainfo.media_descriptor.get_protocol(), 103 str(comb).replace( 104 ' ', '_'), 105 mediainfo.media_descriptor.get_clean_name()) 106 self.add_test(GstValidateTranscodingTest(classname, 107 self.test_manager.options, 108 self.test_manager.reporter, 109 comb, 110 uri, 111 mediainfo.media_descriptor)) 112 113 114class FakeMediaDescriptor(MediaDescriptor): 115 116 def __init__(self, infos, pipeline_desc): 117 MediaDescriptor.__init__(self) 118 self._infos = infos 119 self._pipeline_desc = pipeline_desc 120 121 def get_path(self): 122 return self._infos.get('path', None) 123 124 def get_media_filepath(self): 125 return self._infos.get('media-filepath', None) 126 127 def get_caps(self): 128 return self._infos.get('caps', None) 129 130 def get_uri(self): 131 return self._infos.get('uri', None) 132 133 def get_duration(self): 134 return int(self._infos.get('duration', 0)) * GST_SECOND 135 136 def get_protocol(self): 137 return self._infos.get('protocol', "launch_pipeline") 138 139 def is_seekable(self): 140 return self._infos.get('is-seekable', True) 141 142 def is_image(self): 143 return self._infos.get('is-image', False) 144 145 def is_live(self): 146 return self._infos.get('is-live', False) 147 148 def get_num_tracks(self, track_type): 149 return self._infos.get('num-%s-tracks' % track_type, 150 self._pipeline_desc.count(track_type + "sink")) 151 152 def can_play_reverse(self): 153 return self._infos.get('plays-reverse', False) 154 155 156class GstValidatePipelineTestsGenerator(GstValidateTestsGenerator): 157 158 def __init__(self, name, test_manager, pipeline_template=None, 159 pipelines_descriptions=None, valid_scenarios=None): 160 """ 161 @name: The name of the generator 162 @pipeline_template: A template pipeline to be used to generate actual pipelines 163 @pipelines_descriptions: A list of tuple of the form: 164 (test_name, pipeline_description, extra_data) 165 extra_data being a dictionnary with the follwing keys: 166 'scenarios': ["the", "valide", "scenarios", "names"] 167 'duration': the_duration # in seconds 168 'timeout': a_timeout # in seconds 169 'hard_timeout': a_hard_timeout # in seconds 170 171 @valid_scenarios: A list of scenario name that can be used with that generator 172 """ 173 valid_scenarios = valid_scenarios or [] 174 GstValidateTestsGenerator.__init__(self, name, test_manager) 175 self._pipeline_template = pipeline_template 176 self._pipelines_descriptions = [] 177 for description in pipelines_descriptions or []: 178 if not isinstance(description, dict): 179 desc_dict = {"name": description[0], 180 "pipeline": description[1]} 181 if len(description) >= 3: 182 desc_dict["extra_data"] = description[2] 183 self._pipelines_descriptions.append(desc_dict) 184 else: 185 self._pipelines_descriptions.append(description) 186 self._valid_scenarios = valid_scenarios 187 188 @classmethod 189 def from_json(cls, test_manager, json_file, extra_data=None): 190 """ 191 :param json_file: Path to a JSON file containing pipeline tests. 192 :param extra_data: Variables available for interpolation in validate 193 configs and scenario actions. 194 """ 195 if extra_data is None: 196 extra_data = {} 197 with open(json_file, 'r') as f: 198 descriptions = json.load(f) 199 200 name = os.path.basename(json_file).replace('.json', '') 201 pipelines_descriptions = [] 202 for test_name, defs in descriptions.items(): 203 tests_definition = {'name': test_name, 'pipeline': defs['pipeline']} 204 test_private_dir = os.path.join(test_manager.options.privatedir, 205 name, test_name) 206 207 config_file = None 208 if 'config' in defs: 209 os.makedirs(test_private_dir, exist_ok=True) 210 config_file = os.path.join(test_private_dir, 211 test_name + '.config') 212 with open(config_file, 'w') as f: 213 f.write(format_config_template(extra_data, 214 '\n'.join(defs['config']) + '\n', test_name)) 215 216 scenarios = [] 217 for scenario in defs.get('scenarios', []): 218 if isinstance(scenario, str): 219 # Path to a scenario file 220 scenarios.append(scenario) 221 else: 222 # Dictionary defining a new scenario in-line 223 scenario_name = scenario_file = scenario['name'] 224 actions = scenario.get('actions') 225 if actions: 226 os.makedirs(test_private_dir, exist_ok=True) 227 scenario_file = os.path.join( 228 test_private_dir, scenario_name + '.scenario') 229 with open(scenario_file, 'w') as f: 230 f.write('\n'.join(action % extra_data for action in actions) + '\n') 231 scenarios.append(scenario_file) 232 tests_definition['extra_data'] = {'scenarios': scenarios, 'config_file': config_file} 233 tests_definition['pipeline_data'] = {"config_path": os.path.dirname(json_file)} 234 tests_definition['pipeline_data'].update(extra_data) 235 pipelines_descriptions.append(tests_definition) 236 237 return GstValidatePipelineTestsGenerator(name, test_manager, pipelines_descriptions=pipelines_descriptions) 238 239 def get_fname(self, scenario, protocol=None, name=None): 240 if name is None: 241 name = self.name 242 243 if protocol is not None: 244 protocol_str = "%s." % protocol 245 else: 246 protocol_str = "" 247 248 if scenario is not None and scenario.name.lower() != "none": 249 return "%s%s.%s" % (protocol_str, name, scenario.name) 250 251 return ("%s.%s.%s" % (protocol_str, self.name, name)).replace("..", ".") 252 253 def generate_tests(self, uri_minfo_special_scenarios, scenarios): 254 if self._valid_scenarios is None: 255 scenarios = [None] 256 elif self._valid_scenarios: 257 scenarios = [scenario for scenario in scenarios if 258 scenario is not None and scenario.name in self._valid_scenarios] 259 260 return super(GstValidatePipelineTestsGenerator, self).generate_tests( 261 uri_minfo_special_scenarios, scenarios) 262 263 def populate_tests(self, uri_minfo_special_scenarios, scenarios): 264 for description in self._pipelines_descriptions: 265 pipeline = description['pipeline'] 266 extra_data = description.get('extra_data', {}) 267 pipeline_data = description.get('pipeline_data', {}) 268 269 if 'scenarios' in extra_data: 270 # A pipeline description can override the default scenario set. 271 # The pipeline description may specify an empty list of 272 # scenarios, in which case one test will be generated with no 273 # scenario. 274 scenarios_to_iterate = extra_data['scenarios'] or [None] 275 else: 276 scenarios_to_iterate = scenarios 277 278 for scenario in scenarios_to_iterate: 279 if isinstance(scenario, str): 280 scenario = self.test_manager.scenarios_manager.get_scenario( 281 scenario) 282 283 mediainfo = FakeMediaDescriptor(extra_data, pipeline) 284 if not mediainfo.is_compatible(scenario): 285 continue 286 287 if self.test_manager.options.mute: 288 needs_clock = scenario.needs_clock_sync() \ 289 if scenario else False 290 audiosink = self.get_fakesink_for_media_type( 291 "audio", needs_clock) 292 videosink = self.get_fakesink_for_media_type( 293 "video", needs_clock) 294 else: 295 audiosink = 'autoaudiosink' 296 videosink = 'autovideosink' 297 298 pipeline_data.update({'videosink': videosink, 'audiosink': audiosink}) 299 pipeline_desc = pipeline % pipeline_data 300 301 fname = self.get_fname( 302 scenario, protocol=mediainfo.get_protocol(), name=description["name"]) 303 304 expected_issues = extra_data.get("expected-issues") 305 extra_env_vars = extra_data.get("extra_env_vars") 306 test = GstValidateLaunchTest(fname, 307 self.test_manager.options, 308 self.test_manager.reporter, 309 pipeline_desc, 310 scenario=scenario, 311 media_descriptor=mediainfo, 312 expected_issues=expected_issues, 313 extra_env_variables=extra_env_vars) 314 if extra_data.get('config_file'): 315 test.add_validate_config(extra_data['config_file']) 316 self.add_test(test) 317 318 319class GstValidatePlaybinTestsGenerator(GstValidatePipelineTestsGenerator): 320 321 def __init__(self, test_manager): 322 if os.getenv("USE_PLAYBIN3") is None: 323 GstValidatePipelineTestsGenerator.__init__( 324 self, "playback", test_manager, "playbin") 325 else: 326 GstValidatePipelineTestsGenerator.__init__( 327 self, "playback", test_manager, "playbin3") 328 329 def _set_sinks(self, minfo, pipe_str, scenario): 330 if self.test_manager.options.mute: 331 needs_clock = scenario.needs_clock_sync() or minfo.media_descriptor.need_clock_sync() 332 333 afakesink = self.get_fakesink_for_media_type("audio", needs_clock) 334 vfakesink = self.get_fakesink_for_media_type("video", needs_clock) 335 pipe_str += " audio-sink='%s' video-sink='%s'" % ( 336 afakesink, vfakesink) 337 338 return pipe_str 339 340 def _get_name(self, scenario, protocol, minfo): 341 return "%s.%s" % (self.get_fname(scenario, 342 protocol), 343 os.path.basename(minfo.media_descriptor.get_clean_name())) 344 345 def populate_tests(self, uri_minfo_special_scenarios, scenarios): 346 test_rtsp = GstValidateBaseTestManager.RTSP_SERVER_COMMAND 347 if not test_rtsp: 348 printc("\n\nRTSP server not available, you should make sure" 349 " that %s is available in your $PATH." % GstValidateBaseTestManager.RTSP_SERVER_COMMAND, 350 Colors.FAIL) 351 elif self.test_manager.options.disable_rtsp: 352 printc("\n\nRTSP tests are disabled") 353 test_rtsp = False 354 355 for uri, minfo, special_scenarios in uri_minfo_special_scenarios: 356 pipe = self._pipeline_template 357 protocol = minfo.media_descriptor.get_protocol() 358 359 if protocol == Protocols.RTSP: 360 self.debug("SKIPPING %s as it is a RTSP stream" % uri) 361 continue 362 363 pipe += " uri=%s" % uri 364 365 for scenario in special_scenarios + scenarios: 366 cpipe = pipe 367 if not minfo.media_descriptor.is_compatible(scenario): 368 continue 369 370 cpipe = self._set_sinks(minfo, cpipe, scenario) 371 fname = self._get_name(scenario, protocol, minfo) 372 373 self.debug("Adding: %s", fname) 374 375 if scenario.does_reverse_playback() and protocol == Protocols.HTTP: 376 # 10MB so we can reverse playback 377 cpipe += " ring-buffer-max-size=10485760" 378 379 self.add_test(GstValidateLaunchTest(fname, 380 self.test_manager.options, 381 self.test_manager.reporter, 382 cpipe, 383 scenario=scenario, 384 media_descriptor=minfo.media_descriptor) 385 ) 386 387 if test_rtsp and protocol == Protocols.FILE and not minfo.media_descriptor.is_image(): 388 rtspminfo = NamedDic({"path": minfo.media_descriptor.get_path(), 389 "media_descriptor": GstValidateRTSPMediaDesciptor(minfo.media_descriptor.get_path())}) 390 if not rtspminfo.media_descriptor.is_compatible(scenario): 391 continue 392 393 cpipe = self._set_sinks(rtspminfo, "%s uri=rtsp://127.0.0.1:<RTSPPORTNUMBER>/test" 394 % self._pipeline_template, scenario) 395 fname = self._get_name(scenario, Protocols.RTSP, rtspminfo) 396 397 self.add_test(GstValidateRTSPTest( 398 fname, self.test_manager.options, self.test_manager.reporter, 399 cpipe, uri, scenario=scenario, 400 media_descriptor=rtspminfo.media_descriptor)) 401 402 fname = self._get_name(scenario, Protocols.RTSP + '2', rtspminfo) 403 self.add_test(GstValidateRTSPTest( 404 fname, self.test_manager.options, self.test_manager.reporter, 405 cpipe, uri, scenario=scenario, 406 media_descriptor=rtspminfo.media_descriptor, 407 rtsp2=True)) 408 409 410class GstValidateMixerTestsGenerator(GstValidatePipelineTestsGenerator): 411 412 def __init__(self, name, test_manager, mixer, media_type, converter="", 413 num_sources=3, mixed_srcs=None, valid_scenarios=None): 414 mixed_srcs = mixed_srcs or {} 415 valid_scenarios = valid_scenarios or [] 416 417 pipe_template = "%(mixer)s name=_mixer ! " + \ 418 converter + " ! %(sink)s " 419 self.converter = converter 420 self.mixer = mixer 421 self.media_type = media_type 422 self.num_sources = num_sources 423 self.mixed_srcs = mixed_srcs 424 super( 425 GstValidateMixerTestsGenerator, self).__init__(name, test_manager, pipe_template, 426 valid_scenarios=valid_scenarios) 427 428 def populate_tests(self, uri_minfo_special_scenarios, scenarios): 429 if self.test_manager.options.validate_uris: 430 return 431 432 wanted_ressources = [] 433 for uri, minfo, special_scenarios in uri_minfo_special_scenarios: 434 protocol = minfo.media_descriptor.get_protocol() 435 if protocol == Protocols.FILE and \ 436 minfo.media_descriptor.get_num_tracks(self.media_type) > 0: 437 wanted_ressources.append((uri, minfo)) 438 439 if not self.mixed_srcs: 440 if not wanted_ressources: 441 return 442 443 for i in range(len(uri_minfo_special_scenarios) / self.num_sources): 444 srcs = [] 445 name = "" 446 for nsource in range(self.num_sources): 447 uri, minfo = wanted_ressources[i + nsource] 448 if os.getenv("USE_PLAYBIN3") is None: 449 srcs.append( 450 "uridecodebin uri=%s ! %s" % (uri, self.converter)) 451 else: 452 srcs.append( 453 "uridecodebin3 uri=%s ! %s" % (uri, self.converter)) 454 fname = os.path.basename(uri).replace(".", "_") 455 if not name: 456 name = fname 457 else: 458 name += "+%s" % fname 459 460 self.mixed_srcs[name] = tuple(srcs) 461 462 for name, srcs in self.mixed_srcs.items(): 463 if isinstance(srcs, dict): 464 pipe_arguments = { 465 "mixer": self.mixer + " %s" % srcs["mixer_props"]} 466 srcs = srcs["sources"] 467 else: 468 pipe_arguments = {"mixer": self.mixer} 469 470 for scenario in scenarios: 471 fname = self.get_fname(scenario, Protocols.FILE) + "." 472 fname += name 473 474 self.debug("Adding: %s", fname) 475 476 if self.test_manager.options.mute: 477 pipe_arguments["sink"] = self.get_fakesink_for_media_type(self.media_type, 478 scenario.needs_clock_sync()) 479 else: 480 pipe_arguments["sink"] = "auto%ssink" % self.media_type 481 482 pipe = self._pipeline_template % pipe_arguments 483 484 for src in srcs: 485 pipe += "%s ! _mixer. " % src 486 487 self.add_test(GstValidateLaunchTest(fname, 488 self.test_manager.options, 489 self.test_manager.reporter, 490 pipe, 491 scenario=scenario) 492 ) 493 494 495class GstValidateLaunchTest(GstValidateTest): 496 497 def __init__(self, classname, options, reporter, pipeline_desc, 498 timeout=DEFAULT_TIMEOUT, scenario=None, 499 media_descriptor=None, duration=0, hard_timeout=None, 500 extra_env_variables=None, expected_issues=None): 501 502 extra_env_variables = extra_env_variables or {} 503 504 if scenario: 505 duration = scenario.get_duration() 506 elif media_descriptor: 507 duration = media_descriptor.get_duration() / GST_SECOND 508 509 super( 510 GstValidateLaunchTest, self).__init__(GstValidateBaseTestManager.COMMAND, 511 classname, 512 options, reporter, 513 duration=duration, 514 scenario=scenario, 515 timeout=timeout, 516 hard_timeout=hard_timeout, 517 media_descriptor=media_descriptor, 518 extra_env_variables=extra_env_variables, 519 expected_issues=expected_issues) 520 521 self.pipeline_desc = pipeline_desc 522 self.media_descriptor = media_descriptor 523 524 def build_arguments(self): 525 GstValidateTest.build_arguments(self) 526 self.add_arguments(*shlex.split(self.pipeline_desc)) 527 if self.media_descriptor is not None and self.media_descriptor.get_path(): 528 self.add_arguments( 529 "--set-media-info", self.media_descriptor.get_path()) 530 531 532class GstValidateMediaCheckTest(GstValidateTest): 533 534 def __init__(self, classname, options, reporter, media_descriptor, 535 uri, minfo_path, timeout=DEFAULT_TIMEOUT, 536 extra_env_variables=None, 537 expected_issues=None): 538 extra_env_variables = extra_env_variables or {} 539 540 super( 541 GstValidateMediaCheckTest, self).__init__(GstValidateBaseTestManager.MEDIA_CHECK_COMMAND, classname, 542 options, reporter, 543 timeout=timeout, 544 media_descriptor=media_descriptor, 545 extra_env_variables=extra_env_variables, 546 expected_issues=expected_issues) 547 self._uri = uri 548 self._media_info_path = minfo_path 549 550 def build_arguments(self): 551 Test.build_arguments(self) 552 self.add_arguments(self._uri, "--expected-results", 553 self._media_info_path) 554 555 if self.media_descriptor.skip_parsers(): 556 self.add_arguments("--skip-parsers") 557 558 559class GstValidateTranscodingTest(GstValidateTest, GstValidateEncodingTestInterface): 560 scenarios_manager = ScenarioManager() 561 562 def __init__(self, classname, options, reporter, 563 combination, uri, media_descriptor, 564 timeout=DEFAULT_TIMEOUT, 565 scenario=None, 566 extra_env_variables=None, 567 expected_issues=None): 568 Loggable.__init__(self) 569 570 extra_env_variables = extra_env_variables or {} 571 572 file_dur = int(media_descriptor.get_duration()) / GST_SECOND 573 if not media_descriptor.get_num_tracks("video"): 574 self.debug("%s audio only file applying transcoding ratio." 575 "File 'duration' : %s" % (classname, file_dur)) 576 duration = file_dur / AUDIO_ONLY_FILE_TRANSCODING_RATIO 577 else: 578 duration = file_dur 579 580 super( 581 GstValidateTranscodingTest, self).__init__(GstValidateBaseTestManager.TRANSCODING_COMMAND, 582 classname, 583 options, 584 reporter, 585 duration=duration, 586 timeout=timeout, 587 scenario=scenario, 588 media_descriptor=media_descriptor, 589 extra_env_variables=None, 590 expected_issues=expected_issues) 591 extra_env_variables = extra_env_variables or {} 592 593 GstValidateEncodingTestInterface.__init__( 594 self, combination, media_descriptor) 595 596 self.uri = uri 597 598 def run_external_checks(self): 599 if self.media_descriptor.get_num_tracks("video") == 1 and \ 600 self.options.validate_enable_iqa_tests: 601 self.run_iqa_test(self.uri) 602 603 def set_rendering_info(self): 604 self.dest_file = os.path.join(self.options.dest, 605 self.classname.replace(".transcode.", os.sep). 606 replace(".", os.sep)) 607 mkdir(os.path.dirname(urllib.parse.urlsplit(self.dest_file).path)) 608 if urllib.parse.urlparse(self.dest_file).scheme == "": 609 self.dest_file = path2url(self.dest_file) 610 611 profile = self.get_profile() 612 self.add_arguments("-o", profile) 613 614 def build_arguments(self): 615 GstValidateTest.build_arguments(self) 616 self.set_rendering_info() 617 self.add_arguments(self.uri, self.dest_file) 618 619 def get_current_value(self): 620 if self.scenario: 621 sent_eos = self.sent_eos_position() 622 if sent_eos is not None: 623 t = time.time() 624 if ((t - sent_eos)) > 30: 625 if self.media_descriptor.get_protocol() == Protocols.HLS: 626 self.set_result(Result.PASSED, 627 """Got no EOS 30 seconds after sending EOS, 628 in HLS known and tolerated issue: 629 https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/132""") 630 return Result.KNOWN_ERROR 631 632 self.set_result( 633 Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS") 634 635 return Result.FAILED 636 637 size = self.get_current_size() 638 if size is None: 639 return self.get_current_position() 640 641 return size 642 643 def check_results(self): 644 if self.result in [Result.FAILED, Result.TIMEOUT] or \ 645 self.process.returncode != 0: 646 GstValidateTest.check_results(self) 647 return 648 649 res, msg = self.check_encoded_file() 650 self.set_result(res, msg) 651 652 653class GstValidateBaseRTSPTest: 654 """ Interface for RTSP tests, requires implementing Test""" 655 __used_ports = set() 656 657 def __init__(self, local_uri): 658 self._local_uri = local_uri 659 self.rtsp_server = None 660 self._unsetport_pipeline_desc = None 661 self.optional = True 662 663 @classmethod 664 def __get_open_port(cls): 665 while True: 666 # hackish trick from 667 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python?answertab=votes#tab-top 668 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 669 s.bind(("", 0)) 670 port = s.getsockname()[1] 671 if port not in cls.__used_ports: 672 cls.__used_ports.add(port) 673 s.close() 674 return port 675 676 s.close() 677 678 def launch_server(self): 679 if self.options.redirect_logs == 'stdout': 680 self.rtspserver_logs = sys.stdout 681 elif self.options.redirect_logs == 'stderr': 682 self.rtspserver_logs = sys.stderr 683 684 self.server_port = self.__get_open_port() 685 command = [GstValidateBaseTestManager.RTSP_SERVER_COMMAND, self._local_uri, '--port', str(self.server_port)] 686 687 if self.options.validate_gdb_server: 688 command = self.use_gdb(command) 689 self.rtspserver_logs = sys.stdout 690 elif self.options.redirect_logs: 691 self.rtspserver_logs = sys.stdout 692 else: 693 self.rtspserver_logs = open(self.logfile + '_rtspserver.log', 'w+') 694 self.extra_logfiles.add(self.rtspserver_logs.name) 695 696 server_env = os.environ.copy() 697 698 self.rtsp_server = subprocess.Popen(command, 699 stderr=self.rtspserver_logs, 700 stdout=self.rtspserver_logs, 701 env=server_env) 702 while True: 703 s = socket.socket() 704 try: 705 s.connect((("127.0.0.1", self.server_port))) 706 break 707 except ConnectionRefusedError: 708 time.sleep(0.1) 709 continue 710 finally: 711 s.close() 712 713 if not self._unsetport_pipeline_desc: 714 self._unsetport_pipeline_desc = self.pipeline_desc 715 716 self.pipeline_desc = self._unsetport_pipeline_desc.replace( 717 "<RTSPPORTNUMBER>", str(self.server_port)) 718 719 return ' '.join(command) 720 721 def close_logfile(self): 722 super().close_logfile() 723 if not self.options.redirect_logs: 724 self.rtspserver_logs.close() 725 726 def process_update(self): 727 res = super().process_update() 728 if res: 729 kill_subprocess(self, self.rtsp_server, DEFAULT_TIMEOUT) 730 self.__used_ports.remove(self.server_port) 731 732 return res 733 734 735class GstValidateRTSPTest(GstValidateBaseRTSPTest, GstValidateLaunchTest): 736 737 def __init__(self, classname, options, reporter, pipeline_desc, 738 local_uri, timeout=DEFAULT_TIMEOUT, scenario=None, 739 media_descriptor=None, rtsp2=False): 740 GstValidateLaunchTest.__init__(self, classname, options, reporter, 741 pipeline_desc, timeout, scenario, 742 media_descriptor) 743 GstValidateBaseRTSPTest.__init__(self, local_uri) 744 self.rtsp2 = rtsp2 745 746 def get_subproc_env(self): 747 env = super().get_subproc_env() 748 path = env.get('GST_VALIDATE_SCENARIOS_PATH', '') 749 override_dir = get_data_file(os.path.join('data', 'scenarios'), 'rtsp_overrides') 750 env['GST_VALIDATE_SCENARIOS_PATH'] = '%s:%s' % (override_dir, path) 751 if self.rtsp2: 752 env['GST_VALIDATE_SCENARIO'] = env.get('GST_VALIDATE_SCENARIO', '') + ':' + 'force_rtsp2' 753 754 return env 755 756 757class GstValidateRTSPMediaDesciptor(GstValidateMediaDescriptor): 758 759 def __init__(self, xml_path): 760 GstValidateMediaDescriptor.__init__(self, xml_path) 761 762 def get_uri(self): 763 return "rtsp://127.0.0.1:8554/test" 764 765 def get_protocol(self): 766 return Protocols.RTSP 767 768 def prerrols(self): 769 return False 770 771 772class GstValidateTestManager(GstValidateBaseTestManager): 773 774 name = "validate" 775 776 # List of all classes to create testsuites 777 GstValidateMediaCheckTestsGenerator = GstValidateMediaCheckTestsGenerator 778 GstValidateTranscodingTestsGenerator = GstValidateTranscodingTestsGenerator 779 GstValidatePipelineTestsGenerator = GstValidatePipelineTestsGenerator 780 GstValidatePlaybinTestsGenerator = GstValidatePlaybinTestsGenerator 781 GstValidateMixerTestsGenerator = GstValidateMixerTestsGenerator 782 GstValidateLaunchTest = GstValidateLaunchTest 783 GstValidateMediaCheckTest = GstValidateMediaCheckTest 784 GstValidateTranscodingTest = GstValidateTranscodingTest 785 786 def __init__(self): 787 super(GstValidateTestManager, self).__init__() 788 self._uris = [] 789 self._run_defaults = True 790 self._is_populated = False 791 self._default_generators_registered = False 792 793 def init(self): 794 for command, name in [ 795 (GstValidateBaseTestManager.TRANSCODING_COMMAND, "gst-validate-transcoding-1.0"), 796 (GstValidateBaseTestManager.COMMAND, "gst-validate-1.0"), 797 (GstValidateBaseTestManager.MEDIA_CHECK_COMMAND, "gst-validate-media-check-1.0")]: 798 if not command: 799 self.error("command not found: %s" % name) 800 return False 801 802 return True 803 804 def add_options(self, parser): 805 group = parser.add_argument_group("GstValidate tools specific options" 806 " and behaviours", 807 description="""When using --wanted-tests, all the scenarios can be used, even those which have 808not been tested and explicitely activated if you set use --wanted-tests ALL""") 809 group.add_argument("--validate-check-uri", dest="validate_uris", 810 action="append", help="defines the uris to run default tests on") 811 group.add_argument("--validate-tools-path", dest="validate_tools_path", 812 action="append", help="defines the paths to look for GstValidate tools.") 813 group.add_argument("--validate-gdb-server", dest="validate_gdb_server", 814 help="Run the server in GDB.") 815 group.add_argument("--validate-disable-rtsp", dest="disable_rtsp", 816 help="Disable RTSP tests.") 817 group.add_argument("--validate-enable-iqa-tests", dest="validate_enable_iqa_tests", 818 help="Enable Image Quality Assessment validation tests.", 819 default=False, action='store_true') 820 821 def print_valgrind_bugs(self): 822 # Look for all the 'pending' bugs in our supp file 823 bugs = [] 824 p = get_data_file('data', 'gstvalidate.supp') 825 with open(p) as f: 826 for line in f.readlines(): 827 line = line.strip() 828 if line.startswith('# PENDING:'): 829 tmp = line.split(' ') 830 bugs.append(tmp[2]) 831 832 if bugs: 833 msg = "Ignored valgrind bugs:\n" 834 for b in bugs: 835 msg += " + %s\n" % b 836 printc(msg, Colors.FAIL, True) 837 838 def populate_testsuite(self): 839 840 if self._is_populated is True: 841 return 842 843 if not self.options.config and not self.options.testsuites: 844 if self._run_defaults: 845 self.register_defaults() 846 else: 847 self.register_all() 848 849 self._is_populated = True 850 851 def list_tests(self): 852 if self.tests: 853 return self.tests 854 855 if self._run_defaults: 856 scenarios = [self.scenarios_manager.get_scenario(scenario_name) 857 for scenario_name in self.get_scenarios()] 858 else: 859 scenarios = self.scenarios_manager.get_scenario(None) 860 uris = self._list_uris() 861 862 for generator in self.get_generators(): 863 for test in generator.generate_tests(uris, scenarios): 864 self.add_test(test) 865 866 if not self.tests and not uris and not self.options.wanted_tests: 867 self.info("No valid uris present in the path. Check if media files and info files exist") 868 869 return self.tests 870 871 def _add_media(self, media_info, uri=None): 872 self.debug("Checking %s", media_info) 873 if isinstance(media_info, GstValidateMediaDescriptor): 874 media_descriptor = media_info 875 media_info = media_descriptor.get_path() 876 else: 877 media_descriptor = GstValidateMediaDescriptor(media_info) 878 879 try: 880 # Just testing that the vairous mandatory infos are present 881 caps = media_descriptor.get_caps() 882 if uri is None: 883 uri = media_descriptor.get_uri() 884 885 # Adjust local http uri 886 if self.options.http_server_port != 8079 and \ 887 uri.startswith("http://127.0.0.1:8079/"): 888 uri = uri.replace("http://127.0.0.1:8079/", 889 "http://127.0.0.1:%r/" % self.options.http_server_port, 1) 890 media_descriptor.set_protocol(urllib.parse.urlparse(uri).scheme) 891 for caps2, prot in GST_VALIDATE_CAPS_TO_PROTOCOL: 892 if caps2 == caps: 893 media_descriptor.set_protocol(prot) 894 break 895 896 scenario_bname = media_descriptor.get_media_filepath() 897 special_scenarios = self.scenarios_manager.find_special_scenarios( 898 scenario_bname) 899 self._uris.append((uri, 900 NamedDic({"path": media_info, 901 "media_descriptor": media_descriptor}), 902 special_scenarios)) 903 except configparser.NoOptionError as e: 904 self.debug("Exception: %s for %s", e, media_info) 905 906 def _discover_file(self, uri, fpath): 907 for ext in (GstValidateMediaDescriptor.MEDIA_INFO_EXT, 908 GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT): 909 try: 910 is_push = False 911 media_info = "%s.%s" % (fpath, ext) 912 if ext == GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT: 913 if not os.path.exists(media_info): 914 continue 915 is_push = True 916 uri = "push" + uri 917 args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ") 918 919 args.append(uri) 920 if os.path.isfile(media_info) and not self.options.update_media_info: 921 self._add_media(media_info, uri) 922 continue 923 elif fpath.endswith(GstValidateMediaDescriptor.STREAM_INFO_EXT): 924 self._add_media(fpath) 925 continue 926 elif not self.options.generate_info and not self.options.update_media_info and not self.options.validate_uris: 927 continue 928 elif self.options.update_media_info and not os.path.isfile(media_info): 929 self.info( 930 "%s not present. Use --generate-media-info", media_info) 931 continue 932 elif os.path.islink(media_info): 933 self.info( 934 "%s is a symlink, not updating and hopefully the actual file gets updated!", media_info) 935 continue 936 937 include_frames = 0 938 if self.options.update_media_info: 939 include_frames = 2 940 elif self.options.generate_info_full: 941 include_frames = 1 942 943 media_descriptor = GstValidateMediaDescriptor.new_from_uri( 944 uri, True, include_frames, is_push) 945 if media_descriptor: 946 self._add_media(media_descriptor, uri) 947 else: 948 self.warning("Could not get any descriptor for %s" % uri) 949 950 except subprocess.CalledProcessError as e: 951 if self.options.generate_info: 952 printc("Result: Failed", Colors.FAIL) 953 else: 954 self.error("Exception: %s", e) 955 return False 956 return True 957 958 def _list_uris(self): 959 if self._uris: 960 return self._uris 961 962 if self.options.validate_uris: 963 for uri in self.options.validate_uris: 964 self._discover_file(uri, uri) 965 return self._uris 966 967 if not self.args: 968 if isinstance(self.options.paths, str): 969 self.options.paths = [os.path.join(self.options.paths)] 970 971 for path in self.options.paths: 972 if os.path.isfile(path): 973 path = os.path.abspath(path) 974 self._discover_file(path2url(path), path) 975 else: 976 for root, dirs, files in os.walk(path): 977 for f in files: 978 fpath = os.path.abspath(os.path.join(root, f)) 979 if os.path.isdir(fpath) or \ 980 fpath.endswith(GstValidateMediaDescriptor.MEDIA_INFO_EXT) or\ 981 fpath.endswith(ScenarioManager.FILE_EXTENSION): 982 continue 983 else: 984 self._discover_file(path2url(fpath), fpath) 985 986 self.debug("Uris found: %s", self._uris) 987 988 return self._uris 989 990 def needs_http_server(self): 991 for test in self.list_tests(): 992 if self._is_test_wanted(test) and test.media_descriptor is not None: 993 protocol = test.media_descriptor.get_protocol() 994 uri = test.media_descriptor.get_uri() 995 996 if protocol in [Protocols.HTTP, Protocols.HLS, Protocols.DASH] and \ 997 ("127.0.0.1:%s" % ( 998 self.options.http_server_port) in uri or "127.0.0.1:8079" in uri): 999 return True 1000 return False 1001 1002 def set_settings(self, options, args, reporter): 1003 if options.wanted_tests: 1004 for i in range(len(options.wanted_tests)): 1005 if "ALL" in options.wanted_tests[i]: 1006 self._run_defaults = False 1007 options.wanted_tests[ 1008 i] = options.wanted_tests[i].replace("ALL", "") 1009 try: 1010 options.wanted_tests.remove("") 1011 except ValueError: 1012 pass 1013 1014 if options.validate_uris: 1015 self.check_testslist = False 1016 1017 super(GstValidateTestManager, self).set_settings( 1018 options, args, reporter) 1019 1020 def register_defaults(self): 1021 """ 1022 Registers the defaults: 1023 * Scenarios to be used 1024 * Encoding formats to be used 1025 * Blacklisted tests 1026 * Test generators 1027 """ 1028 printc("-> Registering default 'validate' tests... ", end='') 1029 self.register_default_scenarios() 1030 self.register_default_encoding_formats() 1031 self.register_default_blacklist() 1032 self.register_default_test_generators() 1033 printc("OK", Colors.OKGREEN) 1034 1035 def register_default_scenarios(self): 1036 """ 1037 Registers default test scenarios 1038 """ 1039 if self.options.long_limit != 0: 1040 self.add_scenarios([ 1041 "play_15s", 1042 "reverse_playback", 1043 "fast_forward", 1044 "seek_forward", 1045 "seek_backward", 1046 "seek_with_stop", 1047 "switch_audio_track", 1048 "switch_audio_track_while_paused", 1049 "switch_subtitle_track", 1050 "switch_subtitle_track_while_paused", 1051 "disable_subtitle_track_while_paused", 1052 "change_state_intensive", 1053 "scrub_forward_seeking"]) 1054 else: 1055 self.add_scenarios([ 1056 "play_15s", 1057 "reverse_playback", 1058 "fast_forward", 1059 "seek_forward", 1060 "seek_backward", 1061 "seek_with_stop", 1062 "switch_audio_track", 1063 "switch_audio_track_while_paused", 1064 "switch_subtitle_track", 1065 "switch_subtitle_track_while_paused", 1066 "disable_subtitle_track_while_paused", 1067 "change_state_intensive", 1068 "scrub_forward_seeking"]) 1069 1070 def register_default_encoding_formats(self): 1071 """ 1072 Registers default encoding formats 1073 """ 1074 self.add_encoding_formats([ 1075 MediaFormatCombination("ogg", "vorbis", "theora"), 1076 MediaFormatCombination("webm", "vorbis", "vp8"), 1077 MediaFormatCombination("mp4", "mp3", "h264"), 1078 MediaFormatCombination("mkv", "vorbis", "h264"), 1079 ]) 1080 1081 def register_default_blacklist(self): 1082 self.set_default_blacklist([ 1083 # testbin known issues 1084 ("testbin.media_check.*", 1085 "Not supported by GstDiscoverer."), 1086 1087 # dash known issues 1088 ("dash.media_check.*", 1089 "Caps are different depending on selected bitrates, etc"), 1090 1091 # Matroska/WEBM known issues: 1092 ("*.reverse_playback.*webm$", 1093 "https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/issues/65"), 1094 ("*.reverse_playback.*mkv$", 1095 "https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/issues/65"), 1096 ("http.playback.seek_with_stop.*webm", 1097 "matroskademux.gst_matroska_demux_handle_seek_push: Seek end-time not supported in streaming mode"), 1098 ("http.playback.seek_with_stop.*mkv", 1099 "matroskademux.gst_matroska_demux_handle_seek_push: Seek end-time not supported in streaming mode"), 1100 1101 # MPEG TS known issues: 1102 ('(?i)*playback.reverse_playback.*(?:_|.)(?:|m)ts$', 1103 "https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/97"), 1104 1105 # Fragmented MP4 disabled tests: 1106 ('*.playback..*seek.*.fragmented_nonseekable_sink_mp4', 1107 "Seeking on fragmented files without indexes isn't implemented"), 1108 ('*.playback.reverse_playback.fragmented_nonseekable_sink_mp4', 1109 "Seeking on fragmented files without indexes isn't implemented"), 1110 1111 # HTTP known issues: 1112 ("http.*scrub_forward_seeking.*", 1113 "This is not stable enough for now."), 1114 ("http.playback.change_state_intensive.raw_video_mov", 1115 "This is not stable enough for now. (flow return from pad push doesn't match expected value)"), 1116 1117 # MXF known issues" 1118 ("*reverse_playback.*mxf", 1119 "Reverse playback is not handled in MXF"), 1120 ("file\.transcode.*mxf", 1121 "FIXME: Transcoding and mixing tests need to be tested"), 1122 1123 # WMV known issues" 1124 ("*reverse_playback.*wmv", 1125 "Reverse playback is not handled in wmv"), 1126 (".*reverse_playback.*asf", 1127 "Reverse playback is not handled in asf"), 1128 1129 # ogg known issues 1130 ("http.playback.seek.*vorbis_theora_1_ogg", 1131 "https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/issues/281"), 1132 # RTSP known issues 1133 ('rtsp.*playback.reverse.*', 1134 'https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/issues/32'), 1135 ('rtsp.*playback.seek_with_stop.*', 1136 'https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/issues/386'), 1137 ('rtsp.*playback.fast_*', 1138 'https://gitlab.freedesktop.org/gstreamer/gst-rtsp-server/issues/14'), 1139 ]) 1140 1141 def register_default_test_generators(self): 1142 """ 1143 Registers default test generators 1144 """ 1145 if self._default_generators_registered: 1146 return 1147 1148 self.add_generators([GstValidatePlaybinTestsGenerator(self), 1149 GstValidateMediaCheckTestsGenerator(self), 1150 GstValidateTranscodingTestsGenerator(self)]) 1151 self._default_generators_registered = True 1152