1#!/usr/bin/env python3
2#
3# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
4#
5# This program is free software; you can redistribute it and/or
6# modify it under the terms of the GNU Lesser General Public
7# License as published by the Free Software Foundation; either
8# version 2.1 of the License, or (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13# Lesser General Public License for more details.
14#
15# You should have received a copy of the GNU Lesser General Public
16# License along with this program; if not, write to the
17# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18# Boston, MA 02110-1301, USA.
19import argparse
20import os
21import copy
22import sys
23import time
24import urllib.parse
25import shlex
26import socket
27import subprocess
28import configparser
29import json
30from launcher.loggable import Loggable
31
32from launcher.baseclasses import GstValidateTest, Test, \
33    ScenarioManager, NamedDic, GstValidateTestsGenerator, \
34    GstValidateMediaDescriptor, GstValidateEncodingTestInterface, \
35    GstValidateBaseTestManager, MediaDescriptor, MediaFormatCombination
36
37from launcher.utils import path2url, url2path, DEFAULT_TIMEOUT, which, \
38    GST_SECOND, Result, Protocols, mkdir, printc, Colors, get_data_file, \
39    kill_subprocess, format_config_template
40
41#
42# Private global variables     #
43#
44
# definitions of commands to use
# A dedicated parser (created without its own --help option) reads the
# location of the GstValidate tools as soon as this module is imported.
# parse_known_args() is used so that every option this parser does not declare
# is returned in `args` instead of triggering an error.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--validate-tools-path", dest="validate_tools_path",
                    default="",
                    help="defines the paths to look for GstValidate tools.")
options, args = parser.parse_known_args()

# Hand the extra tools path to the base test manager (presumably so it can
# locate the GstValidate executables -- see update_commands()).
GstValidateBaseTestManager.update_commands(options.validate_tools_path)
# Audio-only files get their expected transcoding duration divided by this
# ratio (see GstValidateTranscodingTest.__init__).
AUDIO_ONLY_FILE_TRANSCODING_RATIO = 5
54
55#
56# API to be used to create testsuites     #
57#
58
59"""
60Some info about protocols and how to handle them
61"""
# List of (caps media type, protocol) pairs associating the caps of adaptive
# streaming manifests with the matching `Protocols` value.
GST_VALIDATE_CAPS_TO_PROTOCOL = [("application/x-hls", Protocols.HLS),
                                 ("application/dash+xml", Protocols.DASH)]
64
65
class GstValidateMediaCheckTestsGenerator(GstValidateTestsGenerator):
    """Tests generator creating one media-check test per known media URI."""

    def __init__(self, test_manager):
        GstValidateTestsGenerator.__init__(self, "media_check", test_manager)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Add a `GstValidateMediaCheckTest` for each entry.

        @uri_minfo_special_scenarios: Iterable of
                                      (uri, mediainfo, special_scenarios)
        @scenarios: Not used by this generator
        """
        for uri, mediainfo, _special_scenarios in uri_minfo_special_scenarios:
            descriptor = mediainfo.media_descriptor
            protocol = descriptor.get_protocol()

            # The test is named after the media file, dots being replaced so
            # that they do not add levels to the dotted class name.
            media_name = os.path.basename(url2path(uri)).replace(".", "_")
            classname = "%s.media_check.%s" % (protocol, media_name)

            test = GstValidateMediaCheckTest(classname,
                                             self.test_manager.options,
                                             self.test_manager.reporter,
                                             descriptor,
                                             uri,
                                             mediainfo.path,
                                             timeout=DEFAULT_TIMEOUT)
            self.add_test(test)
85
86
class GstValidateTranscodingTestsGenerator(GstValidateTestsGenerator):
    """Tests generator creating a transcoding test for every combination of
    media and encoding format."""

    def __init__(self, test_manager):
        GstValidateTestsGenerator.__init__(self, "transcode", test_manager)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Add a `GstValidateTranscodingTest` per media and encoding format.

        Images and RTSP streams are skipped.

        @uri_minfo_special_scenarios: Iterable of
                                      (uri, mediainfo, special_scenarios)
        @scenarios: Not used by this generator
        """
        for uri, mediainfo, _special_scenarios in uri_minfo_special_scenarios:
            descriptor = mediainfo.media_descriptor
            if descriptor.is_image() or \
                    descriptor.get_protocol() == Protocols.RTSP:
                continue

            for combination in self.test_manager.get_encoding_formats():
                name_parts = (descriptor.get_protocol(),
                              str(combination).replace(' ', '_'),
                              descriptor.get_clean_name())
                test = GstValidateTranscodingTest(
                    "%s.transcode.to_%s.%s" % name_parts,
                    self.test_manager.options,
                    self.test_manager.reporter,
                    combination,
                    uri,
                    descriptor)
                self.add_test(test)
112
113
class FakeMediaDescriptor(MediaDescriptor):
    """Media descriptor answering from a plain dictionary of information
    and a pipeline description instead of a media info file."""

    def __init__(self, infos, pipeline_desc):
        """
        @infos: Dictionary holding the media information; every key is
                optional and has a default value
        @pipeline_desc: The pipeline description, used to guess the number
                        of tracks when `infos` does not provide it
        """
        MediaDescriptor.__init__(self)
        self._pipeline_desc = pipeline_desc
        self._infos = infos

    def get_path(self):
        return self._infos.get('path')

    def get_media_filepath(self):
        return self._infos.get('media-filepath')

    def get_caps(self):
        return self._infos.get('caps')

    def get_uri(self):
        return self._infos.get('uri')

    def get_duration(self):
        # 'duration' is multiplied by GST_SECOND (0 when not provided).
        seconds = int(self._infos.get('duration', 0))
        return seconds * GST_SECOND

    def get_protocol(self):
        return self._infos.get('protocol', "launch_pipeline")

    def is_seekable(self):
        return self._infos.get('is-seekable', True)

    def is_image(self):
        return self._infos.get('is-image', False)

    def is_live(self):
        return self._infos.get('is-live', False)

    def get_num_tracks(self, track_type):
        # When not specified explicitly, count the occurrences of
        # "<track_type>sink" in the pipeline description.
        sinks_in_pipeline = self._pipeline_desc.count(track_type + "sink")
        return self._infos.get('num-%s-tracks' % track_type,
                               sinks_in_pipeline)

    def can_play_reverse(self):
        return self._infos.get('plays-reverse', False)
154
155
class GstValidatePipelineTestsGenerator(GstValidateTestsGenerator):
    """Tests generator creating launch tests from pipeline descriptions."""

    def __init__(self, name, test_manager, pipeline_template=None,
                 pipelines_descriptions=None, valid_scenarios=None):
        """
        @name: The name of the generator
        @pipeline_template: A template pipeline to be used to generate actual pipelines
        @pipelines_descriptions: A list of dictionaries (with the 'name',
                                 'pipeline' and optionally 'extra_data' and
                                 'pipeline_data' keys) or of tuples of the form:
                                 (test_name, pipeline_description, extra_data)
                                 extra_data being a dictionary with the following keys:
                                    'scenarios': ["the", "valid", "scenarios", "names"]
                                    'duration': the_duration # in seconds
                                    'timeout': a_timeout # in seconds
                                    'hard_timeout': a_hard_timeout # in seconds

        @valid_scenarios: A list of scenario name that can be used with that generator
        """
        GstValidateTestsGenerator.__init__(self, name, test_manager)
        self._pipeline_template = pipeline_template

        # Tuple descriptions are normalized to the dictionary form.
        normalized = []
        for description in pipelines_descriptions or []:
            if isinstance(description, dict):
                normalized.append(description)
                continue

            as_dict = {"name": description[0],
                       "pipeline": description[1]}
            if len(description) >= 3:
                as_dict["extra_data"] = description[2]
            normalized.append(as_dict)

        self._pipelines_descriptions = normalized
        self._valid_scenarios = valid_scenarios or []

    @classmethod
    def from_json(cls, test_manager, json_file, extra_data=None):
        """
        Build a generator from the tests defined in a JSON file; the
        generator is named after the file (without its '.json' part).

        :param json_file: Path to a JSON file containing pipeline tests.
        :param extra_data: Variables available for interpolation in validate
        configs and scenario actions.
        """
        extra_data = {} if extra_data is None else extra_data
        with open(json_file, 'r') as json_fd:
            definitions = json.load(json_fd)

        generator_name = os.path.basename(json_file).replace('.json', '')
        pipelines_descriptions = []
        for test_name, defs in definitions.items():
            pipeline = defs['pipeline']
            # Generated config and scenario files are stored in a directory
            # private to the test.
            private_dir = os.path.join(test_manager.options.privatedir,
                                       generator_name, test_name)

            config_file = None
            if 'config' in defs:
                os.makedirs(private_dir, exist_ok=True)
                config_file = os.path.join(private_dir, test_name + '.config')
                with open(config_file, 'w') as config_fd:
                    config_fd.write(format_config_template(
                        extra_data, '\n'.join(defs['config']) + '\n',
                        test_name))

            scenarios = []
            for scenario in defs.get('scenarios', []):
                if isinstance(scenario, str):
                    # Path to a scenario file
                    scenarios.append(scenario)
                    continue

                # Dictionary defining a new scenario in-line; without any
                # action, the scenario is referred to by its name only.
                scenario_name = scenario['name']
                scenario_file = scenario_name
                actions = scenario.get('actions')
                if actions:
                    os.makedirs(private_dir, exist_ok=True)
                    scenario_file = os.path.join(
                        private_dir, scenario_name + '.scenario')
                    with open(scenario_file, 'w') as scenario_fd:
                        scenario_fd.write(
                            '\n'.join(action % extra_data for action in actions) + '\n')
                scenarios.append(scenario_file)

            # Entries of extra_data take precedence over 'config_path'.
            pipeline_data = {"config_path": os.path.dirname(json_file)}
            pipeline_data.update(extra_data)
            pipelines_descriptions.append({
                'name': test_name,
                'pipeline': pipeline,
                'extra_data': {'scenarios': scenarios,
                               'config_file': config_file},
                'pipeline_data': pipeline_data,
            })

        return GstValidatePipelineTestsGenerator(
            generator_name, test_manager,
            pipelines_descriptions=pipelines_descriptions)

    def get_fname(self, scenario, protocol=None, name=None):
        """Return the dotted name of the test for @scenario.

        @scenario: The scenario of the test; None or a scenario named
                   "none" (case insensitive) means no scenario
        @protocol: Optional protocol, used as first component of the name
        @name: The name of the test, defaults to the name of the generator
        """
        if name is None:
            name = self.name

        prefix = "" if protocol is None else "%s." % protocol

        if scenario is None or scenario.name.lower() == "none":
            # Without protocol the name would start with "..": collapse it.
            return ("%s.%s.%s" % (prefix, self.name, name)).replace("..", ".")

        return "%s%s.%s" % (prefix, name, scenario.name)

    def generate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Restrict @scenarios to the valid ones and generate the tests."""
        valid_names = self._valid_scenarios
        # NOTE(review): __init__ always stores a list, so the `None` case
        # only happens if the attribute is set to None afterwards.
        if valid_names is None:
            scenarios = [None]
        elif valid_names:
            scenarios = [candidate for candidate in scenarios
                         if candidate is not None and candidate.name in valid_names]

        return super().generate_tests(uri_minfo_special_scenarios, scenarios)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Add a `GstValidateLaunchTest` for each compatible combination of
        pipeline description and scenario."""
        for description in self._pipelines_descriptions:
            pipeline = description['pipeline']
            extra_data = description.get('extra_data', {})
            pipeline_data = description.get('pipeline_data', {})

            # A pipeline description can override the default scenario set.
            # The pipeline description may specify an empty list of
            # scenarios, in which case one test will be generated with no
            # scenario.
            if 'scenarios' not in extra_data:
                scenarios_to_iterate = scenarios
            else:
                scenarios_to_iterate = extra_data['scenarios'] or [None]

            for scenario in scenarios_to_iterate:
                if isinstance(scenario, str):
                    scenario = self.test_manager.scenarios_manager.get_scenario(
                        scenario)

                mediainfo = FakeMediaDescriptor(extra_data, pipeline)
                if not mediainfo.is_compatible(scenario):
                    continue

                if not self.test_manager.options.mute:
                    audiosink = 'autoaudiosink'
                    videosink = 'autovideosink'
                else:
                    needs_clock = False
                    if scenario:
                        needs_clock = scenario.needs_clock_sync()
                    audiosink = self.get_fakesink_for_media_type(
                        "audio", needs_clock)
                    videosink = self.get_fakesink_for_media_type(
                        "video", needs_clock)

                # The sinks are made available to the pipeline template.
                pipeline_data['videosink'] = videosink
                pipeline_data['audiosink'] = audiosink
                pipeline_desc = pipeline % pipeline_data

                fname = self.get_fname(scenario,
                                       protocol=mediainfo.get_protocol(),
                                       name=description["name"])

                test = GstValidateLaunchTest(
                    fname,
                    self.test_manager.options,
                    self.test_manager.reporter,
                    pipeline_desc,
                    scenario=scenario,
                    media_descriptor=mediainfo,
                    expected_issues=extra_data.get("expected-issues"),
                    extra_env_variables=extra_data.get("extra_env_vars"))

                config_file = extra_data.get('config_file')
                if config_file:
                    test.add_validate_config(config_file)
                self.add_test(test)
317
318
class GstValidatePlaybinTestsGenerator(GstValidatePipelineTestsGenerator):
    """Tests generator playing back every media with playbin (playbin3 when
    the USE_PLAYBIN3 environment variable is set), and additionally through
    RTSP for local files when the RTSP server command is available."""

    def __init__(self, test_manager):
        element = "playbin" if os.getenv("USE_PLAYBIN3") is None else "playbin3"
        GstValidatePipelineTestsGenerator.__init__(
            self, "playback", test_manager, element)

    def _set_sinks(self, minfo, pipe_str, scenario):
        """Return @pipe_str, with fake sinks appended when the `mute`
        option is set."""
        if not self.test_manager.options.mute:
            return pipe_str

        needs_clock = scenario.needs_clock_sync() or minfo.media_descriptor.need_clock_sync()
        audio_fakesink = self.get_fakesink_for_media_type("audio", needs_clock)
        video_fakesink = self.get_fakesink_for_media_type("video", needs_clock)

        return pipe_str + " audio-sink='%s' video-sink='%s'" % (
            audio_fakesink, video_fakesink)

    def _get_name(self, scenario, protocol, minfo):
        """Return the test name: generator name followed by the media name."""
        prefix = self.get_fname(scenario, protocol)
        media_name = os.path.basename(minfo.media_descriptor.get_clean_name())
        return "%s.%s" % (prefix, media_name)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Add playback tests for each compatible media/scenario pair.

        @uri_minfo_special_scenarios: Iterable of
                                      (uri, mediainfo, special_scenarios)
        @scenarios: The scenarios to use on top of the special scenarios of
                    each media
        """
        test_rtsp = GstValidateBaseTestManager.RTSP_SERVER_COMMAND
        if not test_rtsp:
            # NOTE(review): the command is falsy on this path, so the message
            # cannot actually name the missing tool.
            printc("\n\nRTSP server not available, you should make sure"
                   " that %s is available in your $PATH." % GstValidateBaseTestManager.RTSP_SERVER_COMMAND,
                   Colors.FAIL)
        elif self.test_manager.options.disable_rtsp:
            printc("\n\nRTSP tests are disabled")
            test_rtsp = False

        for uri, minfo, special_scenarios in uri_minfo_special_scenarios:
            template = self._pipeline_template
            descriptor = minfo.media_descriptor
            protocol = descriptor.get_protocol()

            if protocol == Protocols.RTSP:
                self.debug("SKIPPING %s as it is a RTSP stream" % uri)
                continue

            uri_pipe = template + " uri=%s" % uri

            for scenario in special_scenarios + scenarios:
                if not descriptor.is_compatible(scenario):
                    continue

                cpipe = self._set_sinks(minfo, uri_pipe, scenario)
                fname = self._get_name(scenario, protocol, minfo)

                self.debug("Adding: %s", fname)

                if scenario.does_reverse_playback() and protocol == Protocols.HTTP:
                    # 10MB so we can reverse playback
                    cpipe += " ring-buffer-max-size=10485760"

                self.add_test(GstValidateLaunchTest(
                    fname,
                    self.test_manager.options,
                    self.test_manager.reporter,
                    cpipe,
                    scenario=scenario,
                    media_descriptor=descriptor))

                # Local non-image files are also tested through RTSP.
                if not (test_rtsp and protocol == Protocols.FILE
                        and not descriptor.is_image()):
                    continue

                media_path = descriptor.get_path()
                rtspminfo = NamedDic({
                    "path": media_path,
                    "media_descriptor": GstValidateRTSPMediaDesciptor(descriptor.get_path())})
                rtsp_descriptor = rtspminfo.media_descriptor
                if not rtsp_descriptor.is_compatible(scenario):
                    continue

                # The port placeholder is replaced when the server is started.
                cpipe = self._set_sinks(
                    rtspminfo,
                    "%s uri=rtsp://127.0.0.1:<RTSPPORTNUMBER>/test" % self._pipeline_template,
                    scenario)

                fname = self._get_name(scenario, Protocols.RTSP, rtspminfo)
                self.add_test(GstValidateRTSPTest(
                    fname, self.test_manager.options, self.test_manager.reporter,
                    cpipe, uri, scenario=scenario,
                    media_descriptor=rtspminfo.media_descriptor))

                fname = self._get_name(scenario, Protocols.RTSP + '2', rtspminfo)
                self.add_test(GstValidateRTSPTest(
                    fname, self.test_manager.options, self.test_manager.reporter,
                    cpipe, uri, scenario=scenario,
                    media_descriptor=rtspminfo.media_descriptor,
                    rtsp2=True))
408
409
class GstValidateMixerTestsGenerator(GstValidatePipelineTestsGenerator):
    """Tests generator mixing several sources together through a mixer
    element."""

    def __init__(self, name, test_manager, mixer, media_type, converter="",
                 num_sources=3, mixed_srcs=None, valid_scenarios=None):
        """
        @name: The name of the generator
        @mixer: The mixer element (description) to use
        @media_type: The type of media being mixed (e.g. "audio" or "video"),
                     used to select the tracks and the sink
        @converter: Element (description) placed after each source and after
                    the mixer
        @num_sources: Number of sources mixed together when the sources are
                      picked automatically
        @mixed_srcs: Optional dictionary mapping a test name to either a
                     sequence of source descriptions, or a dictionary with
                     the 'mixer_props' and 'sources' keys. When empty, the
                     sources are picked among the local media files.
        @valid_scenarios: A list of scenario name that can be used with that generator
        """
        mixed_srcs = mixed_srcs or {}
        valid_scenarios = valid_scenarios or []

        pipe_template = "%(mixer)s name=_mixer !  " + \
            converter + " ! %(sink)s "
        self.converter = converter
        self.mixer = mixer
        self.media_type = media_type
        self.num_sources = num_sources
        self.mixed_srcs = mixed_srcs
        super(
            GstValidateMixerTestsGenerator, self).__init__(name, test_manager, pipe_template,
                                                           valid_scenarios=valid_scenarios)

    def populate_tests(self, uri_minfo_special_scenarios, scenarios):
        """Add a `GstValidateLaunchTest` per set of mixed sources and
        scenario. Nothing is generated when the `validate_uris` option is
        set."""
        if self.test_manager.options.validate_uris:
            return

        # Only local files having at least one track of the wanted type can
        # be used as sources.
        wanted_ressources = []
        for uri, minfo, special_scenarios in uri_minfo_special_scenarios:
            protocol = minfo.media_descriptor.get_protocol()
            if protocol == Protocols.FILE and \
                    minfo.media_descriptor.get_num_tracks(self.media_type) > 0:
                wanted_ressources.append((uri, minfo))

        if not self.mixed_srcs:
            if not wanted_ressources:
                return

            if os.getenv("USE_PLAYBIN3") is None:
                decodebin = "uridecodebin"
            else:
                decodebin = "uridecodebin3"

            # Integer division is required: range() rejects the float that
            # "/" produces. The count is also capped so that the window
            # [i, i + num_sources) always fits in wanted_ressources, which
            # can be shorter than the unfiltered input.
            num_groups = min(
                len(uri_minfo_special_scenarios) // self.num_sources,
                len(wanted_ressources) - self.num_sources + 1)

            for i in range(num_groups):
                srcs = []
                name = ""
                for nsource in range(self.num_sources):
                    uri, minfo = wanted_ressources[i + nsource]
                    srcs.append(
                        "%s uri=%s ! %s" % (decodebin, uri, self.converter))
                    fname = os.path.basename(uri).replace(".", "_")
                    if not name:
                        name = fname
                    else:
                        name += "+%s" % fname

                self.mixed_srcs[name] = tuple(srcs)

        for name, srcs in self.mixed_srcs.items():
            if isinstance(srcs, dict):
                # Dictionary form: extra mixer properties plus the sources.
                pipe_arguments = {
                    "mixer": self.mixer + " %s" % srcs["mixer_props"]}
                srcs = srcs["sources"]
            else:
                pipe_arguments = {"mixer": self.mixer}

            for scenario in scenarios:
                fname = self.get_fname(scenario, Protocols.FILE) + "."
                fname += name

                self.debug("Adding: %s", fname)

                if self.test_manager.options.mute:
                    pipe_arguments["sink"] = self.get_fakesink_for_media_type(self.media_type,
                                                                              scenario.needs_clock_sync())
                else:
                    pipe_arguments["sink"] = "auto%ssink" % self.media_type

                # Each source is linked to the mixer declared by the template.
                pipe = self._pipeline_template % pipe_arguments
                pipe += "".join("%s ! _mixer. " % src for src in srcs)

                self.add_test(GstValidateLaunchTest(fname,
                                                    self.test_manager.options,
                                                    self.test_manager.reporter,
                                                    pipe,
                                                    scenario=scenario)
                              )
493
494
class GstValidateLaunchTest(GstValidateTest):
    """Test running a pipeline description with the GstValidate launch
    command."""

    def __init__(self, classname, options, reporter, pipeline_desc,
                 timeout=DEFAULT_TIMEOUT, scenario=None,
                 media_descriptor=None, duration=0, hard_timeout=None,
                 extra_env_variables=None, expected_issues=None):
        """
        @pipeline_desc: The description of the pipeline to launch
        @duration: Only used when neither @scenario nor @media_descriptor
                   is set; otherwise the duration comes from the scenario
                   or, failing that, from the media descriptor
        """
        if scenario:
            duration = scenario.get_duration()
        elif media_descriptor:
            duration = media_descriptor.get_duration() / GST_SECOND

        super().__init__(GstValidateBaseTestManager.COMMAND,
                         classname,
                         options, reporter,
                         duration=duration,
                         scenario=scenario,
                         timeout=timeout,
                         hard_timeout=hard_timeout,
                         media_descriptor=media_descriptor,
                         extra_env_variables=extra_env_variables or {},
                         expected_issues=expected_issues)

        self.pipeline_desc = pipeline_desc
        self.media_descriptor = media_descriptor

    def build_arguments(self):
        """Add the pipeline description, split the way a shell would, and
        the media info path when the media descriptor provides one."""
        GstValidateTest.build_arguments(self)
        pipeline_args = shlex.split(self.pipeline_desc)
        self.add_arguments(*pipeline_args)

        descriptor = self.media_descriptor
        if descriptor is None:
            return

        media_info_path = descriptor.get_path()
        if media_info_path:
            self.add_arguments("--set-media-info", media_info_path)
530
531
class GstValidateMediaCheckTest(GstValidateTest):
    """Test running the media-check command on a URI, with a media info
    file as expected results."""

    def __init__(self, classname, options, reporter, media_descriptor,
                 uri, minfo_path, timeout=DEFAULT_TIMEOUT,
                 extra_env_variables=None,
                 expected_issues=None):
        """
        @uri: The URI of the media to check
        @minfo_path: Path passed to the command as `--expected-results`
        """
        super().__init__(GstValidateBaseTestManager.MEDIA_CHECK_COMMAND,
                         classname,
                         options, reporter,
                         timeout=timeout,
                         media_descriptor=media_descriptor,
                         extra_env_variables=extra_env_variables or {},
                         expected_issues=expected_issues)
        self._media_info_path = minfo_path
        self._uri = uri

    def build_arguments(self):
        # NOTE(review): this chains to Test.build_arguments directly and so
        # skips GstValidateTest.build_arguments -- presumably on purpose.
        Test.build_arguments(self)
        self.add_arguments(
            self._uri, "--expected-results", self._media_info_path)

        skip_parsers = self.media_descriptor.skip_parsers()
        if skip_parsers:
            self.add_arguments("--skip-parsers")
557
558
class GstValidateTranscodingTest(GstValidateTest, GstValidateEncodingTestInterface):
    """Test transcoding a media to an encoding format combination with the
    GstValidate transcoding command."""
    scenarios_manager = ScenarioManager()

    def __init__(self, classname, options, reporter,
                 combination, uri, media_descriptor,
                 timeout=DEFAULT_TIMEOUT,
                 scenario=None,
                 extra_env_variables=None,
                 expected_issues=None):
        """
        @combination: The encoding format combination to transcode to
        @uri: The URI of the media to transcode
        @media_descriptor: The descriptor of the media to transcode
        @extra_env_variables: Optional dictionary of environment variables,
                              forwarded to the base test
        """
        # Initialized first as self.debug() is used before the base
        # classes are initialized.
        Loggable.__init__(self)

        extra_env_variables = extra_env_variables or {}

        file_dur = int(media_descriptor.get_duration()) / GST_SECOND
        if not media_descriptor.get_num_tracks("video"):
            self.debug("%s audio only file applying transcoding ratio."
                       "File 'duration' : %s" % (classname, file_dur))
            duration = file_dur / AUDIO_ONLY_FILE_TRANSCODING_RATIO
        else:
            duration = file_dur

        # The variables given by the caller are forwarded; they used to be
        # silently dropped by a hard-coded `extra_env_variables=None`.
        super(
            GstValidateTranscodingTest, self).__init__(GstValidateBaseTestManager.TRANSCODING_COMMAND,
                                                       classname,
                                                       options,
                                                       reporter,
                                                       duration=duration,
                                                       timeout=timeout,
                                                       scenario=scenario,
                                                       media_descriptor=media_descriptor,
                                                       extra_env_variables=extra_env_variables,
                                                       expected_issues=expected_issues)

        GstValidateEncodingTestInterface.__init__(
            self, combination, media_descriptor)

        self.uri = uri

    def run_external_checks(self):
        """Run the IQA test when enabled, for media with exactly one video
        track."""
        if self.media_descriptor.get_num_tracks("video") == 1 and \
                self.options.validate_enable_iqa_tests:
            self.run_iqa_test(self.uri)

    def set_rendering_info(self):
        """Compute the destination file from the class name, create its
        directory and add the encoding profile to the arguments."""
        self.dest_file = os.path.join(self.options.dest,
                                      self.classname.replace(".transcode.", os.sep).
                                      replace(".", os.sep))
        mkdir(os.path.dirname(urllib.parse.urlsplit(self.dest_file).path))
        # The destination is handled as a URI: convert plain paths.
        if urllib.parse.urlparse(self.dest_file).scheme == "":
            self.dest_file = path2url(self.dest_file)

        profile = self.get_profile()
        self.add_arguments("-o", profile)

    def build_arguments(self):
        GstValidateTest.build_arguments(self)
        self.set_rendering_info()
        self.add_arguments(self.uri, self.dest_file)

    def get_current_value(self):
        """Return the current size of the output, or the current position
        when the size is not known.

        When a scenario is used and more than 30 seconds passed since EOS
        was sent, a result is set on the test and returned instead.
        """
        if self.scenario:
            sent_eos = self.sent_eos_position()
            if sent_eos is not None:
                t = time.time()
                if ((t - sent_eos)) > 30:
                    if self.media_descriptor.get_protocol() == Protocols.HLS:
                        # NOTE(review): the result is set to PASSED while
                        # KNOWN_ERROR is returned -- confirm this is wanted.
                        self.set_result(Result.PASSED,
                                        """Got no EOS 30 seconds after sending EOS,
                                        in HLS known and tolerated issue:
                                        https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/132""")
                        return Result.KNOWN_ERROR

                    self.set_result(
                        Result.FAILED, "Pipeline did not stop 30 Seconds after sending EOS")

                    return Result.FAILED

        size = self.get_current_size()
        if size is None:
            return self.get_current_position()

        return size

    def check_results(self):
        """Check the encoded file when the process succeeded, defer to the
        base class otherwise."""
        if self.result in [Result.FAILED, Result.TIMEOUT] or \
                self.process.returncode != 0:
            GstValidateTest.check_results(self)
            return

        res, msg = self.check_encoded_file()
        self.set_result(res, msg)
651
652
class GstValidateBaseRTSPTest:
    """ Interface for RTSP tests, requires implementing Test"""
    # Ports handed out by __get_open_port() and not released yet; shared by
    # all the RTSP tests.
    __used_ports = set()

    def __init__(self, local_uri):
        """
        @local_uri: The URI passed to the RTSP server command
        """
        self._local_uri = local_uri
        self.rtsp_server = None
        self._unsetport_pipeline_desc = None
        self.optional = True
        # Stream receiving the output of the server. It is only closed by
        # close_logfile() when launch_server() opened it itself.
        self.rtspserver_logs = None
        self._owns_rtspserver_logs = False

    @classmethod
    def __get_open_port(cls):
        """Return a free TCP port not already handed out to another test."""
        while True:
            # hackish trick from
            # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python?answertab=votes#tab-top
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind(("", 0))
                port = s.getsockname()[1]

            if port not in cls.__used_ports:
                cls.__used_ports.add(port)
                return port

    def launch_server(self):
        """Start the RTSP server and set the pipeline description to use
        its port.

        Returns the command line of the server, as a string.
        """
        self.server_port = self.__get_open_port()
        command = [GstValidateBaseTestManager.RTSP_SERVER_COMMAND, self._local_uri, '--port', str(self.server_port)]

        self._owns_rtspserver_logs = False
        if self.options.validate_gdb_server:
            command = self.use_gdb(command)
            self.rtspserver_logs = sys.stdout
        elif self.options.redirect_logs == 'stderr':
            # Used to be overwritten with sys.stdout.
            self.rtspserver_logs = sys.stderr
        elif self.options.redirect_logs:
            self.rtspserver_logs = sys.stdout
        else:
            self.rtspserver_logs = open(self.logfile + '_rtspserver.log', 'w+')
            self._owns_rtspserver_logs = True
            self.extra_logfiles.add(self.rtspserver_logs.name)

        server_env = os.environ.copy()

        self.rtsp_server = subprocess.Popen(command,
                                            stderr=self.rtspserver_logs,
                                            stdout=self.rtspserver_logs,
                                            env=server_env)
        # Wait until the server accepts connections.
        while True:
            s = socket.socket()
            try:
                s.connect((("127.0.0.1", self.server_port)))
                break
            except ConnectionRefusedError:
                if self.rtsp_server.poll() is not None:
                    # The server exited: it will never accept connections,
                    # so stop waiting instead of looping forever.
                    break
                time.sleep(0.1)
                continue
            finally:
                s.close()

        # The description holding the port placeholder is kept so that the
        # server can be launched again, on another port.
        if not self._unsetport_pipeline_desc:
            self._unsetport_pipeline_desc = self.pipeline_desc

        self.pipeline_desc = self._unsetport_pipeline_desc.replace(
            "<RTSPPORTNUMBER>", str(self.server_port))

        return ' '.join(command)

    def close_logfile(self):
        """Close the log files, including the server log file when it was
        opened by launch_server()."""
        super().close_logfile()
        # Never close sys.stdout/sys.stderr, nor a stream that was not set.
        if self._owns_rtspserver_logs and self.rtspserver_logs is not None:
            self.rtspserver_logs.close()

    def process_update(self):
        """Stop the server and release its port once the base class
        implementation returns a true value; returns that value."""
        res = super().process_update()
        if res:
            kill_subprocess(self, self.rtsp_server, DEFAULT_TIMEOUT)
            # discard(): releasing the port twice must not raise.
            self.__used_ports.discard(self.server_port)

        return res
733
734
class GstValidateRTSPTest(GstValidateBaseRTSPTest, GstValidateLaunchTest):
    """
    Launch test combined with the RTSP test interface.

    @local_uri is handed to GstValidateBaseRTSPTest, the other arguments to
    GstValidateLaunchTest. @rtsp2 makes the 'force_rtsp2' scenario be added
    to the subprocess environment.
    """

    def __init__(self, classname, options, reporter, pipeline_desc,
                 local_uri, timeout=DEFAULT_TIMEOUT, scenario=None,
                 media_descriptor=None, rtsp2=False):
        # Both bases are initialized explicitly as they take different
        # arguments.
        GstValidateLaunchTest.__init__(
            self, classname, options, reporter, pipeline_desc,
            timeout, scenario, media_descriptor)
        GstValidateBaseRTSPTest.__init__(self, local_uri)
        self.rtsp2 = rtsp2

    def get_subproc_env(self):
        """
        Return the subprocess environment with the 'rtsp_overrides'
        scenarios directory put in front of GST_VALIDATE_SCENARIOS_PATH
        and, for RTSP 2 tests, 'force_rtsp2' appended to
        GST_VALIDATE_SCENARIO.
        """
        env = super().get_subproc_env()

        previous_path = env.get('GST_VALIDATE_SCENARIOS_PATH', '')
        scenarios_dir = os.path.join('data', 'scenarios')
        override_dir = get_data_file(scenarios_dir, 'rtsp_overrides')
        env['GST_VALIDATE_SCENARIOS_PATH'] = '%s:%s' % (override_dir, previous_path)

        if not self.rtsp2:
            return env

        current_scenario = env.get('GST_VALIDATE_SCENARIO', '')
        env['GST_VALIDATE_SCENARIO'] = current_scenario + ':' + 'force_rtsp2'
        return env
755
756
class GstValidateRTSPMediaDesciptor(GstValidateMediaDescriptor):
    """
    Media descriptor loaded from an xml file whose uri and protocol are
    always the RTSP ones.
    """

    def __init__(self, xml_path):
        super().__init__(xml_path)

    def get_uri(self):
        """Return the fixed RTSP uri, whatever the xml file contains."""
        return "rtsp://127.0.0.1:8554/test"

    def get_protocol(self):
        """Return Protocols.RTSP."""
        return Protocols.RTSP

    # NOTE(review): 'prerrols' looks like a misspelling of 'prerolls'; the
    # name is kept as is since callers presumably use this spelling.
    def prerrols(self):
        """Always False for RTSP media."""
        return False
770
771
772class GstValidateTestManager(GstValidateBaseTestManager):
773
774    name = "validate"
775
776    # List of all classes to create testsuites
777    GstValidateMediaCheckTestsGenerator = GstValidateMediaCheckTestsGenerator
778    GstValidateTranscodingTestsGenerator = GstValidateTranscodingTestsGenerator
779    GstValidatePipelineTestsGenerator = GstValidatePipelineTestsGenerator
780    GstValidatePlaybinTestsGenerator = GstValidatePlaybinTestsGenerator
781    GstValidateMixerTestsGenerator = GstValidateMixerTestsGenerator
782    GstValidateLaunchTest = GstValidateLaunchTest
783    GstValidateMediaCheckTest = GstValidateMediaCheckTest
784    GstValidateTranscodingTest = GstValidateTranscodingTest
785
786    def __init__(self):
787        super(GstValidateTestManager, self).__init__()
788        self._uris = []
789        self._run_defaults = True
790        self._is_populated = False
791        self._default_generators_registered = False
792
793    def init(self):
794        for command, name in [
795                (GstValidateBaseTestManager.TRANSCODING_COMMAND, "gst-validate-transcoding-1.0"),
796                (GstValidateBaseTestManager.COMMAND, "gst-validate-1.0"),
797                (GstValidateBaseTestManager.MEDIA_CHECK_COMMAND, "gst-validate-media-check-1.0")]:
798            if not command:
799                self.error("command not found: %s" % name)
800                return False
801
802        return True
803
804    def add_options(self, parser):
805        group = parser.add_argument_group("GstValidate tools specific options"
806                                          " and behaviours",
807                                          description="""When using --wanted-tests, all the scenarios can be used, even those which have
808not been tested and explicitely activated if you set use --wanted-tests ALL""")
809        group.add_argument("--validate-check-uri", dest="validate_uris",
810                           action="append", help="defines the uris to run default tests on")
811        group.add_argument("--validate-tools-path", dest="validate_tools_path",
812                           action="append", help="defines the paths to look for GstValidate tools.")
813        group.add_argument("--validate-gdb-server", dest="validate_gdb_server",
814                           help="Run the server in GDB.")
815        group.add_argument("--validate-disable-rtsp", dest="disable_rtsp",
816                           help="Disable RTSP tests.")
817        group.add_argument("--validate-enable-iqa-tests", dest="validate_enable_iqa_tests",
818                           help="Enable Image Quality Assessment validation tests.",
819                           default=False, action='store_true')
820
821    def print_valgrind_bugs(self):
822        # Look for all the 'pending' bugs in our supp file
823        bugs = []
824        p = get_data_file('data', 'gstvalidate.supp')
825        with open(p) as f:
826            for line in f.readlines():
827                line = line.strip()
828                if line.startswith('# PENDING:'):
829                    tmp = line.split(' ')
830                    bugs.append(tmp[2])
831
832        if bugs:
833            msg = "Ignored valgrind bugs:\n"
834            for b in bugs:
835                msg += "  + %s\n" % b
836            printc(msg, Colors.FAIL, True)
837
838    def populate_testsuite(self):
839
840        if self._is_populated is True:
841            return
842
843        if not self.options.config and not self.options.testsuites:
844            if self._run_defaults:
845                self.register_defaults()
846            else:
847                self.register_all()
848
849        self._is_populated = True
850
851    def list_tests(self):
852        if self.tests:
853            return self.tests
854
855        if self._run_defaults:
856            scenarios = [self.scenarios_manager.get_scenario(scenario_name)
857                         for scenario_name in self.get_scenarios()]
858        else:
859            scenarios = self.scenarios_manager.get_scenario(None)
860        uris = self._list_uris()
861
862        for generator in self.get_generators():
863            for test in generator.generate_tests(uris, scenarios):
864                self.add_test(test)
865
866        if not self.tests and not uris and not self.options.wanted_tests:
867            self.info("No valid uris present in the path. Check if media files and info files exist")
868
869        return self.tests
870
871    def _add_media(self, media_info, uri=None):
872        self.debug("Checking %s", media_info)
873        if isinstance(media_info, GstValidateMediaDescriptor):
874            media_descriptor = media_info
875            media_info = media_descriptor.get_path()
876        else:
877            media_descriptor = GstValidateMediaDescriptor(media_info)
878
879        try:
880            # Just testing that the vairous mandatory infos are present
881            caps = media_descriptor.get_caps()
882            if uri is None:
883                uri = media_descriptor.get_uri()
884
885            # Adjust local http uri
886            if self.options.http_server_port != 8079 and \
887               uri.startswith("http://127.0.0.1:8079/"):
888                uri = uri.replace("http://127.0.0.1:8079/",
889                                  "http://127.0.0.1:%r/" % self.options.http_server_port, 1)
890            media_descriptor.set_protocol(urllib.parse.urlparse(uri).scheme)
891            for caps2, prot in GST_VALIDATE_CAPS_TO_PROTOCOL:
892                if caps2 == caps:
893                    media_descriptor.set_protocol(prot)
894                    break
895
896            scenario_bname = media_descriptor.get_media_filepath()
897            special_scenarios = self.scenarios_manager.find_special_scenarios(
898                scenario_bname)
899            self._uris.append((uri,
900                               NamedDic({"path": media_info,
901                                         "media_descriptor": media_descriptor}),
902                               special_scenarios))
903        except configparser.NoOptionError as e:
904            self.debug("Exception: %s for %s", e, media_info)
905
906    def _discover_file(self, uri, fpath):
907        for ext in (GstValidateMediaDescriptor.MEDIA_INFO_EXT,
908                GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT):
909            try:
910                is_push = False
911                media_info = "%s.%s" % (fpath, ext)
912                if ext == GstValidateMediaDescriptor.PUSH_MEDIA_INFO_EXT:
913                    if not os.path.exists(media_info):
914                        continue
915                    is_push = True
916                    uri = "push" + uri
917                args = GstValidateBaseTestManager.MEDIA_CHECK_COMMAND.split(" ")
918
919                args.append(uri)
920                if os.path.isfile(media_info) and not self.options.update_media_info:
921                    self._add_media(media_info, uri)
922                    continue
923                elif fpath.endswith(GstValidateMediaDescriptor.STREAM_INFO_EXT):
924                    self._add_media(fpath)
925                    continue
926                elif not self.options.generate_info and not self.options.update_media_info and not self.options.validate_uris:
927                    continue
928                elif self.options.update_media_info and not os.path.isfile(media_info):
929                    self.info(
930                        "%s not present. Use --generate-media-info", media_info)
931                    continue
932                elif os.path.islink(media_info):
933                    self.info(
934                        "%s is a symlink, not updating and hopefully the actual file gets updated!", media_info)
935                    continue
936
937                include_frames = 0
938                if self.options.update_media_info:
939                    include_frames = 2
940                elif self.options.generate_info_full:
941                    include_frames = 1
942
943                media_descriptor = GstValidateMediaDescriptor.new_from_uri(
944                    uri, True, include_frames, is_push)
945                if media_descriptor:
946                    self._add_media(media_descriptor, uri)
947                else:
948                    self.warning("Could not get any descriptor for %s" % uri)
949
950            except subprocess.CalledProcessError as e:
951                if self.options.generate_info:
952                    printc("Result: Failed", Colors.FAIL)
953                else:
954                    self.error("Exception: %s", e)
955                return False
956        return True
957
958    def _list_uris(self):
959        if self._uris:
960            return self._uris
961
962        if self.options.validate_uris:
963            for uri in self.options.validate_uris:
964                self._discover_file(uri, uri)
965            return self._uris
966
967        if not self.args:
968            if isinstance(self.options.paths, str):
969                self.options.paths = [os.path.join(self.options.paths)]
970
971            for path in self.options.paths:
972                if os.path.isfile(path):
973                    path = os.path.abspath(path)
974                    self._discover_file(path2url(path), path)
975                else:
976                    for root, dirs, files in os.walk(path):
977                        for f in files:
978                            fpath = os.path.abspath(os.path.join(root, f))
979                            if os.path.isdir(fpath) or \
980                                    fpath.endswith(GstValidateMediaDescriptor.MEDIA_INFO_EXT) or\
981                                    fpath.endswith(ScenarioManager.FILE_EXTENSION):
982                                continue
983                            else:
984                                self._discover_file(path2url(fpath), fpath)
985
986        self.debug("Uris found: %s", self._uris)
987
988        return self._uris
989
990    def needs_http_server(self):
991        for test in self.list_tests():
992            if self._is_test_wanted(test) and test.media_descriptor is not None:
993                protocol = test.media_descriptor.get_protocol()
994                uri = test.media_descriptor.get_uri()
995
996                if protocol in [Protocols.HTTP, Protocols.HLS, Protocols.DASH] and \
997                        ("127.0.0.1:%s" % (
998                            self.options.http_server_port) in uri or "127.0.0.1:8079" in uri):
999                    return True
1000        return False
1001
1002    def set_settings(self, options, args, reporter):
1003        if options.wanted_tests:
1004            for i in range(len(options.wanted_tests)):
1005                if "ALL" in options.wanted_tests[i]:
1006                    self._run_defaults = False
1007                    options.wanted_tests[
1008                        i] = options.wanted_tests[i].replace("ALL", "")
1009        try:
1010            options.wanted_tests.remove("")
1011        except ValueError:
1012            pass
1013
1014        if options.validate_uris:
1015            self.check_testslist = False
1016
1017        super(GstValidateTestManager, self).set_settings(
1018            options, args, reporter)
1019
1020    def register_defaults(self):
1021        """
1022        Registers the defaults:
1023            * Scenarios to be used
1024            * Encoding formats to be used
1025            * Blacklisted tests
1026            * Test generators
1027        """
1028        printc("-> Registering default 'validate' tests... ", end='')
1029        self.register_default_scenarios()
1030        self.register_default_encoding_formats()
1031        self.register_default_blacklist()
1032        self.register_default_test_generators()
1033        printc("OK", Colors.OKGREEN)
1034
1035    def register_default_scenarios(self):
1036        """
1037        Registers default test scenarios
1038        """
1039        if self.options.long_limit != 0:
1040            self.add_scenarios([
1041                "play_15s",
1042                "reverse_playback",
1043                "fast_forward",
1044                "seek_forward",
1045                "seek_backward",
1046                "seek_with_stop",
1047                "switch_audio_track",
1048                "switch_audio_track_while_paused",
1049                "switch_subtitle_track",
1050                "switch_subtitle_track_while_paused",
1051                "disable_subtitle_track_while_paused",
1052                "change_state_intensive",
1053                "scrub_forward_seeking"])
1054        else:
1055            self.add_scenarios([
1056                "play_15s",
1057                "reverse_playback",
1058                "fast_forward",
1059                "seek_forward",
1060                "seek_backward",
1061                "seek_with_stop",
1062                "switch_audio_track",
1063                "switch_audio_track_while_paused",
1064                "switch_subtitle_track",
1065                "switch_subtitle_track_while_paused",
1066                "disable_subtitle_track_while_paused",
1067                "change_state_intensive",
1068                "scrub_forward_seeking"])
1069
1070    def register_default_encoding_formats(self):
1071        """
1072        Registers default encoding formats
1073        """
1074        self.add_encoding_formats([
1075            MediaFormatCombination("ogg", "vorbis", "theora"),
1076            MediaFormatCombination("webm", "vorbis", "vp8"),
1077            MediaFormatCombination("mp4", "mp3", "h264"),
1078            MediaFormatCombination("mkv", "vorbis", "h264"),
1079        ])
1080
1081    def register_default_blacklist(self):
1082        self.set_default_blacklist([
1083            # testbin known issues
1084            ("testbin.media_check.*",
1085             "Not supported by GstDiscoverer."),
1086
1087            # dash known issues
1088            ("dash.media_check.*",
1089             "Caps are different depending on selected bitrates, etc"),
1090
1091            # Matroska/WEBM known issues:
1092            ("*.reverse_playback.*webm$",
1093             "https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/issues/65"),
1094            ("*.reverse_playback.*mkv$",
1095             "https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/issues/65"),
1096            ("http.playback.seek_with_stop.*webm",
1097             "matroskademux.gst_matroska_demux_handle_seek_push: Seek end-time not supported in streaming mode"),
1098            ("http.playback.seek_with_stop.*mkv",
1099             "matroskademux.gst_matroska_demux_handle_seek_push: Seek end-time not supported in streaming mode"),
1100
1101            # MPEG TS known issues:
1102            ('(?i)*playback.reverse_playback.*(?:_|.)(?:|m)ts$',
1103             "https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/97"),
1104
1105            # Fragmented MP4 disabled tests:
1106            ('*.playback..*seek.*.fragmented_nonseekable_sink_mp4',
1107             "Seeking on fragmented files without indexes isn't implemented"),
1108            ('*.playback.reverse_playback.fragmented_nonseekable_sink_mp4',
1109             "Seeking on fragmented files without indexes isn't implemented"),
1110
1111            # HTTP known issues:
1112            ("http.*scrub_forward_seeking.*",
1113             "This is not stable enough for now."),
1114            ("http.playback.change_state_intensive.raw_video_mov",
1115             "This is not stable enough for now. (flow return from pad push doesn't match expected value)"),
1116
1117            # MXF known issues"
1118            ("*reverse_playback.*mxf",
1119             "Reverse playback is not handled in MXF"),
1120            ("file\.transcode.*mxf",
1121             "FIXME: Transcoding and mixing tests need to be tested"),
1122
1123            # WMV known issues"
1124            ("*reverse_playback.*wmv",
1125             "Reverse playback is not handled in wmv"),
1126            (".*reverse_playback.*asf",
1127             "Reverse playback is not handled in asf"),
1128
1129            # ogg known issues
1130            ("http.playback.seek.*vorbis_theora_1_ogg",
1131             "https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/issues/281"),
1132            # RTSP known issues
1133            ('rtsp.*playback.reverse.*',
1134             'https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/issues/32'),
1135            ('rtsp.*playback.seek_with_stop.*',
1136             'https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/issues/386'),
1137            ('rtsp.*playback.fast_*',
1138             'https://gitlab.freedesktop.org/gstreamer/gst-rtsp-server/issues/14'),
1139        ])
1140
1141    def register_default_test_generators(self):
1142        """
1143        Registers default test generators
1144        """
1145        if self._default_generators_registered:
1146            return
1147
1148        self.add_generators([GstValidatePlaybinTestsGenerator(self),
1149                             GstValidateMediaCheckTestsGenerator(self),
1150                             GstValidateTranscodingTestsGenerator(self)])
1151        self._default_generators_registered = True
1152