1 /* GStreamer
2  *
3  * tests for the ipcpipelinesrc/ipcpipelinesink elements
4  *
5  * Copyright (C) 2015-2017 YouView TV Ltd
6  *   Author: Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
7  *   Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28 
29 #define _GNU_SOURCE             /* See feature_test_macros(7) */
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <gst/check/gstcheck.h>
39 
40 #ifndef HAVE_PIPE2
/* Fallback used when the platform does not provide pipe2(): creates the
 * pipe with pipe() and then applies @flags to both ends with
 * fcntl(F_SETFL). Only flags that F_SETFL understands (file status flags
 * such as O_NONBLOCK) can be applied this way.
 *
 * Returns 0 on success. On failure a negative value is returned; if the
 * pipe had already been created, both descriptors are closed again so that
 * they are not leaked, pipedes[] is reset to -1 and errno still describes
 * the fcntl() failure. */
static int
pipe2 (int pipedes[2], int flags)
{
  int ret = pipe (pipedes);

  if (ret < 0)
    return ret;
  if (flags == 0)
    return 0;

  ret = fcntl (pipedes[0], F_SETFL, flags);
  if (ret >= 0)
    ret = fcntl (pipedes[1], F_SETFL, flags);

  if (ret < 0) {
    /* close() may clobber errno, keep the value that explains the failure */
    int saved_errno = errno;

    close (pipedes[0]);
    close (pipedes[1]);
    pipedes[0] = pipedes[1] = -1;
    errno = saved_errno;
    return ret;
  }

  return 0;
}
57 #endif
58 
/* Bit flags that configure the setup test_base() performs internally.
 * Each plain feature occupies one bit; the last entries are convenience
 * combinations of the single-bit flags. */
typedef enum
{
  /* Features related to the multi-process setup */
  TEST_FEATURE_SPLIT_SINKS = 1 << 0,    /* separate audio and video sink processes */
  TEST_FEATURE_RECOVERY_SLAVE_PROCESS = 1 << 1,
  TEST_FEATURE_RECOVERY_MASTER_PROCESS = 1 << 2,

  /* (bit 3 is not used) */
  TEST_FEATURE_HAS_VIDEO = 1 << 4,
  TEST_FEATURE_LIVE = 1 << 5,   /* sets is-live=true in {audio,video}testsrc */
  TEST_FEATURE_ASYNC_SINK = 1 << 6,     /* sets sync=false in fakesink */
  TEST_FEATURE_ERROR_SINK = 1 << 7,     /* generates error message in the slave */
  TEST_FEATURE_LONG_DURATION = 1 << 8,  /* bigger num-buffers in {audio,video}testsrc */
  TEST_FEATURE_FILTER_SINK_CAPS = 1 << 9,       /* plugs capsfilter before fakesink */

  /* Source selection; use only one of those, do not combine! */
  TEST_FEATURE_TEST_SOURCE = 1 << 10,
  TEST_FEATURE_WAV_SOURCE = 1 << 11,
  /* the MPEG-TS source always carries a video stream */
  TEST_FEATURE_MPEGTS_SOURCE = (1 << 12) | TEST_FEATURE_HAS_VIDEO,
  /* live test source, audio only */
  TEST_FEATURE_LIVE_A_SOURCE =
      TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_LIVE | TEST_FEATURE_ASYNC_SINK,
  /* live test source, audio and video */
  TEST_FEATURE_LIVE_AV_SOURCE =
      TEST_FEATURE_LIVE_A_SOURCE | TEST_FEATURE_HAS_VIDEO,
} TestFeatures;
84 
85 /* This is the data structure that each function of the each test receives
86  * in user_data. It contains pointers to stack-allocated, test-specific
87  * structures that contain the test parameters (input data), the runtime
88  * data of the master (source) process (master data) and the runtime data
89  * of the slave (sink) process (slave data) */
90 typedef struct
91 {
92   gpointer id;                  /* input data struct */
93   gpointer md;                  /* master data struct */
94   gpointer sd;                  /* slave data struct */
95 
96   TestFeatures features;        /* the features that this test is running with */
97 
98   /* whether there is both an audio and a video stream
99    * in this process'es pipeline */
100   gboolean two_streams;
101 
102   /* the pipeline of this process; could be either master or slave */
103   GstElement *p;
104 
105   /* this callback will be called in the master process when
106    * the master gets STATE_CHANGED with the new state being state_target */
107   void (*state_changed_cb) (gpointer);
108   GstState state_target;
109 
110   /* used by EXCLUSIVE_CALL() */
111   gint exclusive_call_counter;
112 } test_data;
113 
/* Not all pipelines start buffers at exactly zero, so we consider
   timestamps within a small tolerance to be zero */
116 #define CLOSE_ENOUGH_TO_ZERO (GST_SECOND / 5)
117 
118 /* milliseconds */
119 #define STEP_AT  100
120 #define PAUSE_AT 500
121 #define SEEK_AT  700
122 #define QUERY_AT 600
123 #define MESSAGE_AT 600
124 #define CRASH_AT 600
125 #define STOP_AT  600
126 
127 /* Rough duration of the sample files we use */
128 #define MPEGTS_SAMPLE_ROUGH_DURATION (GST_SECOND * 64 / 10)
129 #define WAV_SAMPLE_ROUGH_DURATION (GST_SECOND * 65 / 10)
130 
/* Values exchanged as plain ints over the control socket (ctlsock):
 * MSG_START wakes up the spare recovery process, which answers with
 * MSG_ACK once its replacement sink has been set up. */
enum
{
  MSG_ACK,                      /* == 0 */
  MSG_START                     /* == 1 */
};
136 
137 static GMainLoop *loop;
138 static gboolean child_dead;
139 static int pipesfa[2], pipesba[2], pipesfv[2], pipesbv[2];
140 static int ctlsock[2];
141 static int recovery_pid = 0;
142 static int check_fd = -1;
143 static GList *weak_refs = NULL;
144 
145 /* lock helpers */
146 
/* Variants of the libcheck assertion macros that take the check lock
 * around the assertion itself. */
#define FAIL_IF(x) \
  do { \
    lock_check (); \
    fail_if(x); \
    unlock_check (); \
  } while(0)
#define FAIL_UNLESS(x) \
  do { \
    lock_check (); \
    fail_unless(x); \
    unlock_check (); \
  } while(0)
#define FAIL_UNLESS_EQUALS_INT(x,y) \
  do { \
    lock_check (); \
    fail_unless_equals_int(x,y); \
    unlock_check (); \
  } while(0)
#define FAIL() \
  do { \
    lock_check (); \
    fail(); \
    unlock_check (); \
  } while(0)
151 
152 static void
lock_check(void)153 lock_check (void)
154 {
155   flock (check_fd, LOCK_EX);
156 }
157 
158 static void
unlock_check(void)159 unlock_check (void)
160 {
161   flock (check_fd, LOCK_UN);
162 }
163 
164 static void
setup_lock(void)165 setup_lock (void)
166 {
167   gchar *name = NULL;
168   check_fd = g_file_open_tmp (NULL, &name, NULL);
169   unlink (name);
170   g_free (name);
171 }
172 
173 /* tracking for ipcpipeline elements; this is used mainly to detect leaks,
174  * but also to provide a method for calling "disconnect" on all of them
175  * in the tests that require it */
176 
177 static void
remove_weak_ref(GstElement * element)178 remove_weak_ref (GstElement * element)
179 {
180   weak_refs = g_list_remove (weak_refs, element);
181 }
182 
183 static void
add_weak_ref(GstElement * element)184 add_weak_ref (GstElement * element)
185 {
186   weak_refs = g_list_append (weak_refs, element);
187   g_object_weak_ref (G_OBJECT (element), (GWeakNotify) remove_weak_ref,
188       element);
189 }
190 
191 static void
disconnect_ipcpipeline_elements(void)192 disconnect_ipcpipeline_elements (void)
193 {
194   GList *l;
195 
196   for (l = weak_refs; l; l = l->next) {
197     g_signal_emit_by_name (G_OBJECT (l->data), "disconnect", NULL);
198   }
199 }
200 
201 /* helper functions */
202 
/* Runs @func at most once per pair of invocations when the pipeline has
 * two streams: g_atomic_int_add() returns the counter value from before
 * the increment, so @func runs on the invocation that finds the counter
 * at 1 (i.e. the second one). With a single stream @func runs on every
 * invocation.
 * @td is parenthesized so that any pointer expression can be passed. */
#define EXCLUSIVE_CALL(td,func) \
  G_STMT_START { \
    if (!(td)->two_streams || \
        g_atomic_int_add (&(td)->exclusive_call_counter, 1) == 1) { \
      func; \
    } \
  } G_STMT_END
210 
211 static void
cleanup_bus(GstElement * pipeline)212 cleanup_bus (GstElement * pipeline)
213 {
214   gst_bus_remove_watch (GST_ELEMENT_BUS (pipeline));
215   gst_bus_set_flushing (GST_ELEMENT_BUS (pipeline), TRUE);
216 }
217 
218 static void
setup_log(const char * logfile,int append)219 setup_log (const char *logfile, int append)
220 {
221   FILE *f;
222 
223   f = fopen (logfile, append ? "a+" : "w");
224   gst_debug_add_log_function (gst_debug_log_default, f, NULL);
225 }
226 
227 static GstElement *
create_pipeline(const char * type)228 create_pipeline (const char *type)
229 {
230   GstElement *pipeline;
231 
232   pipeline = gst_element_factory_make (type, NULL);
233   FAIL_UNLESS (pipeline);
234 
235   return pipeline;
236 }
237 
238 static GQuark
to_be_removed_quark(void)239 to_be_removed_quark (void)
240 {
241   static GQuark q = 0;
242   if (!q)
243     q = g_quark_from_static_string ("to_be_removed");
244   return q;
245 }
246 
247 static gboolean
are_caps_audio(const GstCaps * caps)248 are_caps_audio (const GstCaps * caps)
249 {
250   GstStructure *structure;
251   const char *name;
252 
253   structure = gst_caps_get_structure (caps, 0);
254   name = gst_structure_get_name (structure);
255   return g_str_has_prefix (name, "audio/");
256 }
257 
258 static gboolean
are_caps_video(const GstCaps * caps)259 are_caps_video (const GstCaps * caps)
260 {
261   GstStructure *structure;
262   const char *name;
263 
264   structure = gst_caps_get_structure (caps, 0);
265   name = gst_structure_get_name (structure);
266   return (g_str_has_prefix (name, "video/")
267       && strcmp (name, "video/x-dvd-subpicture"));
268 }
269 
270 static int
caps2idx(GstCaps * caps,gboolean two_streams)271 caps2idx (GstCaps * caps, gboolean two_streams)
272 {
273   int idx;
274 
275   if (!two_streams)
276     return 0;
277 
278   if (are_caps_audio (caps)) {
279     idx = 0;
280   } else if (are_caps_video (caps)) {
281     idx = 1;
282   } else {
283     FAIL_IF (1);
284     idx = 0;
285   }
286   return idx;
287 }
288 
289 static int
pad2idx(GstPad * pad,gboolean two_streams)290 pad2idx (GstPad * pad, gboolean two_streams)
291 {
292   GstCaps *caps;
293   int idx;
294 
295   if (!two_streams)
296     return 0;
297 
298   caps = gst_pad_get_current_caps (pad);
299   if (!caps)
300     caps = gst_pad_get_pad_template_caps (pad);
301   FAIL_UNLESS (caps);
302 
303   idx = caps2idx (caps, two_streams);
304 
305   gst_caps_unref (caps);
306   return idx;
307 }
308 
309 static gboolean
stop_pipeline(gpointer user_data)310 stop_pipeline (gpointer user_data)
311 {
312   GstElement *pipeline = user_data;
313   GstStateChangeReturn ret;
314 
315   ret = gst_element_set_state (pipeline, GST_STATE_NULL);
316   FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
317   gst_object_unref (pipeline);
318   g_main_loop_quit (loop);
319   return FALSE;
320 }
321 
322 static void
hook_peer_probe_types(const GValue * sinkv,GstPadProbeCallback probe,unsigned int types,gpointer user_data)323 hook_peer_probe_types (const GValue * sinkv, GstPadProbeCallback probe,
324     unsigned int types, gpointer user_data)
325 {
326   GstElement *sink;
327   GstPad *pad, *peer;
328 
329   sink = g_value_get_object (sinkv);
330   FAIL_UNLESS (sink);
331   pad = gst_element_get_static_pad (sink, "sink");
332   FAIL_UNLESS (pad);
333   peer = gst_pad_get_peer (pad);
334   FAIL_UNLESS (peer);
335   gst_pad_add_probe (peer, types, probe, user_data, NULL);
336   gst_object_unref (peer);
337   gst_object_unref (pad);
338 }
339 
340 static void
hook_probe_types(const GValue * sinkv,GstPadProbeCallback probe,unsigned int types,gpointer user_data)341 hook_probe_types (const GValue * sinkv, GstPadProbeCallback probe,
342     unsigned int types, gpointer user_data)
343 {
344   GstElement *sink;
345   GstPad *pad;
346 
347   sink = g_value_get_object (sinkv);
348   FAIL_UNLESS (sink);
349   pad = gst_element_get_static_pad (sink, "sink");
350   FAIL_UNLESS (pad);
351   gst_pad_add_probe (pad, types, probe, user_data, NULL);
352   gst_object_unref (pad);
353 }
354 
355 static void
hook_probe(const GValue * sinkv,GstPadProbeCallback probe,gpointer user_data)356 hook_probe (const GValue * sinkv, GstPadProbeCallback probe, gpointer user_data)
357 {
358   hook_probe_types (sinkv, probe,
359       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
360       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, user_data);
361 }
362 
363 /* the master process'es async GstBus callback */
364 static gboolean
master_bus_msg(GstBus * bus,GstMessage * message,gpointer user_data)365 master_bus_msg (GstBus * bus, GstMessage * message, gpointer user_data)
366 {
367   test_data *td = user_data;
368 
369   switch (GST_MESSAGE_TYPE (message)) {
370     case GST_MESSAGE_ERROR:{
371       GError *err;
372       gchar *dbg;
373 
374       /* elements we are removing might error out as they are taken out
375          of the pipeline, and fail to push. We don't care about those. */
376       if (g_object_get_qdata (G_OBJECT (GST_MESSAGE_SRC (message)),
377               to_be_removed_quark ()))
378         break;
379 
380       gst_message_parse_error (message, &err, &dbg);
381       g_printerr ("ERROR: %s\n", err->message);
382       if (dbg != NULL)
383         g_printerr ("ERROR debug information: %s\n", dbg);
384       g_error_free (err);
385       g_free (dbg);
386       g_assert_not_reached ();
387       break;
388     }
389     case GST_MESSAGE_WARNING:{
390       GError *err;
391       gchar *dbg;
392 
393       gst_message_parse_warning (message, &err, &dbg);
394       g_printerr ("WARNING: %s\n", err->message);
395       if (dbg != NULL)
396         g_printerr ("WARNING debug information: %s\n", dbg);
397       g_error_free (err);
398       g_free (dbg);
399       g_assert_not_reached ();
400       break;
401     }
402     case GST_MESSAGE_EOS:
403       g_main_loop_quit (loop);
404       break;
405     case GST_MESSAGE_STATE_CHANGED:
406       if (GST_MESSAGE_SRC (message) == GST_OBJECT_CAST (td->p)
407           && td->state_changed_cb) {
408         GstState state;
409         gst_message_parse_state_changed (message, NULL, &state, NULL);
410         if (state == td->state_target)
411           td->state_changed_cb (td);
412       }
413       break;
414     default:
415       break;
416   }
417   return TRUE;
418 }
419 
420 /* source construction functions */
421 
422 static GstElement *
create_wavparse_source_loc(const char * loc,int fdina,int fdouta)423 create_wavparse_source_loc (const char *loc, int fdina, int fdouta)
424 {
425   GstElement *sbin, *pipeline, *filesrc, *ipcpipelinesink;
426   GError *e = NULL;
427 
428   pipeline = create_pipeline ("pipeline");
429   sbin =
430       gst_parse_bin_from_description ("pushfilesrc name=filesrc ! wavparse",
431       TRUE, &e);
432   FAIL_IF (e || !sbin);
433   gst_element_set_name (sbin, "source");
434   filesrc = gst_bin_get_by_name (GST_BIN (sbin), "filesrc");
435   FAIL_UNLESS (filesrc);
436   g_object_set (filesrc, "location", loc, NULL);
437   gst_object_unref (filesrc);
438   ipcpipelinesink =
439       gst_element_factory_make ("ipcpipelinesink", "ipcpipelinesink");
440   add_weak_ref (ipcpipelinesink);
441   g_object_set (ipcpipelinesink, "fdin", fdina, "fdout", fdouta, NULL);
442   gst_bin_add_many (GST_BIN (pipeline), sbin, ipcpipelinesink, NULL);
443   FAIL_UNLESS (gst_element_link_many (sbin, ipcpipelinesink, NULL));
444 
445   return pipeline;
446 }
447 
448 static void
on_pad_added(GstElement * element,GstPad * pad,gpointer data)449 on_pad_added (GstElement * element, GstPad * pad, gpointer data)
450 {
451   GstCaps *caps;
452   GstElement *next;
453   GstBin *pipeline = data;
454   GstPad *sink_pad;
455 
456   caps = gst_pad_get_current_caps (pad);
457   if (!caps)
458     caps = gst_pad_get_pad_template_caps (pad);
459 
460   if (are_caps_video (caps)) {
461     next = gst_bin_get_by_name (GST_BIN (pipeline), "vqueue");
462   } else if (are_caps_audio (caps)) {
463     next = gst_bin_get_by_name (GST_BIN (pipeline), "aqueue");
464   } else {
465     gst_caps_unref (caps);
466     return;
467   }
468   gst_caps_unref (caps);
469 
470   FAIL_UNLESS (next);
471   sink_pad = gst_element_get_static_pad (next, "sink");
472   FAIL_UNLESS (sink_pad);
473   FAIL_UNLESS (gst_pad_link (pad, sink_pad) == GST_PAD_LINK_OK);
474   gst_object_unref (sink_pad);
475 
476   gst_object_unref (next);
477 }
478 
479 static GstElement *
create_mpegts_source_loc(const char * loc,int fdina,int fdouta,int fdinv,int fdoutv)480 create_mpegts_source_loc (const char *loc, int fdina, int fdouta, int fdinv,
481     int fdoutv)
482 {
483   GstElement *pipeline, *filesrc, *tsdemux, *aqueue, *vqueue, *aipcpipelinesink,
484       *vipcpipelinesink;
485 
486   pipeline = create_pipeline ("pipeline");
487   filesrc = gst_element_factory_make ("filesrc", NULL);
488   g_object_set (filesrc, "location", loc, NULL);
489   tsdemux = gst_element_factory_make ("tsdemux", NULL);
490   g_signal_connect (tsdemux, "pad-added", G_CALLBACK (on_pad_added), pipeline);
491   aqueue = gst_element_factory_make ("queue", "aqueue");
492   aipcpipelinesink = gst_element_factory_make ("ipcpipelinesink", NULL);
493   add_weak_ref (aipcpipelinesink);
494   g_object_set (aipcpipelinesink, "fdin", fdina, "fdout", fdouta, NULL);
495   vqueue = gst_element_factory_make ("queue", "vqueue");
496   vipcpipelinesink = gst_element_factory_make ("ipcpipelinesink", NULL);
497   add_weak_ref (vipcpipelinesink);
498   g_object_set (vipcpipelinesink, "fdin", fdinv, "fdout", fdoutv, NULL);
499   gst_bin_add_many (GST_BIN (pipeline), filesrc, tsdemux, aqueue,
500       aipcpipelinesink, vqueue, vipcpipelinesink, NULL);
501   FAIL_UNLESS (gst_element_link_many (filesrc, tsdemux, NULL));
502   FAIL_UNLESS (gst_element_link_many (aqueue, aipcpipelinesink, NULL));
503   FAIL_UNLESS (gst_element_link_many (vqueue, vipcpipelinesink, NULL));
504 
505   return pipeline;
506 }
507 
508 static GstElement *
create_test_source(gboolean live,int fdina,int fdouta,int fdinv,int fdoutv,gboolean audio,gboolean video,gboolean Long)509 create_test_source (gboolean live, int fdina, int fdouta, int fdinv, int fdoutv,
510     gboolean audio, gboolean video, gboolean Long)
511 {
512   GstElement *pipeline, *audiotestsrc, *aipcpipelinesink;
513   GstElement *videotestsrc, *vipcpipelinesink;
514   int L = Long ? 2 : 1;
515 
516   pipeline = create_pipeline ("pipeline");
517 
518   if (audio) {
519     audiotestsrc = gst_element_factory_make ("audiotestsrc", "audiotestsrc");
520     g_object_set (audiotestsrc, "is-live", live, "num-buffers",
521         live ? 270 * L : 600, NULL);
522     aipcpipelinesink = gst_element_factory_make ("ipcpipelinesink",
523         "aipcpipelinesink");
524     add_weak_ref (aipcpipelinesink);
525     g_object_set (aipcpipelinesink, "fdin", fdina, "fdout", fdouta, NULL);
526     gst_bin_add_many (GST_BIN (pipeline), audiotestsrc, aipcpipelinesink, NULL);
527     FAIL_UNLESS (gst_element_link_many (audiotestsrc, aipcpipelinesink, NULL));
528   }
529 
530   if (video) {
531     videotestsrc = gst_element_factory_make ("videotestsrc", "videotestsrc");
532     g_object_set (videotestsrc, "is-live", live, "num-buffers",
533         live ? 190 * L : 600, NULL);
534     vipcpipelinesink =
535         gst_element_factory_make ("ipcpipelinesink", "vipcpipelinesink");
536     add_weak_ref (vipcpipelinesink);
537     g_object_set (vipcpipelinesink, "fdin", fdinv, "fdout", fdoutv, NULL);
538     gst_bin_add_many (GST_BIN (pipeline), videotestsrc, vipcpipelinesink, NULL);
539     FAIL_UNLESS (gst_element_link_many (videotestsrc, vipcpipelinesink, NULL));
540   }
541 
542   return pipeline;
543 }
544 
545 static GstElement *
create_source(TestFeatures features,int fdina,int fdouta,int fdinv,int fdoutv,test_data * td)546 create_source (TestFeatures features, int fdina, int fdouta, int fdinv,
547     int fdoutv, test_data * td)
548 {
549   GstElement *pipeline = NULL;
550   gboolean live = ! !(features & TEST_FEATURE_LIVE);
551   gboolean longdur = ! !(features & TEST_FEATURE_LONG_DURATION);
552   gboolean has_video = ! !(features & TEST_FEATURE_HAS_VIDEO);
553 
554   if (features & TEST_FEATURE_TEST_SOURCE) {
555 
556     pipeline = create_test_source (live, fdina, fdouta, fdinv, fdoutv, TRUE,
557         has_video, longdur);
558   } else if (features & TEST_FEATURE_WAV_SOURCE) {
559     pipeline = create_wavparse_source_loc ("../../tests/files/sine.wav", fdina,
560         fdouta);
561   } else if (features & TEST_FEATURE_MPEGTS_SOURCE) {
562     pipeline = create_mpegts_source_loc ("../../tests/files/test.ts", fdina,
563         fdouta, fdinv, fdoutv);
564   } else {
565     g_assert_not_reached ();
566   }
567 
568   td->two_streams = has_video;
569   td->p = pipeline;
570 
571   if (pipeline)
572     gst_bus_add_watch (GST_ELEMENT_BUS (pipeline), master_bus_msg, td);
573 
574   return pipeline;
575 }
576 
577 /* sink construction */
578 
579 static GstElement *
create_sink(TestFeatures features,GstElement ** slave_pipeline,int fdin,int fdout,const char * filter_caps)580 create_sink (TestFeatures features, GstElement ** slave_pipeline,
581     int fdin, int fdout, const char *filter_caps)
582 {
583   GstElement *ipcpipelinesrc, *fakesink, *identity, *capsfilter, *endpoint;
584   GstCaps *caps;
585 
586   if (!*slave_pipeline)
587     *slave_pipeline = create_pipeline ("ipcslavepipeline");
588   else
589     gst_object_ref (*slave_pipeline);
590   ipcpipelinesrc = gst_element_factory_make ("ipcpipelinesrc", NULL);
591   add_weak_ref (ipcpipelinesrc);
592   g_object_set (ipcpipelinesrc, "fdin", fdin, "fdout", fdout, NULL);
593   fakesink = gst_element_factory_make ("fakesink", NULL);
594   g_object_set (fakesink, "sync", !(features & TEST_FEATURE_ASYNC_SINK), NULL);
595   gst_bin_add_many (GST_BIN (*slave_pipeline), ipcpipelinesrc, fakesink, NULL);
596   endpoint = ipcpipelinesrc;
597 
598   if (features & TEST_FEATURE_ERROR_SINK &&
599       !g_strcmp0 (filter_caps, "audio/x-raw")) {
600     identity = gst_element_factory_make ("identity", "error-element");
601     g_object_set (identity, "error-after", 5, NULL);
602     gst_bin_add (GST_BIN (*slave_pipeline), identity);
603     FAIL_UNLESS (gst_element_link_many (endpoint, identity, NULL));
604     endpoint = identity;
605   }
606 
607   if ((features & TEST_FEATURE_FILTER_SINK_CAPS) && filter_caps) {
608     capsfilter = gst_element_factory_make ("capsfilter", NULL);
609     caps = gst_caps_from_string (filter_caps);
610     FAIL_UNLESS (caps);
611     g_object_set (capsfilter, "caps", caps, NULL);
612     gst_caps_unref (caps);
613     gst_bin_add (GST_BIN (*slave_pipeline), capsfilter);
614     FAIL_UNLESS (gst_element_link_many (endpoint, capsfilter, NULL));
615     endpoint = capsfilter;
616   }
617   FAIL_UNLESS (gst_element_link_many (endpoint, fakesink, NULL));
618 
619   return *slave_pipeline;
620 }
621 
622 static void
ensure_sink_setup(GstElement * sink,void (* setup_sink)(GstElement *,void *),gpointer user_data)623 ensure_sink_setup (GstElement * sink, void (*setup_sink) (GstElement *, void *),
624     gpointer user_data)
625 {
626   static GQuark setup_done = 0;
627   test_data *td = user_data;
628 
629   if (!setup_done)
630     setup_done = g_quark_from_static_string ("setup_done");
631 
632   if (sink)
633     td->p = sink;
634 
635   if (sink && setup_sink && !g_object_get_qdata (G_OBJECT (sink), setup_done)) {
636     g_object_set_qdata (G_OBJECT (sink), setup_done, GINT_TO_POINTER (1));
637     setup_sink (sink, user_data);
638   }
639 }
640 
641 /* GstCheck multi-process setup helpers */
642 
643 static void
on_child_exit(int signal)644 on_child_exit (int signal)
645 {
646   int status = 0;
647   if (waitpid (-1, &status, 0) > 0 && status) {
648     FAIL ();
649     exit (status);
650   } else {
651     child_dead = TRUE;
652   }
653 }
654 
655 static void
die_on_child_death(void)656 die_on_child_death (void)
657 {
658   struct sigaction sa;
659 
660   memset (&sa, 0, sizeof (sa));
661   sa.sa_handler = on_child_exit;
662   sigaction (SIGCHLD, &sa, NULL);
663 }
664 
/* Called in the spare recovery process: reads one int from ctlsock[1]
 * and fails the test unless a complete int was read and it is
 * MSG_START. */
static void
wait_for_recovery (void)
{
  int value;

  FAIL_UNLESS (ctlsock[1]);
  FAIL_UNLESS (read (ctlsock[1], &value, sizeof (int)) == sizeof (int));
  FAIL_UNLESS (value == MSG_START);
}
674 
/* Called in the recovery process once its replacement sink is set up:
 * writes MSG_ACK to ctlsock[1] and fails the test if the int could not
 * be written completely. */
static void
ack_recovery (void)
{
  int value = MSG_ACK;
  FAIL_UNLESS (ctlsock[1]);
  FAIL_UNLESS (write (ctlsock[1], &value, sizeof (int)) == sizeof (int));
}
682 
/* Wakes up the spare recovery process so that it takes the place of the
 * slave that died: writes MSG_START to ctlsock[0] and then waits until
 * MSG_ACK can be read back from it. Fails the test if there is no
 * recovery process or if the exchange does not complete as expected. */
static void
recreate_crashed_slave_process (void)
{
  int value = MSG_START;
  /* We don't recreate, because there seems to be some subtle issues
     with forking after gst has started running. So we create a new
     recovery process at start, and wake it up after the current
     slave dies, so it can take its place. It's a bit hacky, but it
     works. The spare process waits for MSG_START on the control socket
     to setup a replacement pipeline and connect to the master. */
  FAIL_UNLESS (recovery_pid);
  FAIL_UNLESS (ctlsock[0]);
  FAIL_UNLESS (write (ctlsock[0], &value, sizeof (int)) == sizeof (int));
  FAIL_UNLESS (read (ctlsock[0], &value, sizeof (int)) == sizeof (int));
  FAIL_UNLESS (value == MSG_ACK);
}
699 
/* Source callback that simulates a crash by terminating the process
 * immediately with _exit(). The exit status is 0, which on_child_exit()
 * does not treat as a failure. This function never returns. */
static gboolean
crash (gpointer user_data)
{
  _exit (0);
}
705 
706 static gboolean
unwind(gpointer user_data)707 unwind (gpointer user_data)
708 {
709   g_main_loop_quit (loop);
710   return FALSE;
711 }
712 
713 static void
on_unwind(int signal)714 on_unwind (int signal)
715 {
716   g_idle_add (unwind, NULL);
717 }
718 
719 static void
listen_for_unwind(void)720 listen_for_unwind (void)
721 {
722   struct sigaction sa;
723 
724   memset (&sa, 0, sizeof (sa));
725   sa.sa_handler = on_unwind;
726   sigaction (SIGUSR1, &sa, NULL);
727 }
728 
729 static void
stop_listening_for_unwind(void)730 stop_listening_for_unwind (void)
731 {
732   struct sigaction sa;
733 
734   memset (&sa, 0, sizeof (sa));
735   sa.sa_handler = SIG_DFL;
736   sigaction (SIGUSR1, &sa, NULL);
737 }
738 
739 #define TEST_BASE(...) test_base(__FUNCTION__,##__VA_ARGS__)
740 
741 /*
742  * This is the main function driving the tests. All tests configure it
743  * by way of all the function pointers it takes as arguments, which have
744  * self-explanatory names.
745  * Most tests are run over a number of different pipelines with the same
746  * configuration (eg, a wavparse based pipeline, a live pipeline with
747  * test audio/video, etc). Those pipelines that have more than one sink
748  * (eg, MPEG-TS source demuxing audio and video) have a version with a
749  * single slave pipeline and process, and a version with the audio and
750  * video sinks in two different processes, each with its slave pipeline.
751  * The master and slave crash tests are also run via this function, and
752  * have specific code (grep for recovery).
 * There is a fair amount of hairy stuff to do with letting the main
 * check process know when a subprocess has failed. Best not to look at it
 * and let it do its thing.
756  * To add new tests, duplicate a set of tests, eg the *_end_of_stream
757  * ones, and s/_end_of_stream/new_test_name/g. Then do the same for
758  * the functions they pass as parameters to test_base. Typically, the
759  * source creation sets a message hook to catch things like async-done
760  * messages. Sink creation typically adds a probe to check that events,
761  * buffers, etc, come through as expected. The two success functions
762  * check all went well for the source and sink. Note that since all of
763  * these functions take the same user data structure, and the process
764  * will fork, writing something from one process will not be reflected
765  * in the other, so there is usually a subset of data relevant to the
766  * source, and another to the sink. But some have data relevant to both,
767  * it depends on the test and what you are doing.
768  * New tests do not have to use this framework, it just avoids spending
769  * more time and effort on multi process handling.
770  */
771 static void
test_base(const char * name,TestFeatures features,void (* run_source)(GstElement *,void *),void (* setup_sink)(GstElement *,void *),void (* check_success_source)(void *),void (* check_success_sink)(void *),gpointer input_data,gpointer master_data,gpointer slave_data)772 test_base (const char *name, TestFeatures features,
773     void (*run_source) (GstElement *, void *),
774     void (*setup_sink) (GstElement *, void *),
775     void (*check_success_source) (void *),
776     void (*check_success_sink) (void *),
777     gpointer input_data, gpointer master_data, gpointer slave_data)
778 {
779   GstElement *source = NULL, *asink = NULL, *vsink = NULL;
780   GstElement *slave_pipeline = NULL;
781   GstStateChangeReturn ret;
782   gboolean c_src, c_sink;
783   pid_t pid = 0;
784   unsigned char x;
785   int master_recovery_pid_comm[2] = { -1, -1 };
786   test_data td = { input_data, master_data, slave_data, features, FALSE, NULL,
787     NULL, GST_STATE_NULL, 0
788   };
789 
790   g_print ("Testing: %s\n", name);
791 
792   weak_refs = NULL;
793 
794   FAIL_IF (pipe2 (pipesfa, O_NONBLOCK) < 0);
795   FAIL_IF (pipe2 (pipesba, O_NONBLOCK) < 0);
796   FAIL_IF (pipe2 (pipesfv, O_NONBLOCK) < 0);
797   FAIL_IF (pipe2 (pipesbv, O_NONBLOCK) < 0);
798   FAIL_IF (socketpair (PF_UNIX, SOCK_STREAM, 0, ctlsock) < 0);
799 
800   FAIL_IF (pipesfa[0] < 0);
801   FAIL_IF (pipesfa[1] < 0);
802   FAIL_IF (pipesba[0] < 0);
803   FAIL_IF (pipesba[1] < 0);
804   FAIL_IF (pipesfv[0] < 0);
805   FAIL_IF (pipesfv[1] < 0);
806   FAIL_IF (pipesbv[0] < 0);
807   FAIL_IF (pipesbv[1] < 0);
808 
809   gst_debug_remove_log_function (gst_debug_log_default);
810 
811   listen_for_unwind ();
812   child_dead = FALSE;
813 
814   if (features & TEST_FEATURE_RECOVERY_MASTER_PROCESS) {
815     /* the other master will let us know its child's PID so we can unwind
816        it when we're finished */
817     FAIL_IF (pipe2 (master_recovery_pid_comm, O_NONBLOCK) < 0);
818 
819     recovery_pid = fork ();
820     if (recovery_pid > 0) {
821       /* we're the main process that libcheck waits for */
822       die_on_child_death ();
823       while (!child_dead)
824         g_usleep (1000);
825       /* leave some time for the slave to timeout (1 second), record error, etc */
826       g_usleep (1500 * 1000);
827 
828       /* Discard anything that was sent to the previous process when it died */
829       while (read (pipesba[0], &x, 1) == 1);
830 
831       FAIL_UNLESS (read (master_recovery_pid_comm[0], &pid,
832               sizeof (pid)) == sizeof (pid));
833 
834       setup_log ("gstsrc.log", TRUE);
835       source = create_source (features, pipesba[0], pipesfa[1], pipesbv[0],
836           pipesfv[1], &td);
837       FAIL_UNLESS (source);
838       if (run_source)
839         run_source (source, &td);
840       goto setup_done;
841     }
842   }
843 
844   if (features & TEST_FEATURE_RECOVERY_SLAVE_PROCESS) {
845     recovery_pid = fork ();
846     if (!recovery_pid) {
847       wait_for_recovery ();
848 
849       /* Discard anything that was sent to the previous process when it died */
850       while (read (pipesfa[0], &x, 1) == 1);
851 
852       setup_log ("gstasink.log", TRUE);
853       asink = create_sink (features, &slave_pipeline, pipesfa[0], pipesba[1],
854           "audio/x-raw");
855       FAIL_UNLESS (asink);
856       ensure_sink_setup (asink, setup_sink, &td);
857       ack_recovery ();
858       goto setup_done;
859     }
860   }
861 
862   pid = fork ();
863   FAIL_IF (pid < 0);
864   if (pid) {
865     if (features & TEST_FEATURE_RECOVERY_MASTER_PROCESS) {
866       FAIL_UNLESS (write (master_recovery_pid_comm[1], &pid,
867               sizeof (pid)) == sizeof (pid));
868     }
869     die_on_child_death ();
870     if (features & TEST_FEATURE_SPLIT_SINKS) {
871       pid = fork ();
872       FAIL_IF (pid < 0);
873       if (pid) {
874         die_on_child_death ();
875       }
876       c_src = ! !pid;
877       c_sink = !pid;
878     } else {
879       c_src = TRUE;
880       c_sink = FALSE;
881     }
882     if (c_src) {
883       setup_log ("gstsrc.log", FALSE);
884       source = create_source (features, pipesba[0], pipesfa[1], pipesbv[0],
885           pipesfv[1], &td);
886       FAIL_UNLESS (source);
887       run_source (source, &td);
888     }
889     if (c_sink) {
890       setup_log ("gstasink.log", FALSE);
891       asink = create_sink (features, &slave_pipeline, pipesfa[0], pipesba[1],
892           "audio/x-raw");
893       FAIL_UNLESS (asink);
894     }
895   } else {
896     td.two_streams = (features & TEST_FEATURE_HAS_VIDEO) &&
897         !(features & TEST_FEATURE_SPLIT_SINKS);
898 
899     if (features & TEST_FEATURE_HAS_VIDEO) {
900       setup_log ("gstvsink.log", FALSE);
901       vsink = create_sink (features, &slave_pipeline, pipesfv[0], pipesbv[1],
902           "video/x-raw");
903       FAIL_UNLESS (vsink);
904     }
905     if (!(features & TEST_FEATURE_SPLIT_SINKS)) {
906       setup_log ("gstasink.log", FALSE);
907       asink = create_sink (features, &slave_pipeline, pipesfa[0], pipesba[1],
908           "audio/x-raw");
909       FAIL_UNLESS (asink);
910     }
911   }
912 
913 setup_done:
914   ensure_sink_setup (asink, setup_sink, &td);
915   ensure_sink_setup (vsink, setup_sink, &td);
916 
917   loop = g_main_loop_new (NULL, FALSE);
918   g_main_loop_run (loop);
919 
920   /* tell the child process to unwind too */
921   stop_listening_for_unwind ();
922 
923   if (source) {
924     ret = gst_element_set_state (source, GST_STATE_NULL);
925     FAIL_UNLESS (ret == GST_STATE_CHANGE_SUCCESS
926         || ret == GST_STATE_CHANGE_ASYNC);
927   }
928 
929   if (pid)
930     kill (pid, SIGUSR1);
931 
932   g_main_loop_unref (loop);
933 
934   if (source) {
935     cleanup_bus (source);
936     if (check_success_source)
937       check_success_source (&td);
938   } else {
939     if (asink)
940       cleanup_bus (asink);
941     if (vsink)
942       cleanup_bus (vsink);
943     if (check_success_sink)
944       check_success_sink (&td);
945   }
946 
947   disconnect_ipcpipeline_elements ();
948 
949   close (pipesfa[0]);
950   close (pipesfa[1]);
951   close (pipesba[0]);
952   close (pipesba[1]);
953   close (pipesfv[0]);
954   close (pipesfv[1]);
955   close (pipesbv[0]);
956   close (pipesbv[1]);
957 
958   /* If we have a child, we must now wait for it to be finished.
959      We can't just waitpid, because this child might be still doing
960      its shutdown, and might assert, and the die_on_child_death
961      function will exit with the right exit code if so. So we wait
962      for the child_dead boolean to be set, which die_on_child_death
963      sets if the child dies normally. */
964   if (pid) {
965     while (!child_dead)
966       g_usleep (1000);
967   }
968 
969   if (source) {
970     FAIL_UNLESS_EQUALS_INT (GST_OBJECT_REFCOUNT_VALUE (source), 1);
971     gst_object_unref (source);
972   }
973   /* asink and vsink may be the same object, so refcount is not sure to be 1 */
974   if (asink)
975     gst_object_unref (asink);
976   if (vsink)
977     gst_object_unref (vsink);
978 
979   /* cleanup tasks a bit earlier to make sure all weak refs are gone */
980   gst_task_cleanup_all ();
981 
982   /* all ipcpipeline elements we created should now be destroyed */
983   if (weak_refs) {
984 #if 1
985     /* to make it easier to see what leaks */
986     GList *l;
987     for (l = weak_refs; l; l = l->next) {
988       g_print ("%s has %u refs\n", GST_ELEMENT_NAME (l->data),
989           GST_OBJECT_REFCOUNT_VALUE (l->data));
990     }
991 #endif
992     FAIL_UNLESS (0);
993   }
994 }
995 
996 /**** play-pause test ****/
997 
/* Progress flags recorded on the master (source) side during the play-pause
 * test. All of them are asserted by check_success_source_play_pause(). */
typedef struct
{
  /* [0]: set on the first state-change notification, after the initial
   * request for PLAYING; [1]: set once PLAYING was requested again after
   * the intermediate pause */
  gboolean got_state_changed_to_playing[2];
  /* set once the intermediate PAUSED step has been handled */
  gboolean got_state_changed_to_paused;
} play_pause_master_data;

/* Initializer with every flag cleared */
#define PLAY_PAUSE_MASTER_DATA_INIT { { FALSE, FALSE }, FALSE }
1005 
/* What the slave (sink) side observed through play_pause_probe() during the
 * play-pause test. Each array has one slot per stream; the slot is the index
 * returned by pad2idx()/caps2idx(). */
typedef struct
{
  /* a CAPS event was seen */
  gboolean got_caps[2];
  /* a SEGMENT event was seen */
  gboolean got_segment[2];
  /* a buffer was seen */
  gboolean got_buffer[2];
} play_pause_slave_data;

/* Initializer with every flag cleared */
#define PLAY_PAUSE_SLAVE_DATA_INIT \
  { { FALSE, FALSE }, { FALSE, FALSE }, { FALSE, FALSE } }
1015 
1016 static gboolean
idlenull(gpointer user_data)1017 idlenull (gpointer user_data)
1018 {
1019   test_data *td = user_data;
1020   GstStateChangeReturn ret;
1021 
1022   ret = gst_element_set_state (td->p, GST_STATE_NULL);
1023   FAIL_UNLESS (ret == GST_STATE_CHANGE_SUCCESS);
1024   gst_object_unref (td->p);
1025   g_main_loop_quit (loop);
1026   return G_SOURCE_REMOVE;
1027 }
1028 
1029 static gboolean idleplay (gpointer user_data);
1030 static gboolean
idlepause(gpointer user_data)1031 idlepause (gpointer user_data)
1032 {
1033   test_data *td = user_data;
1034   play_pause_master_data *d = td->md;
1035   GstStateChangeReturn ret;
1036 
1037   ret = gst_element_set_state (td->p, GST_STATE_PAUSED);
1038   FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
1039   if (ret == GST_STATE_CHANGE_SUCCESS || ret == GST_STATE_CHANGE_NO_PREROLL) {
1040     /* if the state change is not async, we won't get an aync-done, but
1041        this is expected, so set the flag here */
1042     d->got_state_changed_to_paused = TRUE;
1043     td->state_target = GST_STATE_PLAYING;
1044     g_timeout_add (STEP_AT, idleplay, user_data);
1045     return G_SOURCE_REMOVE;
1046   }
1047   gst_object_unref (td->p);
1048   return G_SOURCE_REMOVE;
1049 }
1050 
1051 static gboolean
idleplay(gpointer user_data)1052 idleplay (gpointer user_data)
1053 {
1054   test_data *td = user_data;
1055   play_pause_master_data *d = td->md;
1056   GstStateChangeReturn ret;
1057 
1058   ret = gst_element_set_state (td->p, GST_STATE_PLAYING);
1059   FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
1060   if (ret == GST_STATE_CHANGE_SUCCESS || ret == GST_STATE_CHANGE_NO_PREROLL) {
1061     /* if the state change is not async, we won't get an aync-done, but
1062        this is expected, so set the flag here */
1063     d->got_state_changed_to_playing[1] = TRUE;
1064     td->state_target = GST_STATE_NULL;
1065     g_timeout_add (STEP_AT, idlenull, user_data);
1066     return G_SOURCE_REMOVE;
1067   }
1068   gst_object_unref (td->p);
1069   return G_SOURCE_REMOVE;
1070 }
1071 
1072 static void
play_pause_on_state_changed(gpointer user_data)1073 play_pause_on_state_changed (gpointer user_data)
1074 {
1075   test_data *td = user_data;
1076   play_pause_master_data *d = td->md;
1077   GstStateChangeReturn ret;
1078 
1079   if (d->got_state_changed_to_paused) {
1080     d->got_state_changed_to_playing[1] = TRUE;
1081     td->state_target = GST_STATE_NULL;
1082     ret = gst_element_set_state (td->p, GST_STATE_NULL);
1083     FAIL_UNLESS (ret == GST_STATE_CHANGE_SUCCESS);
1084     g_main_loop_quit (loop);
1085   } else if (d->got_state_changed_to_playing[0]) {
1086     d->got_state_changed_to_paused = TRUE;
1087     td->state_target = GST_STATE_PLAYING;
1088     gst_object_ref (td->p);
1089     g_timeout_add (STEP_AT, (GSourceFunc) idleplay, td);
1090   } else {
1091     d->got_state_changed_to_playing[0] = TRUE;
1092     td->state_target = GST_STATE_PAUSED;
1093     gst_object_ref (td->p);
1094     g_timeout_add (STEP_AT, (GSourceFunc) idlepause, td);
1095   }
1096 }
1097 
1098 static void
play_pause_source(GstElement * source,void * user_data)1099 play_pause_source (GstElement * source, void *user_data)
1100 {
1101   test_data *td = user_data;
1102   GstStateChangeReturn ret;
1103 
1104   td->state_target = GST_STATE_PLAYING;
1105   td->state_changed_cb = play_pause_on_state_changed;
1106   ret = gst_element_set_state (source, GST_STATE_PLAYING);
1107   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
1108 }
1109 
1110 static GstPadProbeReturn
play_pause_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)1111 play_pause_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1112 {
1113   test_data *td = user_data;
1114   play_pause_slave_data *d = td->sd;
1115   GstCaps *caps;
1116 
1117   if (GST_IS_BUFFER (info->data)) {
1118     d->got_buffer[pad2idx (pad, td->two_streams)] = TRUE;
1119   } else if (GST_IS_EVENT (info->data)) {
1120     if (GST_EVENT_TYPE (info->data) == GST_EVENT_CAPS) {
1121       gst_event_parse_caps (info->data, &caps);
1122       d->got_caps[caps2idx (caps, td->two_streams)] = TRUE;
1123     } else if (GST_EVENT_TYPE (info->data) == GST_EVENT_SEGMENT) {
1124       d->got_segment[pad2idx (pad, td->two_streams)] = TRUE;
1125     }
1126   }
1127 
1128   return GST_PAD_PROBE_OK;
1129 }
1130 
/* gst_iterator_foreach() callback used by setup_sink_play_pause(): passes the
 * iterated item `v` on to hook_probe() with play_pause_probe as the callback
 * and `user_data` as its argument. */
static void
hook_play_pause_probe (const GValue * v, gpointer user_data)
{
  hook_probe (v, play_pause_probe, user_data);
}
1136 
1137 static void
setup_sink_play_pause(GstElement * sink,void * user_data)1138 setup_sink_play_pause (GstElement * sink, void *user_data)
1139 {
1140   GstIterator *it;
1141 
1142   it = gst_bin_iterate_sinks (GST_BIN (sink));
1143   while (gst_iterator_foreach (it, hook_play_pause_probe, user_data))
1144     gst_iterator_resync (it);
1145   gst_iterator_free (it);
1146 }
1147 
1148 static void
check_success_source_play_pause(void * user_data)1149 check_success_source_play_pause (void *user_data)
1150 {
1151   test_data *td = user_data;
1152   play_pause_master_data *d = td->md;
1153 
1154   FAIL_UNLESS (d->got_state_changed_to_playing[0]);
1155   FAIL_UNLESS (d->got_state_changed_to_playing[1]);
1156   FAIL_UNLESS (d->got_state_changed_to_paused);
1157 }
1158 
1159 static void
check_success_sink_play_pause(void * user_data)1160 check_success_sink_play_pause (void *user_data)
1161 {
1162   test_data *td = user_data;
1163   play_pause_slave_data *d = td->sd;
1164   int idx;
1165 
1166   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
1167     FAIL_UNLESS (d->got_caps[idx]);
1168     FAIL_UNLESS (d->got_segment[idx]);
1169     FAIL_UNLESS (d->got_buffer[idx]);
1170   }
1171 }
1172 
GST_START_TEST(test_empty_play_pause)1173 GST_START_TEST (test_empty_play_pause)
1174 {
1175   play_pause_master_data md = PLAY_PAUSE_MASTER_DATA_INIT;
1176   play_pause_slave_data sd = PLAY_PAUSE_SLAVE_DATA_INIT;
1177 
1178   TEST_BASE (TEST_FEATURE_TEST_SOURCE, play_pause_source, setup_sink_play_pause,
1179       check_success_source_play_pause, check_success_sink_play_pause, NULL, &md,
1180       &sd);
1181 }
1182 
1183 GST_END_TEST;
1184 
GST_START_TEST(test_wavparse_play_pause)1185 GST_START_TEST (test_wavparse_play_pause)
1186 {
1187   play_pause_master_data md = PLAY_PAUSE_MASTER_DATA_INIT;
1188   play_pause_slave_data sd = PLAY_PAUSE_SLAVE_DATA_INIT;
1189 
1190   TEST_BASE (TEST_FEATURE_WAV_SOURCE, play_pause_source, setup_sink_play_pause,
1191       check_success_source_play_pause, check_success_sink_play_pause, NULL, &md,
1192       &sd);
1193 }
1194 
1195 GST_END_TEST;
1196 
GST_START_TEST(test_mpegts_play_pause)1197 GST_START_TEST (test_mpegts_play_pause)
1198 {
1199   play_pause_master_data md = PLAY_PAUSE_MASTER_DATA_INIT;
1200   play_pause_slave_data sd = PLAY_PAUSE_SLAVE_DATA_INIT;
1201 
1202   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, play_pause_source,
1203       setup_sink_play_pause, check_success_source_play_pause,
1204       check_success_sink_play_pause, NULL, &md, &sd);
1205 }
1206 
1207 GST_END_TEST;
1208 
GST_START_TEST(test_mpegts_2_play_pause)1209 GST_START_TEST (test_mpegts_2_play_pause)
1210 {
1211   play_pause_master_data md = PLAY_PAUSE_MASTER_DATA_INIT;
1212   play_pause_slave_data sd = PLAY_PAUSE_SLAVE_DATA_INIT;
1213 
1214   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1215       play_pause_source, setup_sink_play_pause, check_success_source_play_pause,
1216       check_success_sink_play_pause, NULL, &md, &sd);
1217 }
1218 
1219 GST_END_TEST;
1220 
GST_START_TEST(test_live_a_play_pause)1221 GST_START_TEST (test_live_a_play_pause)
1222 {
1223   play_pause_master_data md = PLAY_PAUSE_MASTER_DATA_INIT;
1224   play_pause_slave_data sd = PLAY_PAUSE_SLAVE_DATA_INIT;
1225 
1226   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, play_pause_source,
1227       setup_sink_play_pause, check_success_source_play_pause,
1228       check_success_sink_play_pause, NULL, &md, &sd);
1229 }
1230 
1231 GST_END_TEST;
1232 
GST_START_TEST(test_live_av_play_pause)1233 GST_START_TEST (test_live_av_play_pause)
1234 {
1235   play_pause_master_data md = PLAY_PAUSE_MASTER_DATA_INIT;
1236   play_pause_slave_data sd = PLAY_PAUSE_SLAVE_DATA_INIT;
1237 
1238   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, play_pause_source,
1239       setup_sink_play_pause, check_success_source_play_pause,
1240       check_success_sink_play_pause, NULL, &md, &sd);
1241 }
1242 
1243 GST_END_TEST;
1244 
GST_START_TEST(test_live_av_2_play_pause)1245 GST_START_TEST (test_live_av_2_play_pause)
1246 {
1247   play_pause_master_data md = PLAY_PAUSE_MASTER_DATA_INIT;
1248   play_pause_slave_data sd = PLAY_PAUSE_SLAVE_DATA_INIT;
1249 
1250   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1251       play_pause_source, setup_sink_play_pause, check_success_source_play_pause,
1252       check_success_sink_play_pause, NULL, &md, &sd);
1253 }
1254 
1255 GST_END_TEST;
1256 
1257 /**** flushing seek test ****/
1258 
/* Read-only configuration of a flushing seek test run. */
typedef struct
{
  /* TRUE: send_flushing_seek() sends a flushing segment seek over [0, 1s];
   * FALSE: it does a plain flushing seek to position 0 */
  gboolean segment_seek;
  /* TRUE: pause_before_seek() is scheduled (at PAUSE_AT) in addition to the
   * seek itself */
  gboolean pause;
} flushing_seek_input_data;

/* plain flushing seek while playing */
#define FLUSHING_SEEK_INPUT_DATA_INIT { FALSE, FALSE }
/* plain flushing seek, with the pause step scheduled as well */
#define FLUSHING_SEEK_INPUT_DATA_INIT_PAUSED { FALSE, TRUE }
/* segment seek while playing */
#define FLUSHING_SEEK_INPUT_DATA_INIT_SEGMENT_SEEK { TRUE, FALSE }
1268 
/* Progress flags recorded on the master (source) side during the flushing
 * seek tests; asserted by check_success_source_flushing_seek(). */
typedef struct
{
  /* the state-changed callback ran at least once */
  gboolean got_state_changed_to_playing;
  /* a SEGMENT_DONE message posted by a pipeline was seen on the bus */
  gboolean got_segment_done;
  /* send_flushing_seek() has run */
  gboolean seek_sent;
} flushing_seek_master_data;

/* Initializer with every flag cleared */
#define FLUSHING_SEEK_MASTER_DATA_INIT { FALSE, FALSE, FALSE }
1277 
/* What the slave (sink) side observed through flushing_seek_probe() during
 * the flushing seek tests. Each array has one slot per stream; the slot is
 * the index returned by pad2idx()/caps2idx(). */
typedef struct
{
  /* timestamp of the first buffer seen on the stream */
  GstClockTime first_ts[2];
  /* an audio or video CAPS event was seen */
  gboolean got_caps[2];
  /* a buffer was seen before any FLUSH_STOP was counted */
  gboolean got_buffer_before_seek[2];
  /* a buffer was seen after the counted FLUSH_STOP */
  gboolean got_buffer_after_seek[2];
  /* the first buffer after the FLUSH_STOP had a timestamp below
   * first_ts + 10 ms */
  gboolean first_buffer_after_seek_has_timestamp_0[2];
  /* a SEGMENT event arrived after the first buffer */
  gboolean got_segment_after_seek[2];
  /* a FLUSH_START event was recorded by the probe */
  gboolean got_flush_start[2];
  /* a FLUSH_STOP event arrived after the first buffer */
  gboolean got_flush_stop[2];
} flushing_seek_slave_data;

/* Only first_ts is spelled out; the remaining members are implicitly
 * zero-initialized, i.e. every flag starts out FALSE. */
#define FLUSHING_SEEK_SLAVE_DATA_INIT { { 0, 0 }, }
1291 
1292 static gboolean
send_flushing_seek(gpointer user_data)1293 send_flushing_seek (gpointer user_data)
1294 {
1295   test_data *td = user_data;
1296   const flushing_seek_input_data *i = td->id;
1297   flushing_seek_master_data *d = td->md;
1298   GstEvent *seek_event;
1299 
1300   if (i->segment_seek) {
1301     GST_INFO_OBJECT (td->p, "Sending segment seek");
1302     seek_event =
1303         gst_event_new_seek (1.0, GST_FORMAT_TIME,
1304         GST_SEEK_FLAG_SEGMENT | GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, 0,
1305         GST_SEEK_TYPE_SET, 1 * GST_SECOND);
1306     FAIL_UNLESS (gst_element_send_event (td->p, seek_event));
1307   } else {
1308     GST_INFO_OBJECT (td->p, "Sending flushing seek");
1309     gst_element_seek_simple (td->p, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH, 0);
1310     g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline,
1311         gst_object_ref (td->p));
1312   }
1313   d->seek_sent = TRUE;
1314   return G_SOURCE_REMOVE;
1315 }
1316 
1317 static gboolean
pause_before_seek(gpointer user_data)1318 pause_before_seek (gpointer user_data)
1319 {
1320   test_data *td = user_data;
1321   GstStateChangeReturn ret;
1322 
1323   ret = gst_element_set_state (td->p, GST_STATE_PAUSED);
1324   FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
1325 
1326   return G_SOURCE_REMOVE;
1327 }
1328 
1329 static gboolean
flushing_seek_bus_msg(GstBus * bus,GstMessage * message,gpointer user_data)1330 flushing_seek_bus_msg (GstBus * bus, GstMessage * message, gpointer user_data)
1331 {
1332   test_data *td = user_data;
1333   flushing_seek_master_data *d = td->md;
1334 
1335   if (GST_IS_PIPELINE (GST_MESSAGE_SRC (message))) {
1336     if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_SEGMENT_DONE) {
1337       d->got_segment_done = TRUE;
1338       g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline,
1339           gst_object_ref (td->p));
1340     }
1341   }
1342   return master_bus_msg (bus, message, user_data);
1343 }
1344 
1345 static void
flushing_seek_on_state_changed(gpointer user_data)1346 flushing_seek_on_state_changed (gpointer user_data)
1347 {
1348   test_data *td = user_data;
1349   const flushing_seek_input_data *i = td->id;
1350   flushing_seek_master_data *d = td->md;
1351 
1352   if (!d->got_state_changed_to_playing) {
1353     d->got_state_changed_to_playing = TRUE;
1354     if (i->pause)
1355       g_timeout_add (PAUSE_AT, (GSourceFunc) pause_before_seek, td);
1356     g_timeout_add (SEEK_AT, (GSourceFunc) send_flushing_seek, td);
1357   }
1358 }
1359 
1360 static void
flushing_seek_source(GstElement * source,gpointer user_data)1361 flushing_seek_source (GstElement * source, gpointer user_data)
1362 {
1363   test_data *td = user_data;
1364   GstStateChangeReturn ret;
1365 
1366   /* we're on the source, there's already the basic master_bus_msg watch,
1367      and gst doesn't want more than one watch, so we remove the watch and
1368      call it directly when done in the new watch */
1369   gst_bus_remove_watch (GST_ELEMENT_BUS (source));
1370   gst_bus_add_watch (GST_ELEMENT_BUS (source), flushing_seek_bus_msg,
1371       user_data);
1372   td->state_target = GST_STATE_PLAYING;
1373   td->state_changed_cb = flushing_seek_on_state_changed;
1374   ret = gst_element_set_state (source, GST_STATE_PLAYING);
1375   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
1376 }
1377 
1378 static GstPadProbeReturn
flushing_seek_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)1379 flushing_seek_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1380 {
1381   test_data *td = user_data;
1382   flushing_seek_slave_data *d = td->sd;
1383   GstClockTime ts;
1384   int idx;
1385   GstCaps *caps;
1386 
1387   if (GST_IS_BUFFER (info->data)) {
1388     idx = pad2idx (pad, td->two_streams);
1389     if (d->got_flush_stop[idx]) {
1390       if (!d->got_buffer_after_seek[idx]) {
1391         ts = GST_BUFFER_TIMESTAMP (info->data);
1392         d->first_buffer_after_seek_has_timestamp_0[idx] =
1393             (ts < d->first_ts[idx] + 10 * GST_MSECOND);
1394         d->got_buffer_after_seek[idx] = TRUE;
1395       }
1396     } else if (!d->got_buffer_before_seek[idx]) {
1397       d->got_buffer_before_seek[idx] = TRUE;
1398       d->first_ts[idx] = GST_BUFFER_TIMESTAMP (info->data);
1399     }
1400   } else if (GST_IS_EVENT (info->data)) {
1401     if (GST_EVENT_TYPE (info->data) == GST_EVENT_CAPS) {
1402       gst_event_parse_caps (info->data, &caps);
1403       if (are_caps_audio (caps) || are_caps_video (caps)) {
1404         idx = caps2idx (caps, td->two_streams);
1405         d->got_caps[idx] = TRUE;
1406       }
1407     } else if (GST_EVENT_TYPE (info->data) == GST_EVENT_SEGMENT) {
1408       /* from the sink pipeline, we don't know whether the master issued a seek,
1409          as the seek_sent memory location isn't directly accesible to us, so we
1410          look for a segment after a buffer to mean a seek was sent */
1411       idx = pad2idx (pad, td->two_streams);
1412       if (d->got_buffer_before_seek[idx])
1413         d->got_segment_after_seek[idx] = TRUE;
1414     } else if (GST_EVENT_TYPE (info->data) == GST_EVENT_FLUSH_START) {
1415       idx = pad2idx (pad, td->two_streams);
1416       d->got_flush_start[idx] = TRUE;
1417     } else if (GST_EVENT_TYPE (info->data) == GST_EVENT_FLUSH_STOP) {
1418       idx = pad2idx (pad, td->two_streams);
1419       if (d->got_buffer_before_seek[idx])
1420         d->got_flush_stop[idx] = TRUE;
1421     }
1422   }
1423 
1424   return GST_PAD_PROBE_OK;
1425 }
1426 
/* gst_iterator_foreach() callback used by setup_sink_flushing_seek(): passes
 * the iterated item `v` on to hook_probe() with flushing_seek_probe as the
 * callback and `user_data` as its argument. */
static void
hook_flushing_seek_probe (const GValue * v, gpointer user_data)
{
  hook_probe (v, flushing_seek_probe, user_data);
}
1432 
1433 static void
setup_sink_flushing_seek(GstElement * sink,gpointer user_data)1434 setup_sink_flushing_seek (GstElement * sink, gpointer user_data)
1435 {
1436   GstIterator *it;
1437 
1438   it = gst_bin_iterate_sinks (GST_BIN (sink));
1439   while (gst_iterator_foreach (it, hook_flushing_seek_probe, user_data))
1440     gst_iterator_resync (it);
1441   gst_iterator_free (it);
1442 }
1443 
1444 static void
check_success_source_flushing_seek(gpointer user_data)1445 check_success_source_flushing_seek (gpointer user_data)
1446 {
1447   test_data *td = user_data;
1448   const flushing_seek_input_data *i = td->id;
1449   flushing_seek_master_data *d = td->md;
1450 
1451   FAIL_UNLESS (d->got_state_changed_to_playing);
1452   FAIL_UNLESS (d->seek_sent);
1453   FAIL_UNLESS (d->got_segment_done == i->segment_seek);
1454 }
1455 
1456 static void
check_success_sink_flushing_seek(gpointer user_data)1457 check_success_sink_flushing_seek (gpointer user_data)
1458 {
1459   test_data *td = user_data;
1460   flushing_seek_slave_data *d = td->sd;
1461   gint idx;
1462 
1463   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
1464     FAIL_UNLESS (d->got_caps[idx]);
1465     FAIL_UNLESS (d->got_buffer_before_seek[idx]);
1466     FAIL_UNLESS (d->got_buffer_after_seek[idx]);
1467     FAIL_UNLESS (d->got_segment_after_seek[idx]);
1468     FAIL_UNLESS (d->got_flush_start[idx]);
1469     FAIL_UNLESS (d->got_flush_stop[idx]);
1470     FAIL_UNLESS (d->first_buffer_after_seek_has_timestamp_0[idx]);
1471   }
1472 }
1473 
GST_START_TEST(test_empty_flushing_seek)1474 GST_START_TEST (test_empty_flushing_seek)
1475 {
1476   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT;
1477   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1478   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1479 
1480   TEST_BASE (TEST_FEATURE_TEST_SOURCE, flushing_seek_source,
1481       setup_sink_flushing_seek, check_success_source_flushing_seek,
1482       check_success_sink_flushing_seek, &id, &md, &sd);
1483 }
1484 
1485 GST_END_TEST;
1486 
GST_START_TEST(test_wavparse_flushing_seek)1487 GST_START_TEST (test_wavparse_flushing_seek)
1488 {
1489   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT;
1490   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1491   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1492 
1493   TEST_BASE (TEST_FEATURE_WAV_SOURCE, flushing_seek_source,
1494       setup_sink_flushing_seek, check_success_source_flushing_seek,
1495       check_success_sink_flushing_seek, &id, &md, &sd);
1496 }
1497 
1498 GST_END_TEST;
1499 
GST_START_TEST(test_mpegts_flushing_seek)1500 GST_START_TEST (test_mpegts_flushing_seek)
1501 {
1502   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT;
1503   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1504   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1505 
1506   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, flushing_seek_source,
1507       setup_sink_flushing_seek, check_success_source_flushing_seek,
1508       check_success_sink_flushing_seek, &id, &md, &sd);
1509 }
1510 
1511 GST_END_TEST;
1512 
GST_START_TEST(test_mpegts_2_flushing_seek)1513 GST_START_TEST (test_mpegts_2_flushing_seek)
1514 {
1515   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT;
1516   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1517   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1518 
1519   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1520       flushing_seek_source, setup_sink_flushing_seek,
1521       check_success_source_flushing_seek, check_success_sink_flushing_seek, &id,
1522       &md, &sd);
1523 }
1524 
1525 GST_END_TEST;
1526 
GST_START_TEST(test_live_a_flushing_seek)1527 GST_START_TEST (test_live_a_flushing_seek)
1528 {
1529   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT;
1530   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1531   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1532 
1533   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, flushing_seek_source,
1534       setup_sink_flushing_seek, check_success_source_flushing_seek,
1535       check_success_sink_flushing_seek, &id, &md, &sd);
1536 }
1537 
1538 GST_END_TEST;
1539 
GST_START_TEST(test_live_av_flushing_seek)1540 GST_START_TEST (test_live_av_flushing_seek)
1541 {
1542   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT;
1543   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1544   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1545 
1546   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, flushing_seek_source,
1547       setup_sink_flushing_seek, check_success_source_flushing_seek,
1548       check_success_sink_flushing_seek, &id, &md, &sd);
1549 }
1550 
1551 GST_END_TEST;
1552 
GST_START_TEST(test_live_av_2_flushing_seek)1553 GST_START_TEST (test_live_av_2_flushing_seek)
1554 {
1555   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT;
1556   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1557   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1558 
1559   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1560       flushing_seek_source, setup_sink_flushing_seek,
1561       check_success_source_flushing_seek, check_success_sink_flushing_seek, &id,
1562       &md, &sd);
1563 }
1564 
1565 GST_END_TEST;
1566 
GST_START_TEST(test_empty_flushing_seek_in_pause)1567 GST_START_TEST (test_empty_flushing_seek_in_pause)
1568 {
1569   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT_PAUSED;
1570   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1571   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1572 
1573   TEST_BASE (TEST_FEATURE_TEST_SOURCE, flushing_seek_source,
1574       setup_sink_flushing_seek, check_success_source_flushing_seek,
1575       check_success_sink_flushing_seek, &id, &md, &sd);
1576 }
1577 
1578 GST_END_TEST;
1579 
GST_START_TEST(test_wavparse_flushing_seek_in_pause)1580 GST_START_TEST (test_wavparse_flushing_seek_in_pause)
1581 {
1582   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT_PAUSED;
1583   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1584   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1585 
1586   TEST_BASE (TEST_FEATURE_WAV_SOURCE, flushing_seek_source,
1587       setup_sink_flushing_seek, check_success_source_flushing_seek,
1588       check_success_sink_flushing_seek, &id, &md, &sd);
1589 }
1590 
1591 GST_END_TEST;
1592 
GST_START_TEST(test_mpegts_flushing_seek_in_pause)1593 GST_START_TEST (test_mpegts_flushing_seek_in_pause)
1594 {
1595   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT_PAUSED;
1596   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1597   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1598 
1599   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, flushing_seek_source,
1600       setup_sink_flushing_seek, check_success_source_flushing_seek,
1601       check_success_sink_flushing_seek, &id, &md, &sd);
1602 }
1603 
1604 GST_END_TEST;
1605 
GST_START_TEST(test_mpegts_2_flushing_seek_in_pause)1606 GST_START_TEST (test_mpegts_2_flushing_seek_in_pause)
1607 {
1608   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT_PAUSED;
1609   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1610   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1611 
1612   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1613       flushing_seek_source, setup_sink_flushing_seek,
1614       check_success_source_flushing_seek,
1615       check_success_sink_flushing_seek, &id, &md, &sd);
1616 }
1617 
1618 GST_END_TEST;
1619 
GST_START_TEST(test_empty_segment_seek)1620 GST_START_TEST (test_empty_segment_seek)
1621 {
1622   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT_SEGMENT_SEEK;
1623   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1624   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1625 
1626   TEST_BASE (TEST_FEATURE_TEST_SOURCE, flushing_seek_source,
1627       setup_sink_flushing_seek, check_success_source_flushing_seek,
1628       check_success_sink_flushing_seek, &id, &md, &sd);
1629 }
1630 
1631 GST_END_TEST;
1632 
GST_START_TEST(test_wavparse_segment_seek)1633 GST_START_TEST (test_wavparse_segment_seek)
1634 {
1635   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT_SEGMENT_SEEK;
1636   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1637   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1638 
1639   TEST_BASE (TEST_FEATURE_WAV_SOURCE, flushing_seek_source,
1640       setup_sink_flushing_seek, check_success_source_flushing_seek,
1641       check_success_sink_flushing_seek, &id, &md, &sd);
1642 }
1643 
1644 GST_END_TEST;
1645 
GST_START_TEST(test_live_a_segment_seek)1646 GST_START_TEST (test_live_a_segment_seek)
1647 {
1648   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT_SEGMENT_SEEK;
1649   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1650   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1651 
1652   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE,
1653       flushing_seek_source, setup_sink_flushing_seek,
1654       check_success_source_flushing_seek,
1655       check_success_sink_flushing_seek, &id, &md, &sd);
1656 }
1657 
1658 GST_END_TEST;
1659 
GST_START_TEST(test_live_av_segment_seek)1660 GST_START_TEST (test_live_av_segment_seek)
1661 {
1662   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT_SEGMENT_SEEK;
1663   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1664   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1665 
1666   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE,
1667       flushing_seek_source, setup_sink_flushing_seek,
1668       check_success_source_flushing_seek,
1669       check_success_sink_flushing_seek, &id, &md, &sd);
1670 }
1671 
1672 GST_END_TEST;
1673 
GST_START_TEST(test_live_av_2_segment_seek)1674 GST_START_TEST (test_live_av_2_segment_seek)
1675 {
1676   flushing_seek_input_data id = FLUSHING_SEEK_INPUT_DATA_INIT_SEGMENT_SEEK;
1677   flushing_seek_master_data md = FLUSHING_SEEK_MASTER_DATA_INIT;
1678   flushing_seek_slave_data sd = FLUSHING_SEEK_SLAVE_DATA_INIT;
1679 
1680   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1681       flushing_seek_source, setup_sink_flushing_seek,
1682       check_success_source_flushing_seek,
1683       check_success_sink_flushing_seek, &id, &md, &sd);
1684 }
1685 
1686 GST_END_TEST;
1687 
1688 /**** seek stress test ****/
1689 
/* Remaining seek budget of the seek stress test. send_seek_stress() picks
 * one kind at random on each run and decrements its counter; once all three
 * are zero the stress sequence ends. */
typedef struct
{
  /* flushing seeks still to be done with the pipeline set to PLAYING */
  gint n_flushing_seeks;
  /* flushing seeks still to be done with the pipeline set to PAUSED */
  gint n_paused_seeks;
  /* flushing segment seeks still to be done */
  gint n_segment_seeks;
} seek_stress_input_data;
1696 
/* State kept on the master (source) side during the seek stress test. */
typedef struct
{
  /* NOTE(review): presumably set by the state-changed callback of this test;
   * the code that sets it is not part of this section - confirm */
  gboolean got_state_changed_to_playing;
  /* an EOS or SEGMENT_DONE message posted by a pipeline was seen on the bus */
  gboolean got_eos;
  /* NOTE(review): not written by send_seek_stress() or seek_stress_bus_msg();
   * confirm where (and whether) it is set */
  gboolean seek_sent;
  /* reference time that send_seek_stress() subtracts from
   * g_get_monotonic_time() to get the elapsed time of the test */
  guint64 t0;
} seek_stress_master_data;
1704 
1705 static gboolean
send_seek_stress(gpointer user_data)1706 send_seek_stress (gpointer user_data)
1707 {
1708   test_data *td = user_data;
1709   seek_stress_input_data *i = td->id;
1710   seek_stress_master_data *d = td->md;
1711   GstEvent *seek_event;
1712   unsigned int available, seekidx;
1713   GstClockTime t, base;
1714 
1715   /* Live streams don't like to be seeked too far away from the
1716      "current" time, since they're live, so always seek near the
1717      "real" time, so we still exercise seeking to another position
1718      but still land somewhere close enough to "live" position. */
1719   t = (g_get_monotonic_time () - d->t0) * 1000;
1720   base = t > GST_SECOND / 2 ? t - GST_SECOND / 2 : 0;
1721   t = base + g_random_int_range (0, GST_SECOND);
1722 
1723   /* pick a random seek type among the ones we have left */
1724   available = i->n_flushing_seeks + i->n_paused_seeks + i->n_segment_seeks;
1725   if (available == 0) {
1726     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (td->p),
1727         GST_DEBUG_GRAPH_SHOW_ALL, "inter.test.toplaying");
1728     FAIL_UNLESS (gst_element_set_state (td->p,
1729             GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE);
1730     g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline,
1731         gst_object_ref (td->p));
1732     gst_object_unref (td->p);
1733     return G_SOURCE_REMOVE;
1734   }
1735 
1736   seekidx = rand () % available;
1737   if (seekidx < i->n_flushing_seeks) {
1738     GST_INFO_OBJECT (td->p, "Sending flushing seek to %" GST_TIME_FORMAT,
1739         GST_TIME_ARGS (t));
1740     FAIL_UNLESS (gst_element_set_state (td->p,
1741             GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE);
1742     FAIL_UNLESS (gst_element_seek_simple (td->p, GST_FORMAT_TIME,
1743             GST_SEEK_FLAG_FLUSH, t));
1744     --i->n_flushing_seeks;
1745     return G_SOURCE_CONTINUE;
1746   }
1747   seekidx -= i->n_flushing_seeks;
1748 
1749   if (seekidx < i->n_paused_seeks) {
1750     GST_INFO_OBJECT (td->p,
1751         "Sending flushing seek in paused to %" GST_TIME_FORMAT,
1752         GST_TIME_ARGS (t));
1753     FAIL_UNLESS (gst_element_set_state (td->p,
1754             GST_STATE_PAUSED) != GST_STATE_CHANGE_FAILURE);
1755     FAIL_UNLESS (gst_element_seek_simple (td->p, GST_FORMAT_TIME,
1756             GST_SEEK_FLAG_FLUSH, t));
1757     --i->n_paused_seeks;
1758     return G_SOURCE_CONTINUE;
1759   }
1760   seekidx -= i->n_paused_seeks;
1761 
1762   GST_INFO_OBJECT (td->p, "Sending segment seek to %" GST_TIME_FORMAT,
1763       GST_TIME_ARGS (t));
1764   seek_event =
1765       gst_event_new_seek (1.0, GST_FORMAT_TIME,
1766       GST_SEEK_FLAG_SEGMENT | GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, t,
1767       GST_SEEK_TYPE_SET, t + 5 * GST_SECOND);
1768   FAIL_UNLESS (gst_element_send_event (td->p, seek_event));
1769   --i->n_segment_seeks;
1770   return G_SOURCE_CONTINUE;
1771 }
1772 
1773 static gboolean
seek_stress_bus_msg(GstBus * bus,GstMessage * message,gpointer user_data)1774 seek_stress_bus_msg (GstBus * bus, GstMessage * message, gpointer user_data)
1775 {
1776   test_data *td = user_data;
1777   seek_stress_master_data *d = td->md;
1778 
1779   if (GST_IS_PIPELINE (GST_MESSAGE_SRC (message))) {
1780     if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS ||
1781         GST_MESSAGE_TYPE (message) == GST_MESSAGE_SEGMENT_DONE) {
1782       d->got_eos = TRUE;
1783     }
1784   }
1785   return master_bus_msg (bus, message, user_data);
1786 }
1787 
1788 static void
seek_stress_on_state_changed(gpointer user_data)1789 seek_stress_on_state_changed (gpointer user_data)
1790 {
1791   test_data *td = user_data;
1792   seek_stress_master_data *d = td->md;
1793 
1794   if (!d->got_state_changed_to_playing) {
1795     d->got_state_changed_to_playing = TRUE;
1796     d->t0 = g_get_monotonic_time ();
1797     gst_object_ref (td->p);
1798     g_timeout_add (10, (GSourceFunc) send_seek_stress, td);
1799   }
1800 }
1801 
1802 static void
seek_stress_source(GstElement * source,gpointer user_data)1803 seek_stress_source (GstElement * source, gpointer user_data)
1804 {
1805   test_data *td = user_data;
1806   GstStateChangeReturn ret;
1807 
1808   /* we're on the source, there's already the basic master_bus_msg watch,
1809      and gst doesn't want more than one watch, so we remove the watch and
1810      call it directly when done in the new watch */
1811   gst_bus_remove_watch (GST_ELEMENT_BUS (source));
1812   gst_bus_add_watch (GST_ELEMENT_BUS (source), seek_stress_bus_msg, user_data);
1813   td->state_target = GST_STATE_PLAYING;
1814   td->state_changed_cb = seek_stress_on_state_changed;
1815   ret = gst_element_set_state (source, GST_STATE_PLAYING);
1816   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
1817 }
1818 
1819 static void
check_success_source_seek_stress(gpointer user_data)1820 check_success_source_seek_stress (gpointer user_data)
1821 {
1822   test_data *td = user_data;
1823   seek_stress_input_data *i = td->id;
1824   seek_stress_master_data *d = td->md;
1825 
1826   FAIL_UNLESS (d->got_state_changed_to_playing);
1827   FAIL_UNLESS_EQUALS_INT (i->n_flushing_seeks, 0);
1828   FAIL_UNLESS_EQUALS_INT (i->n_paused_seeks, 0);
1829   FAIL_UNLESS_EQUALS_INT (i->n_segment_seeks, 0);
1830   FAIL_IF (d->got_eos);
1831 }
1832 
GST_START_TEST(test_empty_seek_stress)1833 GST_START_TEST (test_empty_seek_stress)
1834 {
1835   seek_stress_input_data id = { 100, 100, 100 };
1836   seek_stress_master_data md = { 0 };
1837 
1838   TEST_BASE (TEST_FEATURE_TEST_SOURCE, seek_stress_source, NULL,
1839       check_success_source_seek_stress, NULL, &id, &md, NULL);
1840 }
1841 
1842 GST_END_TEST;
1843 
GST_START_TEST(test_wavparse_seek_stress)1844 GST_START_TEST (test_wavparse_seek_stress)
1845 {
1846   seek_stress_input_data id = { 100, 100, 100 };
1847   seek_stress_master_data md = { 0 };
1848 
1849   TEST_BASE (TEST_FEATURE_WAV_SOURCE, seek_stress_source, NULL,
1850       check_success_source_seek_stress, NULL, &id, &md, NULL);
1851 }
1852 
1853 GST_END_TEST;
1854 
GST_START_TEST(test_mpegts_seek_stress)1855 GST_START_TEST (test_mpegts_seek_stress)
1856 {
1857   seek_stress_input_data id = { 100, 100, 0 };
1858   seek_stress_master_data md = { 0 };
1859 
1860   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, seek_stress_source, NULL,
1861       check_success_source_seek_stress, NULL, &id, &md, NULL);
1862 }
1863 
1864 GST_END_TEST;
1865 
GST_START_TEST(test_mpegts_2_seek_stress)1866 GST_START_TEST (test_mpegts_2_seek_stress)
1867 {
1868   seek_stress_input_data id = { 100, 100, 0 };
1869   seek_stress_master_data md = { 0 };
1870 
1871   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1872       seek_stress_source, NULL, check_success_source_seek_stress, NULL, &id,
1873       &md, NULL);
1874 }
1875 
1876 GST_END_TEST;
1877 
GST_START_TEST(test_live_a_seek_stress)1878 GST_START_TEST (test_live_a_seek_stress)
1879 {
1880   seek_stress_input_data id = { 100, 0, 100 };
1881   seek_stress_master_data md = { 0 };
1882 
1883   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE | TEST_FEATURE_LONG_DURATION,
1884       seek_stress_source, NULL, check_success_source_seek_stress, NULL, &id,
1885       &md, NULL);
1886 }
1887 
1888 GST_END_TEST;
1889 
GST_START_TEST(test_live_av_seek_stress)1890 GST_START_TEST (test_live_av_seek_stress)
1891 {
1892   seek_stress_input_data id = { 100, 0, 100 };
1893   seek_stress_master_data md = { 0 };
1894 
1895   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_LONG_DURATION,
1896       seek_stress_source, NULL, check_success_source_seek_stress, NULL, &id,
1897       &md, NULL);
1898 }
1899 
1900 GST_END_TEST;
1901 
GST_START_TEST(test_live_av_2_seek_stress)1902 GST_START_TEST (test_live_av_2_seek_stress)
1903 {
1904   seek_stress_input_data id = { 100, 0, 100 };
1905   seek_stress_master_data md = { 0 };
1906 
1907   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_LONG_DURATION |
1908       TEST_FEATURE_SPLIT_SINKS,
1909       seek_stress_source, NULL, check_success_source_seek_stress, NULL, &id,
1910       &md, NULL);
1911 }
1912 
1913 GST_END_TEST;
1914 
1915 /**** upstream query test ****/
1916 
/* Input data for the upstream query tests. */
typedef struct
{
  /* Duration the duration query is expected to report (compared within
     CLOSE_ENOUGH_TO_ZERO), or GST_CLOCK_TIME_NONE when the query is
     expected to report an invalid duration. */
  GstClockTime expected_duration;

  /* In this test, the source does a position query (in the source pipeline
     process), and must check its return against the last buffer timestamp
     in the sink pipeline process. We open a pipe to let the sink send us
     the timestamps it receives so the source can make the comparison.
     Index 0 is the read end, index 1 the write end. */
  gint ts_pipes[2];
} upstream_query_input_data;
1927 
/* State tracked in the source process by the upstream query tests. */
typedef struct
{
  /* set by upstream_query_on_state_changed() the first time it is called */
  gboolean got_state_changed_to_playing;
  /* the queried position was within CLOSE_ENOUGH_TO_ZERO of last_buffer_ts */
  gboolean got_correct_position;
  /* the queried duration matched the expected duration */
  gboolean got_correct_duration;
  /* highest buffer timestamp read so far from the timestamp pipe */
  GstClockTime last_buffer_ts;
} upstream_query_master_data;
1935 
/* State tracked in the sink process by the upstream query tests. The arrays
 * are indexed with the values returned by caps2idx() / pad2idx(). */
typedef struct
{
  gboolean got_caps[2];         /* a CAPS event was seen */
  gboolean got_buffer[2];       /* a buffer was seen */
  /* timestamp of the most recent buffer that had a valid timestamp; this is
     the value written to the timestamp pipe */
  GstClockTime last_buffer_ts;
} upstream_query_slave_data;
1942 
1943 static gboolean
send_upstream_queries(gpointer user_data)1944 send_upstream_queries (gpointer user_data)
1945 {
1946   test_data *td = user_data;
1947   upstream_query_input_data *i = td->id;
1948   upstream_query_master_data *d = td->md;
1949   gint64 pos, dur, last;
1950 
1951   FAIL_UNLESS (gst_element_query_position (td->p, GST_FORMAT_TIME, &pos));
1952 
1953   /* read up the buffer ts sent by the sink process till the last one */
1954   while (read (i->ts_pipes[0], &last, sizeof (last)) == sizeof (last)) {
1955     /* timestamps may not be increasing because we are getting ts from
1956      * both the audio and video streams; the position query will report
1957      * the higher */
1958     if (last > d->last_buffer_ts)
1959       d->last_buffer_ts = last;
1960   }
1961   if (ABS ((gint64) (pos - d->last_buffer_ts)) <= CLOSE_ENOUGH_TO_ZERO)
1962     d->got_correct_position = TRUE;
1963 
1964   FAIL_UNLESS (gst_element_query_duration (td->p, GST_FORMAT_TIME, &dur));
1965   if (GST_CLOCK_TIME_IS_VALID (i->expected_duration)) {
1966     GstClockTimeDiff diff = GST_CLOCK_DIFF (dur, i->expected_duration);
1967     if (diff >= -CLOSE_ENOUGH_TO_ZERO && diff <= CLOSE_ENOUGH_TO_ZERO)
1968       d->got_correct_duration = TRUE;
1969   } else {
1970     if (!GST_CLOCK_TIME_IS_VALID (dur))
1971       d->got_correct_duration = TRUE;
1972   }
1973 
1974   g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline, td->p);
1975   return FALSE;
1976 }
1977 
1978 static void
upstream_query_on_state_changed(gpointer user_data)1979 upstream_query_on_state_changed (gpointer user_data)
1980 {
1981   test_data *td = user_data;
1982   upstream_query_master_data *d = td->md;
1983 
1984   if (!d->got_state_changed_to_playing) {
1985     d->got_state_changed_to_playing = TRUE;
1986     gst_object_ref (td->p);
1987     g_timeout_add (QUERY_AT, (GSourceFunc) send_upstream_queries, td);
1988   }
1989 }
1990 
1991 static void
upstream_query_source(GstElement * source,gpointer user_data)1992 upstream_query_source (GstElement * source, gpointer user_data)
1993 {
1994   test_data *td = user_data;
1995   GstStateChangeReturn ret;
1996 
1997   td->state_changed_cb = upstream_query_on_state_changed;
1998   td->state_target = GST_STATE_PLAYING;
1999   ret = gst_element_set_state (source, GST_STATE_PLAYING);
2000   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
2001 }
2002 
2003 static GstPadProbeReturn
upstream_query_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)2004 upstream_query_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2005 {
2006   test_data *td = user_data;
2007   upstream_query_input_data *i = td->id;
2008   upstream_query_slave_data *d = td->sd;
2009   GstCaps *caps;
2010 
2011   if (GST_IS_BUFFER (info->data)) {
2012     d->got_buffer[pad2idx (pad, td->two_streams)] = TRUE;
2013     if (GST_BUFFER_TIMESTAMP_IS_VALID (info->data)) {
2014       d->last_buffer_ts = GST_BUFFER_TIMESTAMP (info->data);
2015       FAIL_UNLESS (write (i->ts_pipes[1], &d->last_buffer_ts,
2016               sizeof (d->last_buffer_ts)) == sizeof (d->last_buffer_ts));
2017     }
2018   } else if (GST_IS_EVENT (info->data)) {
2019     if (GST_EVENT_TYPE (info->data) == GST_EVENT_CAPS) {
2020       gst_event_parse_caps (info->data, &caps);
2021       d->got_caps[caps2idx (caps, td->two_streams)] = TRUE;
2022     }
2023   }
2024 
2025   return GST_PAD_PROBE_OK;
2026 }
2027 
/* gst_iterator_foreach() callback: forwards the iterated GValue to
 * hook_probe() together with upstream_query_probe and the caller's
 * user_data. NOTE(review): hook_probe is defined outside this section;
 * presumably it installs the probe on the element held in v - confirm. */
static void
hook_upstream_query_probe (const GValue * v, gpointer user_data)
{
  hook_probe (v, upstream_query_probe, user_data);
}
2033 
2034 static void
setup_sink_upstream_query(GstElement * sink,gpointer user_data)2035 setup_sink_upstream_query (GstElement * sink, gpointer user_data)
2036 {
2037   GstIterator *it;
2038 
2039   it = gst_bin_iterate_sinks (GST_BIN (sink));
2040   while (gst_iterator_foreach (it, hook_upstream_query_probe, user_data))
2041     gst_iterator_resync (it);
2042   gst_iterator_free (it);
2043 }
2044 
2045 static void
check_success_source_upstream_query(gpointer user_data)2046 check_success_source_upstream_query (gpointer user_data)
2047 {
2048   test_data *td = user_data;
2049   upstream_query_master_data *d = td->md;
2050 
2051   FAIL_UNLESS (d->got_state_changed_to_playing);
2052   FAIL_UNLESS (d->got_correct_position);
2053   FAIL_UNLESS (d->got_correct_duration);
2054 }
2055 
2056 static void
check_success_sink_upstream_query(gpointer user_data)2057 check_success_sink_upstream_query (gpointer user_data)
2058 {
2059   test_data *td = user_data;
2060   upstream_query_slave_data *d = td->sd;
2061   int idx;
2062 
2063   for (idx = 0; idx < (td->two_streams ? 2 : 1); ++idx) {
2064     FAIL_UNLESS (d->got_caps[idx]);
2065     FAIL_UNLESS (d->got_buffer[idx]);
2066   }
2067 }
2068 
GST_START_TEST(test_empty_upstream_query)2069 GST_START_TEST (test_empty_upstream_query)
2070 {
2071   upstream_query_input_data id = { GST_CLOCK_TIME_NONE, };
2072   upstream_query_master_data md = { 0 };
2073   upstream_query_slave_data sd = { {0}
2074   };
2075 
2076   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2077   TEST_BASE (TEST_FEATURE_TEST_SOURCE, upstream_query_source,
2078       setup_sink_upstream_query, check_success_source_upstream_query,
2079       check_success_sink_upstream_query, &id, &md, &sd);
2080   close (id.ts_pipes[0]);
2081   close (id.ts_pipes[1]);
2082 }
2083 
2084 GST_END_TEST;
2085 
GST_START_TEST(test_wavparse_upstream_query)2086 GST_START_TEST (test_wavparse_upstream_query)
2087 {
2088   upstream_query_input_data id = { WAV_SAMPLE_ROUGH_DURATION, };
2089   upstream_query_master_data md = { 0 };
2090   upstream_query_slave_data sd = { {0}
2091   };
2092 
2093   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2094   TEST_BASE (TEST_FEATURE_WAV_SOURCE, upstream_query_source,
2095       setup_sink_upstream_query, check_success_source_upstream_query,
2096       check_success_sink_upstream_query, &id, &md, &sd);
2097   close (id.ts_pipes[0]);
2098   close (id.ts_pipes[1]);
2099 }
2100 
2101 GST_END_TEST;
2102 
GST_START_TEST(test_mpegts_upstream_query)2103 GST_START_TEST (test_mpegts_upstream_query)
2104 {
2105   upstream_query_input_data id = { MPEGTS_SAMPLE_ROUGH_DURATION, };
2106   upstream_query_master_data md = { 0 };
2107   upstream_query_slave_data sd = { {0}
2108   };
2109 
2110   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2111   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, upstream_query_source,
2112       setup_sink_upstream_query, check_success_source_upstream_query,
2113       check_success_sink_upstream_query, &id, &md, &sd);
2114   close (id.ts_pipes[0]);
2115   close (id.ts_pipes[1]);
2116 }
2117 
2118 GST_END_TEST;
2119 
GST_START_TEST(test_mpegts_2_upstream_query)2120 GST_START_TEST (test_mpegts_2_upstream_query)
2121 {
2122   upstream_query_input_data id = { MPEGTS_SAMPLE_ROUGH_DURATION, };
2123   upstream_query_master_data md = { 0 };
2124   upstream_query_slave_data sd = { {0}
2125   };
2126 
2127   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2128   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
2129       upstream_query_source, setup_sink_upstream_query,
2130       check_success_source_upstream_query, check_success_sink_upstream_query,
2131       &id, &md, &sd);
2132   close (id.ts_pipes[0]);
2133   close (id.ts_pipes[1]);
2134 }
2135 
2136 GST_END_TEST;
2137 
GST_START_TEST(test_live_a_upstream_query)2138 GST_START_TEST (test_live_a_upstream_query)
2139 {
2140   upstream_query_input_data id = { GST_CLOCK_TIME_NONE, };
2141   upstream_query_master_data md = { 0 };
2142   upstream_query_slave_data sd = { {0}
2143   };
2144 
2145   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2146   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE,
2147       upstream_query_source, setup_sink_upstream_query,
2148       check_success_source_upstream_query, check_success_sink_upstream_query,
2149       &id, &md, &sd);
2150   close (id.ts_pipes[0]);
2151   close (id.ts_pipes[1]);
2152 }
2153 
2154 GST_END_TEST;
2155 
GST_START_TEST(test_live_av_upstream_query)2156 GST_START_TEST (test_live_av_upstream_query)
2157 {
2158   upstream_query_input_data id = { GST_CLOCK_TIME_NONE, };
2159   upstream_query_master_data md = { 0 };
2160   upstream_query_slave_data sd = { {0}
2161   };
2162 
2163   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2164   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE,
2165       upstream_query_source, setup_sink_upstream_query,
2166       check_success_source_upstream_query, check_success_sink_upstream_query,
2167       &id, &md, &sd);
2168   close (id.ts_pipes[0]);
2169   close (id.ts_pipes[1]);
2170 }
2171 
2172 GST_END_TEST;
2173 
GST_START_TEST(test_live_av_2_upstream_query)2174 GST_START_TEST (test_live_av_2_upstream_query)
2175 {
2176   upstream_query_input_data id = { GST_CLOCK_TIME_NONE, };
2177   upstream_query_master_data md = { 0 };
2178   upstream_query_slave_data sd = { {0}
2179   };
2180 
2181   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2182   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
2183       upstream_query_source, setup_sink_upstream_query,
2184       check_success_source_upstream_query, check_success_sink_upstream_query,
2185       &id, &md, &sd);
2186   close (id.ts_pipes[0]);
2187   close (id.ts_pipes[1]);
2188 }
2189 
2190 GST_END_TEST;
2191 
2192 /**** message test ****/
2193 
/* State tracked in the source process by the message tests. */
typedef struct
{
  /* set by message_on_state_changed() the first time it is called */
  gboolean got_state_changed_to_playing;
  /* number of "ipcpipeline-test" element messages seen on the bus */
  guint8 num_got_message;
  /* number of "ipcpipeline-test" sink-message events sent */
  guint8 num_sent_message;
} message_master_data;
2200 
2201 static void
send_ipcpipeline_test_message_event(const GValue * v,gpointer user_data)2202 send_ipcpipeline_test_message_event (const GValue * v, gpointer user_data)
2203 {
2204   test_data *td = user_data;
2205   message_master_data *d = td->md;
2206   GstElement *element = g_value_get_object (v);
2207   GstMessage *msg;
2208   gboolean ret;
2209 
2210   d->num_sent_message++;
2211 
2212   msg = gst_message_new_element (GST_OBJECT (element),
2213       gst_structure_new_empty ("ipcpipeline-test"));
2214   ret = gst_element_send_event (element,
2215       gst_event_new_sink_message ("ipcpipeline-test", msg));
2216   FAIL_UNLESS (ret);
2217   gst_message_unref (msg);
2218 }
2219 
2220 static gboolean
send_sink_message(gpointer user_data)2221 send_sink_message (gpointer user_data)
2222 {
2223   test_data *td = user_data;
2224   GstIterator *it;
2225 
2226   it = gst_bin_iterate_sources (GST_BIN (td->p));
2227   while (gst_iterator_foreach (it, send_ipcpipeline_test_message_event, td))
2228     gst_iterator_resync (it);
2229   gst_iterator_free (it);
2230 
2231   gst_object_unref (td->p);
2232   return G_SOURCE_REMOVE;
2233 }
2234 
2235 static gboolean
message_bus_msg(GstBus * bus,GstMessage * message,gpointer user_data)2236 message_bus_msg (GstBus * bus, GstMessage * message, gpointer user_data)
2237 {
2238   test_data *td = user_data;
2239   message_master_data *d = td->md;
2240   const GstStructure *structure;
2241 
2242   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ELEMENT) {
2243     structure = gst_message_get_structure (message);
2244     FAIL_UNLESS (structure);
2245     if (gst_structure_has_name (structure, "ipcpipeline-test")) {
2246       d->num_got_message++;
2247       if (d->num_got_message == d->num_sent_message)
2248         g_main_loop_quit (loop);
2249     }
2250   }
2251   return master_bus_msg (bus, message, user_data);
2252 }
2253 
2254 static void
message_on_state_changed(gpointer user_data)2255 message_on_state_changed (gpointer user_data)
2256 {
2257   test_data *td = user_data;
2258   message_master_data *d = td->md;
2259 
2260   if (!d->got_state_changed_to_playing) {
2261     d->got_state_changed_to_playing = TRUE;
2262     gst_object_ref (td->p);
2263     g_timeout_add (MESSAGE_AT, (GSourceFunc) send_sink_message, td);
2264   }
2265 }
2266 
2267 static void
message_source(GstElement * source,gpointer user_data)2268 message_source (GstElement * source, gpointer user_data)
2269 {
2270   test_data *td = user_data;
2271   GstStateChangeReturn ret;
2272 
2273   /* we're on the source, there's already the basic master_bus_msg watch,
2274      and gst doesn't want more than one watch, so we remove the watch and
2275      call it directly when done in the new watch */
2276   gst_bus_remove_watch (GST_ELEMENT_BUS (source));
2277   gst_bus_add_watch (GST_ELEMENT_BUS (source), message_bus_msg, user_data);
2278   td->state_target = GST_STATE_PLAYING;
2279   td->state_changed_cb = message_on_state_changed;
2280   ret = gst_element_set_state (source, GST_STATE_PLAYING);
2281   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
2282 }
2283 
2284 static void
check_success_source_message(gpointer user_data)2285 check_success_source_message (gpointer user_data)
2286 {
2287   test_data *td = user_data;
2288   message_master_data *d = td->md;
2289 
2290   FAIL_UNLESS (d->got_state_changed_to_playing);
2291   FAIL_UNLESS_EQUALS_INT (d->num_got_message, d->num_sent_message);
2292 }
2293 
GST_START_TEST(test_empty_message)2294 GST_START_TEST (test_empty_message)
2295 {
2296   message_master_data md = { 0 };
2297   TEST_BASE (TEST_FEATURE_TEST_SOURCE, message_source, NULL,
2298       check_success_source_message, NULL, NULL, &md, NULL);
2299 }
2300 
2301 GST_END_TEST;
2302 
GST_START_TEST(test_wavparse_message)2303 GST_START_TEST (test_wavparse_message)
2304 {
2305   message_master_data md = { 0 };
2306   TEST_BASE (TEST_FEATURE_WAV_SOURCE, message_source, NULL,
2307       check_success_source_message, NULL, NULL, &md, NULL);
2308 }
2309 
2310 GST_END_TEST;
2311 
GST_START_TEST(test_live_a_message)2312 GST_START_TEST (test_live_a_message)
2313 {
2314   message_master_data md = { 0 };
2315   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, message_source, NULL,
2316       check_success_source_message, NULL, NULL, &md, NULL);
2317 }
2318 
2319 GST_END_TEST;
2320 
GST_START_TEST(test_live_av_message)2321 GST_START_TEST (test_live_av_message)
2322 {
2323   message_master_data md = { 0 };
2324   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, message_source, NULL,
2325       check_success_source_message, NULL, NULL, &md, NULL);
2326 }
2327 
2328 GST_END_TEST;
2329 
GST_START_TEST(test_live_av_2_message)2330 GST_START_TEST (test_live_av_2_message)
2331 {
2332   message_master_data md = { 0 };
2333   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
2334       message_source, NULL, check_success_source_message, NULL, NULL, &md,
2335       NULL);
2336 }
2337 
2338 GST_END_TEST;
2339 
2340 /**** end of stream test ****/
2341 
/* State tracked in the source process by the end of stream tests. */
typedef struct
{
  /* set by end_of_stream_on_state_changed() */
  gboolean got_state_changed_to_playing;
} end_of_stream_master_data;
2346 
/* State tracked in the sink process by the end of stream tests. Both arrays
 * are indexed with the value returned by pad2idx(). */
typedef struct
{
  gboolean got_buffer[2];       /* a buffer was seen on the pad */
  gboolean got_eos[2];          /* an EOS event was seen on the pad */
} end_of_stream_slave_data;
2352 
2353 static void
end_of_stream_on_state_changed(gpointer user_data)2354 end_of_stream_on_state_changed (gpointer user_data)
2355 {
2356   test_data *td = user_data;
2357   end_of_stream_master_data *d = td->md;
2358 
2359   if (!d->got_state_changed_to_playing)
2360     d->got_state_changed_to_playing = TRUE;
2361 }
2362 
2363 static void
end_of_stream_source(GstElement * source,gpointer user_data)2364 end_of_stream_source (GstElement * source, gpointer user_data)
2365 {
2366   test_data *td = user_data;
2367   GstStateChangeReturn ret;
2368 
2369   td->state_changed_cb = end_of_stream_on_state_changed;
2370   td->state_target = GST_STATE_PLAYING;
2371   ret = gst_element_set_state (source, GST_STATE_PLAYING);
2372   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
2373 }
2374 
2375 static GstPadProbeReturn
end_of_stream_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)2376 end_of_stream_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2377 {
2378   test_data *td = user_data;
2379   end_of_stream_slave_data *d = td->sd;
2380 
2381   if (GST_IS_BUFFER (info->data)) {
2382     d->got_buffer[pad2idx (pad, td->two_streams)] = TRUE;
2383   } else if (GST_IS_EVENT (info->data)) {
2384     if (GST_EVENT_TYPE (info->data) == GST_EVENT_EOS) {
2385       d->got_eos[pad2idx (pad, td->two_streams)] = TRUE;
2386     }
2387   }
2388 
2389   return GST_PAD_PROBE_OK;
2390 }
2391 
/* gst_iterator_foreach() callback: forwards the iterated GValue to
 * hook_probe() together with end_of_stream_probe and the caller's
 * user_data. NOTE(review): hook_probe is defined outside this section;
 * presumably it installs the probe on the element held in v - confirm. */
static void
hook_end_of_stream_probe (const GValue * v, gpointer user_data)
{
  hook_probe (v, end_of_stream_probe, user_data);
}
2397 
2398 static void
setup_sink_end_of_stream(GstElement * sink,gpointer user_data)2399 setup_sink_end_of_stream (GstElement * sink, gpointer user_data)
2400 {
2401   GstIterator *it;
2402 
2403   it = gst_bin_iterate_sinks (GST_BIN (sink));
2404   while (gst_iterator_foreach (it, hook_end_of_stream_probe, user_data))
2405     gst_iterator_resync (it);
2406   gst_iterator_free (it);
2407 }
2408 
2409 static void
check_success_source_end_of_stream(gpointer user_data)2410 check_success_source_end_of_stream (gpointer user_data)
2411 {
2412   test_data *td = user_data;
2413   end_of_stream_master_data *d = td->md;
2414 
2415   FAIL_UNLESS (d->got_state_changed_to_playing);
2416 }
2417 
2418 static void
check_success_sink_end_of_stream(gpointer user_data)2419 check_success_sink_end_of_stream (gpointer user_data)
2420 {
2421   test_data *td = user_data;
2422   end_of_stream_slave_data *d = td->sd;
2423   int idx;
2424 
2425   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
2426     FAIL_UNLESS (d->got_buffer[idx]);
2427     FAIL_UNLESS (d->got_eos[idx]);
2428   }
2429 }
2430 
GST_START_TEST(test_empty_end_of_stream)2431 GST_START_TEST (test_empty_end_of_stream)
2432 {
2433   end_of_stream_master_data md = { 0 };
2434   end_of_stream_slave_data sd = { {0}
2435   };
2436 
2437   TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_ASYNC_SINK,
2438       end_of_stream_source, setup_sink_end_of_stream,
2439       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2440       NULL, &md, &sd);
2441 }
2442 
2443 GST_END_TEST;
2444 
GST_START_TEST(test_wavparse_end_of_stream)2445 GST_START_TEST (test_wavparse_end_of_stream)
2446 {
2447   end_of_stream_master_data md = { 0 };
2448   end_of_stream_slave_data sd = { {0}
2449   };
2450 
2451   TEST_BASE (TEST_FEATURE_WAV_SOURCE | TEST_FEATURE_ASYNC_SINK,
2452       end_of_stream_source, setup_sink_end_of_stream,
2453       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2454       NULL, &md, &sd);
2455 }
2456 
2457 GST_END_TEST;
2458 
GST_START_TEST(test_mpegts_end_of_stream)2459 GST_START_TEST (test_mpegts_end_of_stream)
2460 {
2461   end_of_stream_master_data md = { 0 };
2462   end_of_stream_slave_data sd = { {0}
2463   };
2464 
2465   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_ASYNC_SINK,
2466       end_of_stream_source, setup_sink_end_of_stream,
2467       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2468       NULL, &md, &sd);
2469 }
2470 
2471 GST_END_TEST;
2472 
GST_START_TEST(test_mpegts_2_end_of_stream)2473 GST_START_TEST (test_mpegts_2_end_of_stream)
2474 {
2475   end_of_stream_master_data md = { 0 };
2476   end_of_stream_slave_data sd = { {0}
2477   };
2478 
2479   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS |
2480       TEST_FEATURE_ASYNC_SINK,
2481       end_of_stream_source, setup_sink_end_of_stream,
2482       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2483       NULL, &md, &sd);
2484 }
2485 
2486 GST_END_TEST;
2487 
GST_START_TEST(test_live_a_end_of_stream)2488 GST_START_TEST (test_live_a_end_of_stream)
2489 {
2490   end_of_stream_master_data md = { 0 };
2491   end_of_stream_slave_data sd = { {0}
2492   };
2493 
2494   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE,
2495       end_of_stream_source, setup_sink_end_of_stream,
2496       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2497       NULL, &md, &sd);
2498 }
2499 
2500 GST_END_TEST;
2501 
GST_START_TEST(test_live_av_end_of_stream)2502 GST_START_TEST (test_live_av_end_of_stream)
2503 {
2504   end_of_stream_master_data md = { 0 };
2505   end_of_stream_slave_data sd = { {0}
2506   };
2507 
2508   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE,
2509       end_of_stream_source, setup_sink_end_of_stream,
2510       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2511       NULL, &md, &sd);
2512 }
2513 
2514 GST_END_TEST;
2515 
GST_START_TEST(test_live_av_2_end_of_stream)2516 GST_START_TEST (test_live_av_2_end_of_stream)
2517 {
2518   end_of_stream_master_data md = { 0 };
2519   end_of_stream_slave_data sd = { {0}
2520   };
2521 
2522   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
2523       end_of_stream_source, setup_sink_end_of_stream,
2524       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2525       NULL, &md, &sd);
2526 }
2527 
2528 GST_END_TEST;
2529 
2530 /**** reverse playback test ****/
2531 
/* State tracked in the source process by the reverse playback tests. */
typedef struct
{
  /* set by reverse_playback_on_state_changed() the first time it is called */
  gboolean got_state_changed_to_playing;
  /* set by play_backwards() once the negative-rate seek succeeded */
  gboolean seek_sent;
} reverse_playback_master_data;
2537 
/* State tracked in the sink process by the reverse playback tests; filled
 * in by reverse_playback_probe(). */
typedef struct
{
  /* a SEGMENT event with a negative rate was seen */
  gboolean got_segment_with_negative_rate;
  /* a buffer with a valid timestamp was seen after that segment */
  gboolean got_buffer_after_segment_with_negative_rate;
  /* timestamp of the first such buffer */
  GstClockTime first_backward_buffer_timestamp;
  /* a later buffer was timestamped more than one second before the first
     backward buffer */
  gboolean got_buffer_one_second_early;
} reverse_playback_slave_data;
2545 
2546 static gboolean
play_backwards(gpointer user_data)2547 play_backwards (gpointer user_data)
2548 {
2549   test_data *td = user_data;
2550   reverse_playback_master_data *d = td->md;
2551   gint64 pos;
2552   gboolean ret;
2553 
2554   FAIL_UNLESS (gst_element_query_position (td->p, GST_FORMAT_TIME, &pos));
2555 
2556   ret =
2557       gst_element_seek (td->p, -0.5, GST_FORMAT_TIME, 0, GST_SEEK_TYPE_SET, 0,
2558       GST_SEEK_TYPE_SET, pos);
2559   FAIL_UNLESS (ret);
2560   d->seek_sent = TRUE;
2561 
2562   gst_object_unref (td->p);
2563   return G_SOURCE_REMOVE;
2564 }
2565 
2566 static void
reverse_playback_on_state_changed(gpointer user_data)2567 reverse_playback_on_state_changed (gpointer user_data)
2568 {
2569   test_data *td = user_data;
2570   reverse_playback_master_data *d = td->md;
2571 
2572   if (!d->got_state_changed_to_playing) {
2573     d->got_state_changed_to_playing = TRUE;
2574     gst_object_ref (td->p);
2575     g_timeout_add (2000, (GSourceFunc) play_backwards, td);
2576   }
2577 }
2578 
2579 static void
reverse_playback_source(GstElement * source,gpointer user_data)2580 reverse_playback_source (GstElement * source, gpointer user_data)
2581 {
2582   test_data *td = user_data;
2583   GstStateChangeReturn ret;
2584 
2585   td->state_target = GST_STATE_PLAYING;
2586   td->state_changed_cb = reverse_playback_on_state_changed;
2587   ret = gst_element_set_state (source, GST_STATE_PLAYING);
2588   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
2589 }
2590 
2591 static GstPadProbeReturn
reverse_playback_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)2592 reverse_playback_probe (GstPad * pad, GstPadProbeInfo * info,
2593     gpointer user_data)
2594 {
2595   test_data *td = user_data;
2596   reverse_playback_slave_data *d = td->sd;
2597 
2598   if (GST_IS_EVENT (info->data)) {
2599     if (GST_EVENT_TYPE (info->data) == GST_EVENT_SEGMENT) {
2600       const GstSegment *s;
2601       gst_event_parse_segment (GST_EVENT (info->data), &s);
2602       if (s->rate < 0)
2603         d->got_segment_with_negative_rate = TRUE;
2604     }
2605   } else if (GST_IS_BUFFER (info->data)) {
2606     GstClockTime ts = GST_BUFFER_TIMESTAMP (info->data);
2607     if (GST_CLOCK_TIME_IS_VALID (ts)) {
2608       if (d->got_segment_with_negative_rate) {
2609         if (d->got_buffer_after_segment_with_negative_rate) {
2610           /* We test for 1 second, not just earlier, to make sure we don't
2611              just see B frames, or whatever else */
2612           if (ts < d->first_backward_buffer_timestamp - GST_SECOND) {
2613             d->got_buffer_one_second_early = TRUE;
2614           }
2615         } else {
2616           d->got_buffer_after_segment_with_negative_rate = TRUE;
2617           d->first_backward_buffer_timestamp = ts;
2618         }
2619       }
2620     }
2621   }
2622 
2623   return GST_PAD_PROBE_OK;
2624 }
2625 
/* gst_iterator_foreach() callback: forwards the iterated GValue to
 * hook_probe() together with reverse_playback_probe and the caller's
 * user_data. NOTE(review): hook_probe is defined outside this section;
 * presumably it installs the probe on the element held in v - confirm. */
static void
hook_reverse_playback_probe (const GValue * v, gpointer user_data)
{
  hook_probe (v, reverse_playback_probe, user_data);
}
2631 
2632 static void
setup_sink_reverse_playback(GstElement * sink,gpointer user_data)2633 setup_sink_reverse_playback (GstElement * sink, gpointer user_data)
2634 {
2635   GstIterator *it;
2636 
2637   it = gst_bin_iterate_sinks (GST_BIN (sink));
2638   while (gst_iterator_foreach (it, hook_reverse_playback_probe, user_data))
2639     gst_iterator_resync (it);
2640   gst_iterator_free (it);
2641 }
2642 
2643 static void
check_success_source_reverse_playback(gpointer user_data)2644 check_success_source_reverse_playback (gpointer user_data)
2645 {
2646   test_data *td = user_data;
2647   reverse_playback_master_data *d = td->md;
2648 
2649   FAIL_UNLESS (d->got_state_changed_to_playing);
2650   FAIL_UNLESS (d->seek_sent);
2651 }
2652 
2653 static void
check_success_sink_reverse_playback(gpointer user_data)2654 check_success_sink_reverse_playback (gpointer user_data)
2655 {
2656   test_data *td = user_data;
2657   reverse_playback_slave_data *d = td->sd;
2658 
2659   FAIL_UNLESS (d->got_segment_with_negative_rate);
2660   FAIL_UNLESS (d->got_buffer_after_segment_with_negative_rate);
2661   FAIL_UNLESS (GST_CLOCK_TIME_IS_VALID (d->first_backward_buffer_timestamp));
2662   FAIL_UNLESS (d->first_backward_buffer_timestamp >= GST_SECOND);
2663   FAIL_UNLESS (d->got_buffer_one_second_early);
2664 }
2665 
/* Reverse playback test run with TEST_FEATURE_TEST_SOURCE only. The master
 * and slave bookkeeping structs start zeroed and are handed to TEST_BASE
 * together with the reverse playback callbacks. */
GST_START_TEST (test_a_reverse_playback)
{
  reverse_playback_master_data md = { 0 };
  reverse_playback_slave_data sd = { 0 };

  TEST_BASE (TEST_FEATURE_TEST_SOURCE,
      reverse_playback_source, setup_sink_reverse_playback,
      check_success_source_reverse_playback,
      check_success_sink_reverse_playback, NULL, &md, &sd);
}

GST_END_TEST;
2678 
/* Reverse playback test run with TEST_FEATURE_TEST_SOURCE and
 * TEST_FEATURE_HAS_VIDEO; otherwise identical to test_a_reverse_playback. */
GST_START_TEST (test_av_reverse_playback)
{
  reverse_playback_master_data md = { 0 };
  reverse_playback_slave_data sd = { 0 };

  TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_HAS_VIDEO,
      reverse_playback_source, setup_sink_reverse_playback,
      check_success_source_reverse_playback,
      check_success_sink_reverse_playback, NULL, &md, &sd);
}

GST_END_TEST;
2691 
/* Reverse playback test run with TEST_FEATURE_TEST_SOURCE,
 * TEST_FEATURE_HAS_VIDEO and TEST_FEATURE_SPLIT_SINKS. */
GST_START_TEST (test_av_2_reverse_playback)
{
  reverse_playback_master_data md = { 0 };
  reverse_playback_slave_data sd = { 0 };

  TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_HAS_VIDEO |
      TEST_FEATURE_SPLIT_SINKS,
      reverse_playback_source, setup_sink_reverse_playback,
      check_success_source_reverse_playback,
      check_success_sink_reverse_playback, NULL, &md, &sd);
}

GST_END_TEST;
2705 
2706 /**** tags test ****/
2707 
/* Indices of the tag events exercised by the tags tests. N_TEST_TAGS is the
 * number of entries and sizes the bookkeeping arrays below. */
enum
{
  TEST_TAG_EMPTY,               /* event carrying an empty tag list */
  TEST_TAG_TWO_TAGS,            /* event carrying a title and a bitrate tag */
  N_TEST_TAGS
};
2714 
/* Master-side bookkeeping for the tags tests. */
typedef struct
{
  /* set by tags_on_state_changed() the first time it runs */
  gboolean got_state_changed_to_playing;
  /* [stream index from pad2idx()][TEST_TAG_*]: set by send_tags_on_pad()
   * once the corresponding tag event was sent successfully */
  gboolean tags_sent[2][N_TEST_TAGS];
} tags_master_data;
2720 
/* Slave-side bookkeeping for the tags tests. */
typedef struct
{
  /* [TEST_TAG_*]: set by tags_probe() when the matching tag event is seen */
  gboolean tags_received[N_TEST_TAGS];
} tags_slave_data;
2725 
2726 static void
send_tags_on_pad(GstPad * pad,gpointer user_data)2727 send_tags_on_pad (GstPad * pad, gpointer user_data)
2728 {
2729   test_data *td = user_data;
2730   tags_master_data *d = td->md;
2731   GstEvent *e;
2732   gint idx;
2733 
2734   idx = pad2idx (pad, td->two_streams);
2735 
2736   e = gst_event_new_tag (gst_tag_list_new_empty ());
2737   FAIL_UNLESS (gst_pad_send_event (pad, e));
2738   d->tags_sent[idx][TEST_TAG_EMPTY] = TRUE;
2739 
2740   e = gst_event_new_tag (gst_tag_list_new (GST_TAG_TITLE, "title",
2741           GST_TAG_BITRATE, 56000, NULL));
2742   FAIL_UNLESS (gst_pad_send_event (pad, e));
2743   d->tags_sent[idx][TEST_TAG_TWO_TAGS] = TRUE;
2744 }
2745 
2746 static GstPadProbeReturn
tags_probe_source(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)2747 tags_probe_source (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2748 {
2749   test_data *td = user_data;
2750   tags_master_data *d = td->md;
2751   GstClockTime ts;
2752 
2753   if (GST_IS_BUFFER (info->data)) {
2754     ts = GST_BUFFER_TIMESTAMP (info->data);
2755     if (GST_CLOCK_TIME_IS_VALID (ts) && ts > STEP_AT * GST_MSECOND) {
2756       gint idx = pad2idx (pad, td->two_streams);
2757       if (!d->tags_sent[idx][0]) {
2758         GstPad *peer = gst_pad_get_peer (pad);
2759         FAIL_UNLESS (peer);
2760         send_tags_on_pad (peer, td);
2761         gst_object_unref (peer);
2762         EXCLUSIVE_CALL (td, g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline,
2763                 gst_object_ref (td->p)));
2764       }
2765     }
2766   }
2767 
2768   return GST_PAD_PROBE_OK;
2769 }
2770 
/* GstIterator foreach callback: hands the iterated value @v to
 * hook_peer_probe_types() (defined elsewhere in this file) with
 * tags_probe_source as the probe, restricted to downstream data. */
static void
hook_tags_probe_source (const GValue * v, gpointer user_data)
{
  hook_peer_probe_types (v, tags_probe_source,
      GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, user_data);
}
2777 
2778 static void
tags_on_state_changed(gpointer user_data)2779 tags_on_state_changed (gpointer user_data)
2780 {
2781   test_data *td = user_data;
2782   tags_master_data *d = td->md;
2783   GstIterator *it;
2784 
2785   if (!d->got_state_changed_to_playing) {
2786     d->got_state_changed_to_playing = TRUE;
2787 
2788     it = gst_bin_iterate_sinks (GST_BIN (td->p));
2789     while (gst_iterator_foreach (it, hook_tags_probe_source, user_data))
2790       gst_iterator_resync (it);
2791     gst_iterator_free (it);
2792   }
2793 }
2794 
2795 static void
tags_source(GstElement * source,gpointer user_data)2796 tags_source (GstElement * source, gpointer user_data)
2797 {
2798   test_data *td = user_data;
2799   GstStateChangeReturn ret;
2800 
2801   td->state_target = GST_STATE_PLAYING;
2802   td->state_changed_cb = tags_on_state_changed;
2803   ret = gst_element_set_state (source, GST_STATE_PLAYING);
2804   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
2805 }
2806 
2807 static GstPadProbeReturn
tags_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)2808 tags_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2809 {
2810   test_data *td = user_data;
2811   tags_slave_data *d = td->sd;
2812   guint funsigned;
2813   gchar *fstring = NULL;
2814 
2815   if (GST_IS_EVENT (info->data)) {
2816     if (GST_EVENT_TYPE (info->data) == GST_EVENT_TAG) {
2817       GstTagList *taglist = NULL;
2818       gst_event_parse_tag (GST_EVENT (info->data), &taglist);
2819       FAIL_UNLESS (taglist);
2820       if (gst_tag_list_is_empty (taglist)) {
2821         d->tags_received[TEST_TAG_EMPTY] = TRUE;
2822       } else if (gst_tag_list_get_string (taglist, GST_TAG_TITLE, &fstring)
2823           && !strcmp (fstring, "title")
2824           && gst_tag_list_get_uint (taglist, GST_TAG_BITRATE, &funsigned)
2825           && funsigned == 56000) {
2826         d->tags_received[TEST_TAG_TWO_TAGS] = TRUE;
2827       }
2828     }
2829   }
2830   g_free (fstring);
2831 
2832   return GST_PAD_PROBE_OK;
2833 }
2834 
/* GstIterator foreach callback: forwards the iterated value @v to
 * hook_probe() (defined elsewhere in this file) together with tags_probe and
 * @user_data. */
static void
hook_tags_probe (const GValue * v, gpointer user_data)
{
  hook_probe (v, tags_probe, user_data);
}
2840 
2841 static void
setup_sink_tags(GstElement * sink,gpointer user_data)2842 setup_sink_tags (GstElement * sink, gpointer user_data)
2843 {
2844   GstIterator *it;
2845 
2846   it = gst_bin_iterate_sinks (GST_BIN (sink));
2847   while (gst_iterator_foreach (it, hook_tags_probe, user_data))
2848     gst_iterator_resync (it);
2849   gst_iterator_free (it);
2850 }
2851 
2852 static void
check_success_source_tags(gpointer user_data)2853 check_success_source_tags (gpointer user_data)
2854 {
2855   test_data *td = user_data;
2856   tags_master_data *d = td->md;
2857   gint n;
2858 
2859   FAIL_UNLESS (d->got_state_changed_to_playing);
2860   for (n = 0; n < N_TEST_TAGS; ++n) {
2861     FAIL_UNLESS (d->tags_sent[n]);
2862   }
2863 }
2864 
2865 static void
check_success_sink_tags(gpointer user_data)2866 check_success_sink_tags (gpointer user_data)
2867 {
2868   test_data *td = user_data;
2869   tags_slave_data *d = td->sd;
2870   gint n;
2871 
2872   for (n = 0; n < N_TEST_TAGS; ++n) {
2873     FAIL_UNLESS (d->tags_received[n]);
2874   }
2875 }
2876 
/* Tags test run with TEST_FEATURE_TEST_SOURCE. Master and slave bookkeeping
 * start zeroed and are handed to TEST_BASE with the tags callbacks. */
GST_START_TEST (test_empty_tags)
{
  tags_master_data md = { 0 };
  tags_slave_data sd = { {0}
  };

  TEST_BASE (TEST_FEATURE_TEST_SOURCE, tags_source, setup_sink_tags,
      check_success_source_tags, check_success_sink_tags, NULL, &md, &sd);
}

GST_END_TEST;
2888 
/* Tags test run with TEST_FEATURE_WAV_SOURCE. */
GST_START_TEST (test_wavparse_tags)
{
  tags_master_data md = { 0 };
  tags_slave_data sd = { {0}
  };

  TEST_BASE (TEST_FEATURE_WAV_SOURCE, tags_source, setup_sink_tags,
      check_success_source_tags, check_success_sink_tags, NULL, &md, &sd);
}

GST_END_TEST;
2900 
/* Tags test run with TEST_FEATURE_MPEGTS_SOURCE. */
GST_START_TEST (test_mpegts_tags)
{
  tags_master_data md = { 0 };
  tags_slave_data sd = { {0}
  };

  TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, tags_source, setup_sink_tags,
      check_success_source_tags, check_success_sink_tags, NULL, &md, &sd);
}

GST_END_TEST;
2912 
/* Tags test run with TEST_FEATURE_MPEGTS_SOURCE and
 * TEST_FEATURE_SPLIT_SINKS. */
GST_START_TEST (test_mpegts_2_tags)
{
  tags_master_data md = { 0 };
  tags_slave_data sd = { {0}
  };

  TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS, tags_source,
      setup_sink_tags, check_success_source_tags, check_success_sink_tags, NULL,
      &md, &sd);
}

GST_END_TEST;
2925 
/* Tags test run with TEST_FEATURE_LIVE_A_SOURCE. */
GST_START_TEST (test_live_a_tags)
{
  tags_master_data md = { 0 };
  tags_slave_data sd = { {0}
  };

  TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, tags_source, setup_sink_tags,
      check_success_source_tags, check_success_sink_tags, NULL, &md, &sd);
}

GST_END_TEST;
2937 
/* Tags test run with TEST_FEATURE_LIVE_AV_SOURCE. */
GST_START_TEST (test_live_av_tags)
{
  tags_master_data md = { 0 };
  tags_slave_data sd = { {0}
  };

  TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, tags_source, setup_sink_tags,
      check_success_source_tags, check_success_sink_tags, NULL, &md, &sd);
}

GST_END_TEST;
2949 
/* Tags test run with TEST_FEATURE_LIVE_AV_SOURCE and
 * TEST_FEATURE_SPLIT_SINKS. */
GST_START_TEST (test_live_av_2_tags)
{
  tags_master_data md = { 0 };
  tags_slave_data sd = { {0}
  };

  TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
      tags_source, setup_sink_tags, check_success_source_tags,
      check_success_sink_tags, NULL, &md, &sd);
}

GST_END_TEST;
2962 
/**** navigation test ****/
2964 
/* Indices of the navigation events exercised by the navigation tests, in the
 * order send_navigation_event() sends them. N_NAVIGATION_EVENTS is the number
 * of entries and sizes the bookkeeping arrays below. */
enum
{
  TEST_NAV_MOUSE_MOVE,          /* "mouse-move" at (4.7, 0.1) */
  TEST_NAV_KEY_PRESS,           /* "key-press" of key "Left" */
  N_NAVIGATION_EVENTS
};
2971 
/* Master-side bookkeeping for the navigation tests. */
typedef struct
{
  /* set by navigation_on_state_changed() */
  gboolean got_state_changed_to_playing;
  /* [TEST_NAV_*]: set by navigation_probe_source() when the matching
   * navigation event is seen */
  gboolean navigation_received[N_NAVIGATION_EVENTS];
} navigation_master_data;
2977 
/* Slave-side bookkeeping for the navigation tests. */
typedef struct
{
  /* set by navigation_probe() once the step_navigation timeout is armed */
  gboolean started;
  /* [TEST_NAV_*]: set by send_navigation_event() after a successful send */
  gboolean navigation_sent[N_NAVIGATION_EVENTS];
  /* index of the next event to send; advanced by step_navigation() */
  gint step;
} navigation_slave_data;
2984 
2985 static GstPadProbeReturn
navigation_probe_source(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)2986 navigation_probe_source (GstPad * pad, GstPadProbeInfo * info,
2987     gpointer user_data)
2988 {
2989   test_data *td = user_data;
2990   navigation_master_data *d = td->md;
2991   const GstStructure *s;
2992   const gchar *string, *key;
2993   double x, y;
2994 
2995   if (GST_IS_EVENT (info->data)) {
2996     if (GST_EVENT_TYPE (info->data) == GST_EVENT_NAVIGATION) {
2997       s = gst_event_get_structure (info->data);
2998       FAIL_UNLESS (s);
2999 
3000       /* mouse-move */
3001       string = gst_structure_get_string (s, "event");
3002       if (string && !strcmp (string, "mouse-move")) {
3003         if (gst_structure_get_double (s, "pointer_x", &x) && x == 4.7) {
3004           if (gst_structure_get_double (s, "pointer_y", &y) && y == 0.1) {
3005             d->navigation_received[TEST_NAV_MOUSE_MOVE] = TRUE;
3006           }
3007         }
3008       }
3009 
3010       /* key-press */
3011       string = gst_structure_get_string (s, "event");
3012       if (string && !strcmp (string, "key-press")) {
3013         key = gst_structure_get_string (s, "key");
3014         if (key && !strcmp (key, "Left")) {
3015           d->navigation_received[TEST_NAV_KEY_PRESS] = TRUE;
3016         }
3017       }
3018 
3019       /* drop at this point to imply successful handling; the upstream filesrc
3020        * does not know how to handle navigation events and returns FALSE,
3021        * which makes the test fail */
3022       return GST_PAD_PROBE_DROP;
3023     }
3024   }
3025   return GST_PAD_PROBE_OK;
3026 }
3027 
/* GstIterator foreach callback: hands the iterated value @v to
 * hook_probe_types() (defined elsewhere in this file) with
 * navigation_probe_source as the probe, restricted to upstream events. */
static void
hook_navigation_probe_source (const GValue * v, gpointer user_data)
{
  hook_probe_types (v, navigation_probe_source,
      GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, user_data);
}
3034 
3035 static void
navigation_on_state_changed(gpointer user_data)3036 navigation_on_state_changed (gpointer user_data)
3037 {
3038   test_data *td = user_data;
3039   navigation_master_data *d = td->md;
3040 
3041   if (!d->got_state_changed_to_playing)
3042     d->got_state_changed_to_playing = TRUE;
3043 }
3044 
3045 static void
navigation_source(GstElement * source,void * user_data)3046 navigation_source (GstElement * source, void *user_data)
3047 {
3048   test_data *td = user_data;
3049   GstStateChangeReturn ret;
3050   GstIterator *it;
3051 
3052   it = gst_bin_iterate_sinks (GST_BIN (source));
3053   while (gst_iterator_foreach (it, hook_navigation_probe_source, user_data))
3054     gst_iterator_resync (it);
3055   gst_iterator_free (it);
3056 
3057   td->state_target = GST_STATE_PLAYING;
3058   td->state_changed_cb = navigation_on_state_changed;
3059   ret = gst_element_set_state (source, GST_STATE_PLAYING);
3060   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
3061 }
3062 
3063 static void
send_navigation_event(const GValue * v,gpointer user_data)3064 send_navigation_event (const GValue * v, gpointer user_data)
3065 {
3066   test_data *td = user_data;
3067   navigation_slave_data *d = td->sd;
3068   GstElement *sink;
3069   GstPad *pad, *peer;
3070   GstStructure *s;
3071   GstEvent *e = NULL;
3072 
3073   sink = g_value_get_object (v);
3074   FAIL_UNLESS (sink);
3075   pad = gst_element_get_static_pad (sink, "sink");
3076   FAIL_UNLESS (pad);
3077   peer = gst_pad_get_peer (pad);
3078   FAIL_UNLESS (peer);
3079   gst_object_unref (pad);
3080 
3081   switch (d->step) {
3082     case TEST_NAV_MOUSE_MOVE:
3083       s = gst_structure_new ("application/x-gst-navigation", "event",
3084           G_TYPE_STRING, "mouse-move", "button", G_TYPE_INT, 0, "pointer_x",
3085           G_TYPE_DOUBLE, 4.7, "pointer_y", G_TYPE_DOUBLE, 0.1, NULL);
3086       e = gst_event_new_navigation (s);
3087       break;
3088     case TEST_NAV_KEY_PRESS:
3089       s = gst_structure_new ("application/x-gst-navigation", "event",
3090           G_TYPE_STRING, "key-press", "key", G_TYPE_STRING, "Left", NULL);
3091       e = gst_event_new_navigation (s);
3092       break;
3093   }
3094 
3095   FAIL_UNLESS (e);
3096   FAIL_UNLESS (gst_pad_send_event (peer, e));
3097   d->navigation_sent[d->step] = TRUE;
3098 
3099   gst_object_unref (peer);
3100 }
3101 
3102 static gboolean
step_navigation(gpointer user_data)3103 step_navigation (gpointer user_data)
3104 {
3105   test_data *td = user_data;
3106   navigation_slave_data *d = td->sd;
3107   GstIterator *it;
3108 
3109   it = gst_bin_iterate_sinks (GST_BIN (td->p));
3110   while (gst_iterator_foreach (it, send_navigation_event, user_data))
3111     gst_iterator_resync (it);
3112   gst_iterator_free (it);
3113 
3114   if (++d->step < N_NAVIGATION_EVENTS)
3115     return G_SOURCE_CONTINUE;
3116 
3117   /* we are in the slave; send EOS to force the master to stop the pipeline */
3118   gst_element_post_message (GST_ELEMENT (td->p),
3119       gst_message_new_eos (GST_OBJECT (td->p)));
3120 
3121   gst_object_unref (td->p);
3122   return G_SOURCE_REMOVE;
3123 }
3124 
3125 static GstPadProbeReturn
navigation_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)3126 navigation_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3127 {
3128   test_data *td = user_data;
3129   navigation_slave_data *d = td->sd;
3130   GstClockTime ts;
3131 
3132   if (GST_IS_BUFFER (info->data)) {
3133     ts = GST_BUFFER_TIMESTAMP (info->data);
3134     if (GST_CLOCK_TIME_IS_VALID (ts) && ts > STEP_AT * GST_MSECOND) {
3135       if (!d->started) {
3136         d->started = TRUE;
3137         gst_object_ref (td->p);
3138         g_timeout_add (50, step_navigation, td);
3139       }
3140     }
3141   }
3142 
3143   return GST_PAD_PROBE_OK;
3144 }
3145 
/* GstIterator foreach callback: forwards the iterated value @v to
 * hook_probe() (defined elsewhere in this file) together with
 * navigation_probe and @user_data. */
static void
hook_navigation_probe (const GValue * v, gpointer user_data)
{
  hook_probe (v, navigation_probe, user_data);
}
3151 
3152 static void
setup_sink_navigation(GstElement * sink,gpointer user_data)3153 setup_sink_navigation (GstElement * sink, gpointer user_data)
3154 {
3155   GstIterator *it;
3156 
3157   it = gst_bin_iterate_sinks (GST_BIN (sink));
3158   while (gst_iterator_foreach (it, hook_navigation_probe, user_data))
3159     gst_iterator_resync (it);
3160   gst_iterator_free (it);
3161 }
3162 
3163 static void
check_success_source_navigation(gpointer user_data)3164 check_success_source_navigation (gpointer user_data)
3165 {
3166   test_data *td = user_data;
3167   navigation_master_data *d = td->md;
3168   gint n;
3169 
3170   FAIL_UNLESS (d->got_state_changed_to_playing);
3171   for (n = 0; n < N_NAVIGATION_EVENTS; ++n) {
3172     FAIL_UNLESS (d->navigation_received[n]);
3173   }
3174 }
3175 
3176 static void
check_success_sink_navigation(gpointer user_data)3177 check_success_sink_navigation (gpointer user_data)
3178 {
3179   test_data *td = user_data;
3180   navigation_slave_data *d = td->sd;
3181   gint n;
3182 
3183   FAIL_UNLESS (d->started);
3184   for (n = 0; n < N_NAVIGATION_EVENTS; ++n) {
3185     FAIL_UNLESS (d->navigation_sent[n]);
3186   }
3187 }
3188 
/* Navigation test run with TEST_FEATURE_MPEGTS_SOURCE. Master and slave
 * bookkeeping start zeroed and are handed to TEST_BASE with the navigation
 * callbacks. */
GST_START_TEST (test_non_live_av_navigation)
{
  navigation_master_data md = { 0 };
  navigation_slave_data sd = { 0 };

  TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, navigation_source,
      setup_sink_navigation, check_success_source_navigation,
      check_success_sink_navigation, NULL, &md, &sd);
}

GST_END_TEST;
3200 
/* Navigation test run with TEST_FEATURE_MPEGTS_SOURCE and
 * TEST_FEATURE_SPLIT_SINKS. */
GST_START_TEST (test_non_live_av_2_navigation)
{
  navigation_master_data md = { 0 };
  navigation_slave_data sd = { 0 };

  TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
      navigation_source, setup_sink_navigation, check_success_source_navigation,
      check_success_sink_navigation, NULL, &md, &sd);
}

GST_END_TEST;
3212 
/* Navigation test run with TEST_FEATURE_LIVE_AV_SOURCE. */
GST_START_TEST (test_live_av_navigation)
{
  navigation_master_data md = { 0 };
  navigation_slave_data sd = { 0 };

  TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, navigation_source,
      setup_sink_navigation, check_success_source_navigation,
      check_success_sink_navigation, NULL, &md, &sd);
}

GST_END_TEST;
3224 
/* Navigation test run with TEST_FEATURE_LIVE_AV_SOURCE and
 * TEST_FEATURE_SPLIT_SINKS. */
GST_START_TEST (test_live_av_2_navigation)
{
  navigation_master_data md = { 0 };
  navigation_slave_data sd = { 0 };

  TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
      navigation_source, setup_sink_navigation, check_success_source_navigation,
      check_success_sink_navigation, NULL, &md, &sd);
}

GST_END_TEST;
3236 
3237 /**** reconfigure test ****/
3238 
/* Master-side bookkeeping for the reconfigure tests. */
typedef struct
{
  /* set by reconfigure_on_state_changed() */
  gboolean got_state_changed_to_playing;
  /* [stream index from pad2idx()]: set by reconfigure_source_probe() when a
   * RECONFIGURE event is seen on that stream */
  gboolean reconfigure_sent[2];
} reconfigure_master_data;
3244 
/* Slave-side bookkeeping for the reconfigure tests. */
typedef struct
{
  /* set by reconfigure_probe() once send_reconfigure() has been queued */
  gboolean reconfigure_scheduled;
  /* [stream index from caps2idx()]: set by send_reconfigure_on_element()
   * after the new caps were applied */
  gboolean reconfigure_sent[2];
  /* [stream][0]: CAPS event seen before reconfigure_sent[stream] was set,
   * [stream][1]: CAPS event seen after it was set */
  gboolean got_caps[2][2];
} reconfigure_slave_data;
3251 
3252 static GstPadProbeReturn
reconfigure_source_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)3253 reconfigure_source_probe (GstPad * pad, GstPadProbeInfo * info,
3254     gpointer user_data)
3255 {
3256   test_data *td = user_data;
3257   reconfigure_master_data *d = td->md;
3258 
3259   if (GST_EVENT_TYPE (info->data) == GST_EVENT_RECONFIGURE) {
3260     gint idx = pad2idx (pad, td->two_streams);
3261     d->reconfigure_sent[idx] = TRUE;
3262     EXCLUSIVE_CALL (td, g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline,
3263             gst_object_ref (td->p)));
3264   }
3265 
3266   return GST_PAD_PROBE_OK;
3267 }
3268 
/* GstIterator foreach callback: hands the iterated value @v to
 * hook_probe_types() (defined elsewhere in this file) with
 * reconfigure_source_probe as the probe, restricted to upstream events. */
static void
hook_reconfigure_source_probe (const GValue * v, gpointer user_data)
{
  hook_probe_types (v, reconfigure_source_probe,
      GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, user_data);
}
3275 
3276 static void
reconfigure_on_state_changed(gpointer user_data)3277 reconfigure_on_state_changed (gpointer user_data)
3278 {
3279   test_data *td = user_data;
3280   reconfigure_master_data *d = td->md;
3281 
3282   if (!d->got_state_changed_to_playing)
3283     d->got_state_changed_to_playing = TRUE;
3284 }
3285 
3286 static void
reconfigure_source(GstElement * source,gpointer user_data)3287 reconfigure_source (GstElement * source, gpointer user_data)
3288 {
3289   test_data *td = user_data;
3290   GstStateChangeReturn ret;
3291   GstIterator *it;
3292 
3293   it = gst_bin_iterate_sinks (GST_BIN (source));
3294   while (gst_iterator_foreach (it, hook_reconfigure_source_probe, user_data))
3295     gst_iterator_resync (it);
3296   gst_iterator_free (it);
3297 
3298   td->state_target = GST_STATE_PLAYING;
3299   td->state_changed_cb = reconfigure_on_state_changed;
3300   ret = gst_element_set_state (source, GST_STATE_PLAYING);
3301   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
3302 }
3303 
3304 static void
send_reconfigure_on_element(const GValue * v,gpointer user_data)3305 send_reconfigure_on_element (const GValue * v, gpointer user_data)
3306 {
3307   test_data *td = user_data;
3308   reconfigure_slave_data *d = td->sd;
3309   GstElement *sink, *capsfilter;
3310   GstPad *pad, *peer;
3311   GstCaps *caps = NULL;
3312 
3313   sink = g_value_get_object (v);
3314   FAIL_UNLESS (sink);
3315   pad = gst_element_get_static_pad (sink, "sink");
3316   FAIL_UNLESS (pad);
3317 
3318   // look for the previous element, change caps if a capsfilter
3319   peer = gst_pad_get_peer (pad);
3320   FAIL_UNLESS (peer);
3321   capsfilter = GST_ELEMENT (gst_pad_get_parent (peer));
3322   g_object_get (capsfilter, "caps", &caps, NULL);
3323   FAIL_UNLESS (caps);
3324   caps = gst_caps_make_writable (caps);
3325   if (!strcmp (gst_structure_get_name (gst_caps_get_structure (caps, 0)),
3326           "audio/x-raw")) {
3327     gst_caps_set_simple (caps, "rate", G_TYPE_INT, 48000, NULL);
3328   } else {
3329     gst_caps_set_simple (caps, "width", G_TYPE_INT, 320, "height", G_TYPE_INT,
3330         200, NULL);
3331   }
3332   g_object_set (capsfilter, "caps", caps, NULL);
3333   FAIL_UNLESS (capsfilter);
3334 
3335   gst_object_unref (capsfilter);
3336   gst_object_unref (peer);
3337 
3338   d->reconfigure_sent[caps2idx (caps, td->two_streams)] = TRUE;
3339 
3340   gst_caps_unref (caps);
3341   gst_object_unref (pad);
3342 }
3343 
3344 static gboolean
send_reconfigure(gpointer user_data)3345 send_reconfigure (gpointer user_data)
3346 {
3347   test_data *td = user_data;
3348   GstIterator *it;
3349 
3350   it = gst_bin_iterate_sinks (GST_BIN (td->p));
3351   while (gst_iterator_foreach (it, send_reconfigure_on_element, user_data))
3352     gst_iterator_resync (it);
3353   gst_iterator_free (it);
3354 
3355   gst_object_unref (td->p);
3356   return G_SOURCE_REMOVE;
3357 }
3358 
3359 static GstPadProbeReturn
reconfigure_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)3360 reconfigure_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3361 {
3362   test_data *td = user_data;
3363   reconfigure_slave_data *d = td->sd;
3364   GstClockTime ts;
3365   GstCaps *caps;
3366   int idx;
3367 
3368   if (GST_IS_BUFFER (info->data)) {
3369     ts = GST_BUFFER_TIMESTAMP (info->data);
3370     if (GST_CLOCK_TIME_IS_VALID (ts) && ts >= STEP_AT * GST_MSECOND) {
3371       if (!d->reconfigure_scheduled) {
3372         d->reconfigure_scheduled = TRUE;
3373         gst_object_ref (td->p);
3374         g_idle_add ((GSourceFunc) send_reconfigure, td);
3375       }
3376     }
3377   } else if (GST_IS_EVENT (info->data)) {
3378     if (GST_EVENT_TYPE (info->data) == GST_EVENT_CAPS) {
3379       gst_event_parse_caps (GST_EVENT (info->data), &caps);
3380       idx = caps2idx (caps, td->two_streams);
3381       if (d->reconfigure_sent[idx]) {
3382         d->got_caps[idx][1] = TRUE;
3383       } else {
3384         d->got_caps[idx][0] = TRUE;
3385       }
3386     }
3387   }
3388 
3389   return GST_PAD_PROBE_OK;
3390 }
3391 
/* GstIterator foreach callback: forwards the iterated value @v to
 * hook_probe() (defined elsewhere in this file) together with
 * reconfigure_probe and @user_data. */
static void
hook_reconfigure_probe (const GValue * v, gpointer user_data)
{
  hook_probe (v, reconfigure_probe, user_data);
}
3397 
3398 static void
setup_sink_reconfigure(GstElement * sink,gpointer user_data)3399 setup_sink_reconfigure (GstElement * sink, gpointer user_data)
3400 {
3401   GstIterator *it;
3402 
3403   it = gst_bin_iterate_sinks (GST_BIN (sink));
3404   while (gst_iterator_foreach (it, hook_reconfigure_probe, user_data))
3405     gst_iterator_resync (it);
3406   gst_iterator_free (it);
3407 }
3408 
3409 static void
check_success_source_reconfigure(gpointer user_data)3410 check_success_source_reconfigure (gpointer user_data)
3411 {
3412   test_data *td = user_data;
3413   reconfigure_master_data *d = td->md;
3414   gint idx;
3415 
3416   FAIL_UNLESS (d->got_state_changed_to_playing);
3417   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
3418     FAIL_UNLESS (d->reconfigure_sent[idx]);
3419   }
3420 }
3421 
3422 static void
check_success_sink_reconfigure(gpointer user_data)3423 check_success_sink_reconfigure (gpointer user_data)
3424 {
3425   test_data *td = user_data;
3426   reconfigure_slave_data *d = td->sd;
3427   gint idx;
3428 
3429   FAIL_UNLESS (d->reconfigure_scheduled);
3430   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
3431     FAIL_UNLESS (d->reconfigure_sent[idx]);
3432     FAIL_UNLESS (d->got_caps[idx][0]);
3433     FAIL_UNLESS (d->got_caps[idx][1]);
3434   }
3435 }
3436 
/* Reconfigure test run with TEST_FEATURE_TEST_SOURCE and
 * TEST_FEATURE_FILTER_SINK_CAPS. Master and slave bookkeeping start zeroed
 * and are handed to TEST_BASE with the reconfigure callbacks. */
GST_START_TEST (test_non_live_a_reconfigure)
{
  reconfigure_master_data md = { 0 };
  reconfigure_slave_data sd = { 0 };

  TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_FILTER_SINK_CAPS,
      reconfigure_source, setup_sink_reconfigure,
      check_success_source_reconfigure, check_success_sink_reconfigure, NULL,
      &md, &sd);
}

GST_END_TEST;
3449 
/* Reconfigure test run with TEST_FEATURE_TEST_SOURCE,
 * TEST_FEATURE_HAS_VIDEO and TEST_FEATURE_FILTER_SINK_CAPS. */
GST_START_TEST (test_non_live_av_reconfigure)
{
  reconfigure_master_data md = { 0 };
  reconfigure_slave_data sd = { 0 };

  TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_HAS_VIDEO |
      TEST_FEATURE_FILTER_SINK_CAPS,
      reconfigure_source, setup_sink_reconfigure,
      check_success_source_reconfigure, check_success_sink_reconfigure, NULL,
      &md, &sd);
}

GST_END_TEST;
3463 
/* Reconfigure test run with TEST_FEATURE_LIVE_A_SOURCE and
 * TEST_FEATURE_FILTER_SINK_CAPS. */
GST_START_TEST (test_live_a_reconfigure)
{
  reconfigure_master_data md = { 0 };
  reconfigure_slave_data sd = { 0 };

  TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE | TEST_FEATURE_FILTER_SINK_CAPS,
      reconfigure_source, setup_sink_reconfigure,
      check_success_source_reconfigure, check_success_sink_reconfigure, NULL,
      &md, &sd);
}

GST_END_TEST;
3476 
/* Reconfigure test run with TEST_FEATURE_LIVE_AV_SOURCE and
 * TEST_FEATURE_FILTER_SINK_CAPS. */
GST_START_TEST (test_live_av_reconfigure)
{
  reconfigure_master_data md = { 0 };
  reconfigure_slave_data sd = { 0 };

  TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_FILTER_SINK_CAPS,
      reconfigure_source, setup_sink_reconfigure,
      check_success_source_reconfigure, check_success_sink_reconfigure, NULL,
      &md, &sd);
}

GST_END_TEST;
3489 
3490 /**** state changes test ****/
3491 
/* Master-side bookkeeping for the state changes test. */
typedef struct
{
  /* index of the next step that step_state_changes() will execute */
  gint step;
  /* element -> saved "fdin" / "fdout" property value; filled when the
   * elements are disconnected and consumed when they are reconnected */
  GHashTable *fdin, *fdout;
  /* while set, step_state_changes() does not start a new step */
  gboolean waiting_state_change;
} state_changes_master_data;
3498 
/* Slave-side bookkeeping for the state changes test.
 * NOTE(review): the code that updates these fields is outside this excerpt;
 * the n_* members are presumably per-state transition counters — confirm. */
typedef struct
{
  gint n_null;
  gint n_ready;
  gint n_paused;
  gint n_playing;
  gboolean got_eos;
} state_changes_slave_data;
3507 
3508 static void
set_fdin(gpointer key,gpointer value,gpointer user_data)3509 set_fdin (gpointer key, gpointer value, gpointer user_data)
3510 {
3511   g_object_set (key, "fdin", GPOINTER_TO_INT (value), NULL);
3512 }
3513 
3514 static void
set_fdout(gpointer key,gpointer value,gpointer user_data)3515 set_fdout (gpointer key, gpointer value, gpointer user_data)
3516 {
3517   g_object_set (key, "fdout", GPOINTER_TO_INT (value), NULL);
3518 }
3519 
3520 /*
3521  * NULL
3522  * 0: READY NULL READY PAUSED READY PAUSED READY NULL
3523  * 8: READY PAUSED PLAYING PAUSED PLAYING PAUSED READY PAUSED READY NULL
3524  * 18: disconnect
3525  * 19: READY NULL READY PAUSED READY PAUSED READY NULL
3526  * 27: READY PAUSED PLAYING PAUSED PLAYING PAUSED READY PAUSED READY NULL
3527  * 37: reconnect
3528  * 38: READY NULL READY PAUSED READY PAUSED READY NULL
3529  * 46: READY PAUSED PLAYING PAUSED PLAYING
3530  * 51: EOS
3531  */
3532 static gboolean
step_state_changes(gpointer user_data)3533 step_state_changes (gpointer user_data)
3534 {
3535   test_data *td = user_data;
3536   state_changes_master_data *d = td->md;
3537   gboolean ret = G_SOURCE_CONTINUE;
3538   GstStateChangeReturn scret = GST_STATE_CHANGE_FAILURE;
3539   GList *l;
3540   int fdin, fdout;
3541 
3542   if (d->waiting_state_change)
3543     goto done;
3544 
3545   switch (d->step++) {
3546     case 1:
3547     case 7:
3548     case 17:
3549     case 20:
3550     case 26:
3551     case 36:
3552     case 39:
3553     case 45:
3554       scret = gst_element_set_state (td->p, GST_STATE_NULL);
3555       FAIL_UNLESS_EQUALS_INT (scret, GST_STATE_CHANGE_SUCCESS);
3556       break;
3557     case 0:
3558     case 2:
3559     case 4:
3560     case 6:
3561     case 8:
3562     case 14:
3563     case 16:
3564     case 38:
3565     case 40:
3566     case 42:
3567     case 44:
3568     case 46:
3569       scret = gst_element_set_state (td->p, GST_STATE_READY);
3570       FAIL_UNLESS_EQUALS_INT (scret, GST_STATE_CHANGE_SUCCESS);
3571       break;
3572     case 19:
3573     case 21:
3574     case 23:
3575     case 25:
3576     case 27:
3577     case 33:
3578     case 35:
3579       /* while we are disconnected, we can't do NULL -> READY */
3580       scret = gst_element_set_state (td->p, GST_STATE_READY);
3581       FAIL_UNLESS_EQUALS_INT (scret, GST_STATE_CHANGE_FAILURE);
3582       break;
3583     case 3:
3584     case 5:
3585     case 9:
3586     case 11:
3587     case 13:
3588     case 15:
3589     case 41:
3590     case 43:
3591     case 47:
3592     case 49:
3593       td->state_target = GST_STATE_PAUSED;
3594       scret = gst_element_set_state (td->p, GST_STATE_PAUSED);
3595       FAIL_IF (scret == GST_STATE_CHANGE_FAILURE);
3596       break;
3597     case 22:
3598     case 24:
3599     case 28:
3600     case 30:
3601     case 32:
3602     case 34:
3603       /* while we are disconnected, we can't do NULL -> READY */
3604       scret = gst_element_set_state (td->p, GST_STATE_PAUSED);
3605       FAIL_UNLESS_EQUALS_INT (scret, GST_STATE_CHANGE_FAILURE);
3606       break;
3607     case 10:
3608     case 12:
3609     case 48:
3610     case 50:
3611       td->state_target = GST_STATE_PLAYING;
3612       scret = gst_element_set_state (td->p, GST_STATE_PLAYING);
3613       FAIL_IF (scret == GST_STATE_CHANGE_FAILURE);
3614       break;
3615     case 29:
3616     case 31:
3617       /* while we are disconnected, we can't do NULL -> READY */
3618       scret = gst_element_set_state (td->p, GST_STATE_PLAYING);
3619       FAIL_UNLESS_EQUALS_INT (scret, GST_STATE_CHANGE_FAILURE);
3620       break;
3621     case 18:
3622       d->fdin = g_hash_table_new (g_direct_hash, g_direct_equal);
3623       d->fdout = g_hash_table_new (g_direct_hash, g_direct_equal);
3624       for (l = weak_refs; l; l = l->next) {
3625         g_object_get (l->data, "fdin", &fdin, "fdout", &fdout, NULL);
3626         g_hash_table_insert (d->fdin, (gpointer) l->data,
3627             GINT_TO_POINTER (fdin));
3628         g_hash_table_insert (d->fdout, (gpointer) l->data,
3629             GINT_TO_POINTER (fdout));
3630         g_signal_emit_by_name (G_OBJECT (l->data), "disconnect", NULL);
3631       }
3632       break;
3633     case 37:
3634       g_hash_table_foreach (d->fdin, set_fdin, NULL);
3635       g_hash_table_foreach (d->fdout, set_fdout, NULL);
3636       g_hash_table_destroy (d->fdin);
3637       g_hash_table_destroy (d->fdout);
3638       break;
3639     case 51:
3640       /* send EOS early to avoid waiting for the actual end of the file */
3641       gst_element_send_event (td->p, gst_event_new_eos ());
3642       gst_object_unref (td->p);
3643       ret = G_SOURCE_REMOVE;
3644       break;
3645   }
3646 
3647   if (scret == GST_STATE_CHANGE_ASYNC)
3648     d->waiting_state_change = TRUE;
3649 
3650 done:
3651   return ret;
3652 }
3653 
3654 static void
state_changes_state_changed(gpointer user_data)3655 state_changes_state_changed (gpointer user_data)
3656 {
3657   test_data *td = user_data;
3658   state_changes_master_data *d = td->md;
3659 
3660   d->waiting_state_change = FALSE;
3661 }
3662 
3663 static void
state_changes_source(GstElement * source,gpointer user_data)3664 state_changes_source (GstElement * source, gpointer user_data)
3665 {
3666   test_data *td = user_data;
3667   state_changes_master_data *d = td->md;
3668 
3669   gst_object_ref (source);
3670   g_timeout_add (STEP_AT, (GSourceFunc) step_state_changes, td);
3671 
3672   d->waiting_state_change = FALSE;
3673   td->state_changed_cb = state_changes_state_changed;
3674 }
3675 
3676 static GstBusSyncReply
state_changes_sink_bus_msg(GstBus * bus,GstMessage * message,gpointer user_data)3677 state_changes_sink_bus_msg (GstBus * bus, GstMessage * message,
3678     gpointer user_data)
3679 {
3680   test_data *td = user_data;
3681   state_changes_slave_data *d = td->sd;
3682 
3683   switch (GST_MESSAGE_TYPE (message)) {
3684     case GST_MESSAGE_EOS:
3685       d->got_eos = TRUE;
3686       break;
3687     case GST_MESSAGE_STATE_CHANGED:
3688       if (GST_MESSAGE_SRC (message) == GST_OBJECT_CAST (td->p)) {
3689         GstState state;
3690         gst_message_parse_state_changed (message, NULL, &state, NULL);
3691         switch (state) {
3692           case GST_STATE_NULL:
3693             d->n_null++;
3694             break;
3695           case GST_STATE_READY:
3696             d->n_ready++;
3697             break;
3698           case GST_STATE_PAUSED:
3699             d->n_paused++;
3700             break;
3701           case GST_STATE_PLAYING:
3702             d->n_playing++;
3703             break;
3704           default:
3705             fail_if (1);
3706         }
3707       }
3708       break;
3709     default:
3710       break;
3711   }
3712   return GST_BUS_PASS;
3713 }
3714 
3715 static void
setup_sink_state_changes(GstElement * sink,gpointer user_data)3716 setup_sink_state_changes (GstElement * sink, gpointer user_data)
3717 {
3718   g_object_set (sink, "auto-flush-bus", FALSE, NULL);
3719   gst_bus_set_sync_handler (GST_ELEMENT_BUS (sink), state_changes_sink_bus_msg,
3720       user_data, NULL);
3721 }
3722 
3723 static void
check_success_source_state_changes(gpointer user_data)3724 check_success_source_state_changes (gpointer user_data)
3725 {
3726   test_data *td = user_data;
3727   state_changes_master_data *d = td->md;
3728 
3729   FAIL_UNLESS_EQUALS_INT (d->step, 52);
3730 }
3731 
3732 static void
check_success_sink_state_changes(gpointer user_data)3733 check_success_sink_state_changes (gpointer user_data)
3734 {
3735   test_data *td = user_data;
3736   state_changes_slave_data *d = td->sd;
3737   GstBus *bus;
3738 
3739   bus = gst_pipeline_get_bus (GST_PIPELINE (td->p));
3740   gst_bus_set_flushing (bus, TRUE);
3741   gst_object_unref (bus);
3742 
3743   FAIL_UNLESS (d->got_eos);
3744   FAIL_UNLESS_EQUALS_INT (d->n_null, 6);
3745   FAIL_UNLESS_EQUALS_INT (d->n_ready, 13);
3746   FAIL_UNLESS_EQUALS_INT (d->n_paused, 11);
3747   FAIL_UNLESS_EQUALS_INT (d->n_playing, 4);
3748 }
3749 
GST_START_TEST(test_empty_state_changes)3750 GST_START_TEST (test_empty_state_changes)
3751 {
3752   state_changes_master_data md = { 0 };
3753   state_changes_slave_data sd = { 0 };
3754 
3755   TEST_BASE (TEST_FEATURE_TEST_SOURCE, state_changes_source,
3756       setup_sink_state_changes, check_success_source_state_changes,
3757       check_success_sink_state_changes, NULL, &md, &sd);
3758 }
3759 
3760 GST_END_TEST;
3761 
GST_START_TEST(test_wavparse_state_changes)3762 GST_START_TEST (test_wavparse_state_changes)
3763 {
3764   state_changes_master_data md = { 0 };
3765   state_changes_slave_data sd = { 0 };
3766 
3767   TEST_BASE (TEST_FEATURE_WAV_SOURCE, state_changes_source,
3768       setup_sink_state_changes, check_success_source_state_changes,
3769       check_success_sink_state_changes, NULL, &md, &sd);
3770 }
3771 
3772 GST_END_TEST;
3773 
GST_START_TEST(test_mpegts_state_changes)3774 GST_START_TEST (test_mpegts_state_changes)
3775 {
3776   state_changes_master_data md = { 0 };
3777   state_changes_slave_data sd = { 0 };
3778 
3779   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, state_changes_source,
3780       setup_sink_state_changes, check_success_source_state_changes,
3781       check_success_sink_state_changes, NULL, &md, &sd);
3782 }
3783 
3784 GST_END_TEST;
3785 
GST_START_TEST(test_mpegts_2_state_changes)3786 GST_START_TEST (test_mpegts_2_state_changes)
3787 {
3788   state_changes_master_data md = { 0 };
3789   state_changes_slave_data sd = { 0 };
3790 
3791   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
3792       state_changes_source, setup_sink_state_changes,
3793       check_success_source_state_changes, check_success_sink_state_changes,
3794       NULL, &md, &sd);
3795 }
3796 
3797 GST_END_TEST;
3798 
3799 /**** state changes stress test ****/
3800 
/* Input data of the state changes stress test (td->id). */
typedef struct
{
  /* number of random state changes still to be issued; decremented by
   * step_state_changes_stress() and expected to be 0 when the test ends */
  gint n_state_changes;
} state_changes_stress_input_data;
3805 
/* Master-side bookkeeping of the state changes stress test (td->md). */
typedef struct
{
  /* set by the first call of state_changes_stress_on_state_changed(), which
   * also starts the stepper timeout */
  gboolean got_state_changed_to_playing;
  /* FALSE while an ASYNC state change issued by the stepper is pending; set
   * back to TRUE by state_changes_stress_on_state_changed() */
  gboolean async_state_change_completed;
} state_changes_stress_master_data;
3811 
3812 static gboolean
step_state_changes_stress(gpointer user_data)3813 step_state_changes_stress (gpointer user_data)
3814 {
3815   test_data *td = user_data;
3816   state_changes_stress_input_data *i = td->id;
3817   state_changes_stress_master_data *d = td->md;
3818   static const GstState states[] =
3819       { GST_STATE_NULL, GST_STATE_READY, GST_STATE_PAUSED, GST_STATE_PLAYING };
3820   GstState state;
3821   GstStateChangeReturn ret;
3822 
3823   /* wait for async state change to complete before continuing */
3824   if (!d->async_state_change_completed)
3825     return G_SOURCE_CONTINUE;
3826 
3827   if (i->n_state_changes == 0) {
3828     ret = gst_element_set_state (td->p, GST_STATE_PLAYING);
3829     FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
3830     g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline, td->p);
3831     return G_SOURCE_REMOVE;
3832   }
3833   --i->n_state_changes;
3834 
3835   state = states[rand () % 4];
3836   ret = gst_element_set_state (td->p, state);
3837   FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
3838 
3839   if (ret == GST_STATE_CHANGE_ASYNC) {
3840     td->state_target = state;
3841     d->async_state_change_completed = FALSE;
3842   }
3843 
3844   return G_SOURCE_CONTINUE;
3845 }
3846 
3847 static void
state_changes_stress_on_state_changed(gpointer user_data)3848 state_changes_stress_on_state_changed (gpointer user_data)
3849 {
3850   test_data *td = user_data;
3851   state_changes_stress_master_data *d = td->md;
3852 
3853   if (!d->got_state_changed_to_playing) {
3854     d->got_state_changed_to_playing = TRUE;
3855     gst_object_ref (td->p);
3856     g_timeout_add (50, (GSourceFunc) step_state_changes_stress, td);
3857   }
3858   d->async_state_change_completed = TRUE;
3859 }
3860 
3861 static void
state_changes_stress_source(GstElement * source,gpointer user_data)3862 state_changes_stress_source (GstElement * source, gpointer user_data)
3863 {
3864   test_data *td = user_data;
3865   GstStateChangeReturn ret;
3866 
3867   td->state_target = GST_STATE_PLAYING;
3868   td->state_changed_cb = state_changes_stress_on_state_changed;
3869   ret = gst_element_set_state (source, GST_STATE_PLAYING);
3870   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
3871 }
3872 
3873 static void
check_success_source_state_changes_stress(gpointer user_data)3874 check_success_source_state_changes_stress (gpointer user_data)
3875 {
3876   test_data *td = user_data;
3877   state_changes_stress_input_data *i = td->id;
3878   state_changes_stress_master_data *d = td->md;
3879 
3880   FAIL_UNLESS (d->got_state_changed_to_playing);
3881   FAIL_UNLESS_EQUALS_INT (i->n_state_changes, 0);
3882 }
3883 
GST_START_TEST(test_empty_state_changes_stress)3884 GST_START_TEST (test_empty_state_changes_stress)
3885 {
3886   state_changes_stress_input_data id = { 500 };
3887   state_changes_stress_master_data md = { 0 };
3888 
3889   TEST_BASE (TEST_FEATURE_TEST_SOURCE, state_changes_stress_source, NULL,
3890       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3891 }
3892 
3893 GST_END_TEST;
3894 
GST_START_TEST(test_wavparse_state_changes_stress)3895 GST_START_TEST (test_wavparse_state_changes_stress)
3896 {
3897   state_changes_stress_input_data id = { 500 };
3898   state_changes_stress_master_data md = { 0 };
3899 
3900   TEST_BASE (TEST_FEATURE_WAV_SOURCE, state_changes_stress_source, NULL,
3901       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3902 }
3903 
3904 GST_END_TEST;
3905 
GST_START_TEST(test_mpegts_state_changes_stress)3906 GST_START_TEST (test_mpegts_state_changes_stress)
3907 {
3908   state_changes_stress_input_data id = { 500 };
3909   state_changes_stress_master_data md = { 0 };
3910 
3911   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, state_changes_stress_source, NULL,
3912       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3913 }
3914 
3915 GST_END_TEST;
3916 
GST_START_TEST(test_mpegts_2_state_changes_stress)3917 GST_START_TEST (test_mpegts_2_state_changes_stress)
3918 {
3919   state_changes_stress_input_data id = { 500 };
3920   state_changes_stress_master_data md = { 0 };
3921 
3922   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
3923       state_changes_stress_source, NULL,
3924       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3925 }
3926 
3927 GST_END_TEST;
3928 
GST_START_TEST(test_live_a_state_changes_stress)3929 GST_START_TEST (test_live_a_state_changes_stress)
3930 {
3931   state_changes_stress_input_data id = { 500 };
3932   state_changes_stress_master_data md = { 0 };
3933 
3934   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, state_changes_stress_source, NULL,
3935       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3936 }
3937 
3938 GST_END_TEST;
3939 
GST_START_TEST(test_live_av_state_changes_stress)3940 GST_START_TEST (test_live_av_state_changes_stress)
3941 {
3942   state_changes_stress_input_data id = { 500 };
3943   state_changes_stress_master_data md = { 0 };
3944 
3945   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, state_changes_stress_source, NULL,
3946       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3947 }
3948 
3949 GST_END_TEST;
3950 
GST_START_TEST(test_live_av_2_state_changes_stress)3951 GST_START_TEST (test_live_av_2_state_changes_stress)
3952 {
3953   state_changes_stress_input_data id = { 500 };
3954   state_changes_stress_master_data md = { 0 };
3955 
3956   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
3957       state_changes_stress_source, NULL,
3958       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3959 }
3960 
3961 GST_END_TEST;
3962 
3963 /**** serialized query test ****/
3964 
/* Master-side bookkeeping of the serialized query test (td->md); the arrays
 * are indexed by the stream index returned by pad2idx(). */
typedef struct
{
  /* set by serialized_query_probe_source() once the drain has been arranged */
  gboolean sent_query[2];
  /* set by send_drain() after gst_pad_query() returned TRUE */
  gboolean got_query_reply[2];
  /* referenced pad to query; the reference is released by send_drain() */
  GstPad *pad[2];
} serialized_query_master_data;
3971 
/* Slave-side bookkeeping of the serialized query test (td->sd). */
typedef struct
{
  /* set by serialized_query_probe() when a DRAIN query is observed */
  gboolean got_query;
} serialized_query_slave_data;
3976 
3977 static gboolean
send_drain(gpointer user_data)3978 send_drain (gpointer user_data)
3979 {
3980   test_data *td = user_data;
3981   serialized_query_master_data *d = td->md;
3982   GstQuery *q;
3983   gint idx;
3984 
3985   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
3986     q = gst_query_new_drain ();
3987     FAIL_UNLESS (gst_pad_query (d->pad[idx], q));
3988     d->got_query_reply[idx] = TRUE;
3989     gst_query_unref (q);
3990     gst_object_unref (d->pad[idx]);
3991   }
3992 
3993   g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline, gst_object_ref (td->p));
3994   return G_SOURCE_REMOVE;
3995 }
3996 
3997 static GstPadProbeReturn
serialized_query_probe_source(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)3998 serialized_query_probe_source (GstPad * pad, GstPadProbeInfo * info,
3999     gpointer user_data)
4000 {
4001   test_data *td = user_data;
4002   serialized_query_master_data *d = td->md;
4003   GstClockTime ts;
4004   gint idx;
4005 
4006   if (GST_IS_BUFFER (info->data)) {
4007     ts = GST_BUFFER_TIMESTAMP (info->data);
4008     idx = pad2idx (pad, td->two_streams);
4009     if (!d->sent_query[idx] && GST_CLOCK_TIME_IS_VALID (ts)
4010         && ts > STEP_AT * GST_MSECOND) {
4011       d->sent_query[idx] = TRUE;
4012       d->pad[idx] = gst_object_ref (pad);
4013       EXCLUSIVE_CALL (td, g_idle_add (send_drain, td));
4014     }
4015   }
4016   return GST_PAD_PROBE_OK;
4017 }
4018 
/* Iterator foreach callback used by serialized_query_source(): forwards the
 * iterated item to hook_probe() with serialized_query_probe_source() as the
 * probe callback. */
static void
hook_serialized_query_probe_source (const GValue * v, gpointer user_data)
{
  hook_probe (v, serialized_query_probe_source, user_data);
}
4024 
4025 static void
serialized_query_source(GstElement * source,gpointer user_data)4026 serialized_query_source (GstElement * source, gpointer user_data)
4027 {
4028   GstIterator *it;
4029   GstStateChangeReturn ret;
4030 
4031   it = gst_bin_iterate_sinks (GST_BIN (source));
4032   while (gst_iterator_foreach (it, hook_serialized_query_probe_source,
4033           user_data))
4034     gst_iterator_resync (it);
4035   gst_iterator_free (it);
4036 
4037   ret = gst_element_set_state (source, GST_STATE_PLAYING);
4038   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC
4039       || ret == GST_STATE_CHANGE_SUCCESS);
4040 }
4041 
4042 static GstPadProbeReturn
serialized_query_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)4043 serialized_query_probe (GstPad * pad, GstPadProbeInfo * info,
4044     gpointer user_data)
4045 {
4046   test_data *td = user_data;
4047   serialized_query_slave_data *d = td->sd;
4048 
4049   if (GST_IS_QUERY (info->data)) {
4050     if (GST_QUERY_TYPE (info->data) == GST_QUERY_DRAIN) {
4051       d->got_query = TRUE;
4052     }
4053   }
4054   return GST_PAD_PROBE_OK;
4055 }
4056 
/* Iterator foreach callback used by setup_sink_serialized_query(): forwards
 * the iterated item to hook_probe() with serialized_query_probe() as the
 * probe callback. */
static void
hook_serialized_query_probe (const GValue * v, gpointer user_data)
{
  hook_probe (v, serialized_query_probe, user_data);
}
4062 
4063 static void
setup_sink_serialized_query(GstElement * sink,gpointer user_data)4064 setup_sink_serialized_query (GstElement * sink, gpointer user_data)
4065 {
4066   GstIterator *it;
4067 
4068   it = gst_bin_iterate_sinks (GST_BIN (sink));
4069   while (gst_iterator_foreach (it, hook_serialized_query_probe, user_data))
4070     gst_iterator_resync (it);
4071   gst_iterator_free (it);
4072 }
4073 
4074 static void
check_success_source_serialized_query(gpointer user_data)4075 check_success_source_serialized_query (gpointer user_data)
4076 {
4077   test_data *td = user_data;
4078   serialized_query_master_data *d = td->md;
4079   gint idx;
4080 
4081   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
4082     FAIL_UNLESS (d->sent_query[idx]);
4083     FAIL_UNLESS (d->got_query_reply[idx]);
4084   }
4085 }
4086 
4087 static void
check_success_sink_serialized_query(gpointer user_data)4088 check_success_sink_serialized_query (gpointer user_data)
4089 {
4090   test_data *td = user_data;
4091   serialized_query_slave_data *d = td->sd;
4092 
4093   FAIL_UNLESS (d->got_query);
4094 }
4095 
GST_START_TEST(test_empty_serialized_query)4096 GST_START_TEST (test_empty_serialized_query)
4097 {
4098   serialized_query_master_data md = { {0} };
4099   serialized_query_slave_data sd = { 0 };
4100 
4101   TEST_BASE (TEST_FEATURE_TEST_SOURCE, serialized_query_source,
4102       setup_sink_serialized_query, check_success_source_serialized_query,
4103       check_success_sink_serialized_query, NULL, &md, &sd);
4104 }
4105 
4106 GST_END_TEST;
4107 
GST_START_TEST(test_wavparse_serialized_query)4108 GST_START_TEST (test_wavparse_serialized_query)
4109 {
4110   serialized_query_master_data md = { {0} };
4111   serialized_query_slave_data sd = { 0 };
4112 
4113   TEST_BASE (TEST_FEATURE_WAV_SOURCE, serialized_query_source,
4114       setup_sink_serialized_query, check_success_source_serialized_query,
4115       check_success_sink_serialized_query, NULL, &md, &sd);
4116 }
4117 
4118 GST_END_TEST;
4119 
GST_START_TEST(test_mpegts_serialized_query)4120 GST_START_TEST (test_mpegts_serialized_query)
4121 {
4122   serialized_query_master_data md = { {0} };
4123   serialized_query_slave_data sd = { 0 };
4124 
4125   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, serialized_query_source,
4126       setup_sink_serialized_query, check_success_source_serialized_query,
4127       check_success_sink_serialized_query, NULL, &md, &sd);
4128 }
4129 
4130 GST_END_TEST;
4131 
GST_START_TEST(test_mpegts_2_serialized_query)4132 GST_START_TEST (test_mpegts_2_serialized_query)
4133 {
4134   serialized_query_master_data md = { {0} };
4135   serialized_query_slave_data sd = { 0 };
4136 
4137   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
4138       serialized_query_source, setup_sink_serialized_query,
4139       check_success_source_serialized_query,
4140       check_success_sink_serialized_query, NULL, &md, &sd);
4141 }
4142 
4143 GST_END_TEST;
4144 
GST_START_TEST(test_live_a_serialized_query)4145 GST_START_TEST (test_live_a_serialized_query)
4146 {
4147   serialized_query_master_data md = { {0} };
4148   serialized_query_slave_data sd = { 0 };
4149 
4150   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, serialized_query_source,
4151       setup_sink_serialized_query, check_success_source_serialized_query,
4152       check_success_sink_serialized_query, NULL, &md, &sd);
4153 }
4154 
4155 GST_END_TEST;
4156 
GST_START_TEST(test_live_av_serialized_query)4157 GST_START_TEST (test_live_av_serialized_query)
4158 {
4159   serialized_query_master_data md = { {0} };
4160   serialized_query_slave_data sd = { 0 };
4161 
4162   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, serialized_query_source,
4163       setup_sink_serialized_query, check_success_source_serialized_query,
4164       check_success_sink_serialized_query, NULL, &md, &sd);
4165 }
4166 
4167 GST_END_TEST;
4168 
GST_START_TEST(test_live_av_2_serialized_query)4169 GST_START_TEST (test_live_av_2_serialized_query)
4170 {
4171   serialized_query_master_data md = { {0} };
4172   serialized_query_slave_data sd = { 0 };
4173 
4174   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
4175       serialized_query_source, setup_sink_serialized_query,
4176       check_success_source_serialized_query,
4177       check_success_sink_serialized_query, NULL, &md, &sd);
4178 }
4179 
4180 GST_END_TEST;
4181 
4182 /**** non serialized event test ****/
4183 
/* Master-side bookkeeping of the non serialized event test (td->md). */
typedef struct
{
  /* per stream (index from pad2idx()): set by
   * non_serialized_event_probe_source() once the custom event was sent */
  gboolean sent_event[2];
} non_serialized_event_master_data;
4188 
/* Slave-side bookkeeping of the non serialized event test (td->sd). */
typedef struct
{
  /* set by non_serialized_event_probe() when the custom event is observed
   * with the expected contents */
  gboolean got_event;
} non_serialized_event_slave_data;
4193 
4194 static GstPadProbeReturn
non_serialized_event_probe_source(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)4195 non_serialized_event_probe_source (GstPad * pad, GstPadProbeInfo * info,
4196     gpointer user_data)
4197 {
4198   test_data *td = user_data;
4199   non_serialized_event_master_data *d = td->md;
4200   GstClockTime ts;
4201   GstEvent *e;
4202   gint idx;
4203 
4204   if (GST_IS_BUFFER (info->data)) {
4205     ts = GST_BUFFER_TIMESTAMP (info->data);
4206     idx = pad2idx (pad, td->two_streams);
4207     if (!d->sent_event[idx]
4208         && GST_CLOCK_TIME_IS_VALID (ts) && ts > STEP_AT * GST_MSECOND) {
4209       e = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_OOB,
4210           gst_structure_new ("name", "field", G_TYPE_INT, 42, NULL));
4211       FAIL_UNLESS (e);
4212       FAIL_UNLESS (gst_pad_send_event (pad, e));
4213       d->sent_event[idx] = TRUE;
4214       EXCLUSIVE_CALL (td, g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline,
4215               gst_object_ref (td->p)));
4216     }
4217   }
4218   return GST_PAD_PROBE_OK;
4219 }
4220 
/* Iterator foreach callback used by non_serialized_event_source(): forwards
 * the iterated item to hook_probe() with non_serialized_event_probe_source()
 * as the probe callback. */
static void
hook_non_serialized_event_probe_source (const GValue * v, gpointer user_data)
{
  hook_probe (v, non_serialized_event_probe_source, user_data);
}
4226 
4227 static void
non_serialized_event_source(GstElement * source,gpointer user_data)4228 non_serialized_event_source (GstElement * source, gpointer user_data)
4229 {
4230   GstIterator *it;
4231   GstStateChangeReturn ret;
4232 
4233   it = gst_bin_iterate_sinks (GST_BIN (source));
4234   while (gst_iterator_foreach (it, hook_non_serialized_event_probe_source,
4235           user_data))
4236     gst_iterator_resync (it);
4237   gst_iterator_free (it);
4238 
4239   ret = gst_element_set_state (source, GST_STATE_PLAYING);
4240   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC
4241       || ret == GST_STATE_CHANGE_SUCCESS);
4242 }
4243 
4244 static GstPadProbeReturn
non_serialized_event_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)4245 non_serialized_event_probe (GstPad * pad, GstPadProbeInfo * info,
4246     gpointer user_data)
4247 {
4248   test_data *td = user_data;
4249   non_serialized_event_slave_data *d = td->sd;
4250   const GstStructure *s;
4251   gint val;
4252 
4253   if (GST_IS_EVENT (info->data)) {
4254     if (GST_EVENT_TYPE (info->data) == GST_EVENT_CUSTOM_DOWNSTREAM_OOB) {
4255       s = gst_event_get_structure (info->data);
4256       FAIL_UNLESS (!strcmp (gst_structure_get_name (s), "name"));
4257       FAIL_UNLESS (gst_structure_get_int (s, "field", &val));
4258       FAIL_UNLESS (val == 42);
4259       d->got_event = TRUE;
4260     }
4261   }
4262   return GST_PAD_PROBE_OK;
4263 }
4264 
/* Iterator foreach callback used by setup_sink_non_serialized_event():
 * forwards the iterated item to hook_probe() with
 * non_serialized_event_probe() as the probe callback. */
static void
hook_non_serialized_event_probe (const GValue * v, gpointer user_data)
{
  hook_probe (v, non_serialized_event_probe, user_data);
}
4270 
4271 static void
setup_sink_non_serialized_event(GstElement * sink,gpointer user_data)4272 setup_sink_non_serialized_event (GstElement * sink, gpointer user_data)
4273 {
4274   GstIterator *it;
4275 
4276   it = gst_bin_iterate_sinks (GST_BIN (sink));
4277   while (gst_iterator_foreach (it, hook_non_serialized_event_probe, user_data))
4278     gst_iterator_resync (it);
4279   gst_iterator_free (it);
4280 }
4281 
4282 static void
check_success_source_non_serialized_event(gpointer user_data)4283 check_success_source_non_serialized_event (gpointer user_data)
4284 {
4285   test_data *td = user_data;
4286   non_serialized_event_master_data *d = td->md;
4287   gint idx;
4288 
4289   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
4290     FAIL_UNLESS (d->sent_event[idx]);
4291   }
4292 }
4293 
4294 static void
check_success_sink_non_serialized_event(gpointer user_data)4295 check_success_sink_non_serialized_event (gpointer user_data)
4296 {
4297   test_data *td = user_data;
4298   non_serialized_event_slave_data *d = td->sd;
4299 
4300   FAIL_UNLESS (d->got_event);
4301 }
4302 
GST_START_TEST(test_empty_non_serialized_event)4303 GST_START_TEST (test_empty_non_serialized_event)
4304 {
4305   non_serialized_event_master_data md = { {0} };
4306   non_serialized_event_slave_data sd = { 0 };
4307 
4308   TEST_BASE (TEST_FEATURE_TEST_SOURCE, non_serialized_event_source,
4309       setup_sink_non_serialized_event,
4310       check_success_source_non_serialized_event,
4311       check_success_sink_non_serialized_event, NULL, &md, &sd);
4312 }
4313 
4314 GST_END_TEST;
4315 
GST_START_TEST(test_wavparse_non_serialized_event)4316 GST_START_TEST (test_wavparse_non_serialized_event)
4317 {
4318   non_serialized_event_master_data md = { {0} };
4319   non_serialized_event_slave_data sd = { 0 };
4320 
4321   TEST_BASE (TEST_FEATURE_WAV_SOURCE, non_serialized_event_source,
4322       setup_sink_non_serialized_event,
4323       check_success_source_non_serialized_event,
4324       check_success_sink_non_serialized_event, NULL, &md, &sd);
4325 }
4326 
4327 GST_END_TEST;
4328 
GST_START_TEST(test_mpegts_non_serialized_event)4329 GST_START_TEST (test_mpegts_non_serialized_event)
4330 {
4331   non_serialized_event_master_data md = { {0} };
4332   non_serialized_event_slave_data sd = { 0 };
4333 
4334   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, non_serialized_event_source,
4335       setup_sink_non_serialized_event,
4336       check_success_source_non_serialized_event,
4337       check_success_sink_non_serialized_event, NULL, &md, &sd);
4338 }
4339 
4340 GST_END_TEST;
4341 
GST_START_TEST(test_mpegts_2_non_serialized_event)4342 GST_START_TEST (test_mpegts_2_non_serialized_event)
4343 {
4344   non_serialized_event_master_data md = { {0} };
4345   non_serialized_event_slave_data sd = { 0 };
4346 
4347   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
4348       non_serialized_event_source, setup_sink_non_serialized_event,
4349       check_success_source_non_serialized_event,
4350       check_success_sink_non_serialized_event, NULL, &md, &sd);
4351 }
4352 
4353 GST_END_TEST;
4354 
GST_START_TEST(test_live_a_non_serialized_event)4355 GST_START_TEST (test_live_a_non_serialized_event)
4356 {
4357   non_serialized_event_master_data md = { {0} };
4358   non_serialized_event_slave_data sd = { 0 };
4359 
4360   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, non_serialized_event_source,
4361       setup_sink_non_serialized_event,
4362       check_success_source_non_serialized_event,
4363       check_success_sink_non_serialized_event, NULL, &md, &sd);
4364 }
4365 
4366 GST_END_TEST;
4367 
GST_START_TEST(test_live_av_non_serialized_event)4368 GST_START_TEST (test_live_av_non_serialized_event)
4369 {
4370   non_serialized_event_master_data md = { {0} };
4371   non_serialized_event_slave_data sd = { 0 };
4372 
4373   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, non_serialized_event_source,
4374       setup_sink_non_serialized_event,
4375       check_success_source_non_serialized_event,
4376       check_success_sink_non_serialized_event, NULL, &md, &sd);
4377 }
4378 
4379 GST_END_TEST;
4380 
GST_START_TEST(test_live_av_2_non_serialized_event)4381 GST_START_TEST (test_live_av_2_non_serialized_event)
4382 {
4383   non_serialized_event_master_data md = { {0} };
4384   non_serialized_event_slave_data sd = { 0 };
4385 
4386   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
4387       non_serialized_event_source, setup_sink_non_serialized_event,
4388       check_success_source_non_serialized_event,
4389       check_success_sink_non_serialized_event, NULL, &md, &sd);
4390 }
4391 
4392 GST_END_TEST;
4393 
4394 /**** meta test ****/
4395 
/* Indices into the meta_sent / meta_received flag arrays, one per kind of
 * meta exercised by the meta test. */
enum
{
  TEST_META_PROTECTION = 0,     /* GstProtectionMeta */
  N_TEST_META                   /* number of kinds; sizes the flag arrays */
};
4401 
/* Master-side bookkeeping of the meta test (td->md). */
typedef struct
{
  /* per meta kind: set by meta_probe_source() once that meta was added to a
   * buffer */
  gboolean meta_sent[N_TEST_META];
} meta_master_data;
4406 
/* Slave-side bookkeeping of the meta test (td->sd). */
typedef struct
{
  /* per meta kind: set by scan_meta() once that meta was found on a buffer
   * with the expected contents */
  gboolean meta_received[N_TEST_META];
} meta_slave_data;
4411 
4412 static GstPadProbeReturn
meta_probe_source(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)4413 meta_probe_source (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4414 {
4415   test_data *td = user_data;
4416   meta_master_data *d = td->md;
4417   GstBuffer *buffer;
4418   GstProtectionMeta *meta;
4419 
4420   if (GST_IS_BUFFER (info->data)) {
4421     buffer = GST_BUFFER (info->data);
4422     meta =
4423         gst_buffer_add_protection_meta (buffer, gst_structure_new ("name",
4424             "somefield", G_TYPE_INT, 42, NULL));
4425     FAIL_UNLESS (meta);
4426     d->meta_sent[TEST_META_PROTECTION] = TRUE;
4427   }
4428   return GST_PAD_PROBE_OK;
4429 }
4430 
/* Iterator foreach callback used by meta_source(): forwards the iterated
 * item to hook_probe() with meta_probe_source() as the probe callback. */
static void
hook_meta_probe_source (const GValue * v, gpointer user_data)
{
  hook_probe (v, meta_probe_source, user_data);
}
4436 
4437 static void
meta_source(GstElement * source,gpointer user_data)4438 meta_source (GstElement * source, gpointer user_data)
4439 {
4440   GstIterator *it;
4441   GstStateChangeReturn ret;
4442 
4443   it = gst_bin_iterate_sinks (GST_BIN (source));
4444   while (gst_iterator_foreach (it, hook_meta_probe_source, user_data))
4445     gst_iterator_resync (it);
4446   gst_iterator_free (it);
4447 
4448   ret = gst_element_set_state (source, GST_STATE_PLAYING);
4449   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC
4450       || ret == GST_STATE_CHANGE_SUCCESS);
4451 
4452   g_timeout_add (STOP_AT, (GSourceFunc) stop_pipeline, gst_object_ref (source));
4453 }
4454 
4455 static gboolean
scan_meta(GstBuffer * buffer,GstMeta ** meta,gpointer user_data)4456 scan_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
4457 {
4458   test_data *td = user_data;
4459   meta_slave_data *d = td->sd;
4460   int val;
4461   GstStructure *s;
4462   GstProtectionMeta *pmeta;
4463 
4464   if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) {
4465     pmeta = (GstProtectionMeta *) * meta;
4466     FAIL_UNLESS (GST_IS_STRUCTURE (pmeta->info));
4467     s = GST_STRUCTURE (pmeta->info);
4468     FAIL_UNLESS (!strcmp (gst_structure_get_name (s), "name"));
4469     FAIL_UNLESS (gst_structure_get_int (s, "somefield", &val));
4470     FAIL_UNLESS (val == 42);
4471     d->meta_received[TEST_META_PROTECTION] = TRUE;
4472   }
4473   return TRUE;
4474 }
4475 
4476 static GstPadProbeReturn
meta_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)4477 meta_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4478 {
4479   if (GST_IS_BUFFER (info->data)) {
4480     gst_buffer_foreach_meta (info->data, scan_meta, user_data);
4481   }
4482   return GST_PAD_PROBE_OK;
4483 }
4484 
/* Iterator foreach callback used by setup_sink_meta(): forwards the iterated
 * item to hook_probe() with meta_probe() as the probe callback. */
static void
hook_meta_probe (const GValue * v, gpointer user_data)
{
  hook_probe (v, meta_probe, user_data);
}
4490 
4491 static void
setup_sink_meta(GstElement * sink,gpointer user_data)4492 setup_sink_meta (GstElement * sink, gpointer user_data)
4493 {
4494   GstIterator *it;
4495 
4496   it = gst_bin_iterate_sinks (GST_BIN (sink));
4497   while (gst_iterator_foreach (it, hook_meta_probe, user_data))
4498     gst_iterator_resync (it);
4499   gst_iterator_free (it);
4500 }
4501 
4502 static void
check_success_source_meta(gpointer user_data)4503 check_success_source_meta (gpointer user_data)
4504 {
4505   test_data *td = user_data;
4506   meta_master_data *d = td->md;
4507   size_t n;
4508 
4509   for (n = 0; n < N_TEST_META; ++n)
4510     FAIL_UNLESS (d->meta_sent[n]);
4511 }
4512 
4513 static void
check_success_sink_meta(gpointer user_data)4514 check_success_sink_meta (gpointer user_data)
4515 {
4516   test_data *td = user_data;
4517   meta_slave_data *d = td->sd;
4518   size_t n;
4519 
4520   for (n = 0; n < N_TEST_META; ++n)
4521     FAIL_UNLESS (d->meta_received[n]);
4522 }
4523 
GST_START_TEST(test_empty_meta)4524 GST_START_TEST (test_empty_meta)
4525 {
4526   meta_master_data md = { {0}
4527   };
4528   meta_slave_data sd = { {0}
4529   };
4530 
4531   TEST_BASE (TEST_FEATURE_TEST_SOURCE, meta_source, setup_sink_meta,
4532       check_success_source_meta, check_success_sink_meta, NULL, &md, &sd);
4533 }
4534 
4535 GST_END_TEST;
4536 
GST_START_TEST(test_wavparse_meta)4537 GST_START_TEST (test_wavparse_meta)
4538 {
4539   meta_master_data md = { {0}
4540   };
4541   meta_slave_data sd = { {0}
4542   };
4543 
4544   TEST_BASE (TEST_FEATURE_WAV_SOURCE, meta_source, setup_sink_meta,
4545       check_success_source_meta, check_success_sink_meta, NULL, &md, &sd);
4546 }
4547 
4548 GST_END_TEST;
4549 
GST_START_TEST(test_mpegts_meta)4550 GST_START_TEST (test_mpegts_meta)
4551 {
4552   meta_master_data md = { {0}
4553   };
4554   meta_slave_data sd = { {0}
4555   };
4556 
4557   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, meta_source, setup_sink_meta,
4558       check_success_source_meta, check_success_sink_meta, NULL, &md, &sd);
4559 }
4560 
4561 GST_END_TEST;
4562 
GST_START_TEST(test_mpegts_2_meta)4563 GST_START_TEST (test_mpegts_2_meta)
4564 {
4565   meta_master_data md = { {0}
4566   };
4567   meta_slave_data sd = { {0}
4568   };
4569 
4570   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS, meta_source,
4571       setup_sink_meta, check_success_source_meta, check_success_sink_meta,
4572       NULL, &md, &sd);
4573 }
4574 
4575 GST_END_TEST;
4576 
GST_START_TEST(test_live_a_meta)4577 GST_START_TEST (test_live_a_meta)
4578 {
4579   meta_master_data md = { {0}
4580   };
4581   meta_slave_data sd = { {0}
4582   };
4583 
4584   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, meta_source, setup_sink_meta,
4585       check_success_source_meta, check_success_sink_meta, NULL, &md, &sd);
4586 }
4587 
4588 GST_END_TEST;
4589 
GST_START_TEST(test_live_av_meta)4590 GST_START_TEST (test_live_av_meta)
4591 {
4592   meta_master_data md = { {0}
4593   };
4594   meta_slave_data sd = { {0}
4595   };
4596 
4597   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, meta_source, setup_sink_meta,
4598       check_success_source_meta, check_success_sink_meta, NULL, &md, &sd);
4599 }
4600 
4601 GST_END_TEST;
4602 
GST_START_TEST(test_live_av_2_meta)4603 GST_START_TEST (test_live_av_2_meta)
4604 {
4605   meta_master_data md = { {0}
4606   };
4607   meta_slave_data sd = { {0}
4608   };
4609 
4610   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
4611       meta_source, setup_sink_meta, check_success_source_meta,
4612       check_success_sink_meta, NULL, &md, &sd);
4613 }
4614 
4615 GST_END_TEST;
4616 
4617 /**** source change test ****/
4618 
/* Input data for the source change tests. */
typedef struct
{
  /* invoked by change_source_blocked() with the pipeline and the name
   * returned by gst_element_get_name() for the removed source; the
   * switchers in this file g_free() that name */
  void (*switcher) (GstElement *, char *name);
} source_change_input_data;
4623 
/* Master-side state for the source change tests; both flags must be set for
 * check_success_source_source_change() to pass. */
typedef struct
{
  /* set by source_change_on_state_changed() when it arms the change_source
   * timeout, so the timeout is only armed once */
  gboolean source_change_scheduled;
  /* set by change_source_blocked() after the switcher has run */
  gboolean source_changed;
} source_change_master_data;
4629 
/* Slave-side state for the source change tests. The first array index is
 * the stream index computed by scppad2idx(); for the [2][2] arrays the
 * second index is 0 for the first caps seen on that stream and 1 for the
 * later, different caps. */
typedef struct
{
  /* caps events observed by source_change_probe() */
  gboolean got_caps[2][2];
  /* buffers observed after the corresponding caps */
  gboolean got_buffer[2][2];
  /* reference to the first caps of each stream; cleared by
   * source_change_probe() once different caps arrive */
  GstCaps *caps[2];
} source_change_slave_data;
4636 
4637 static gboolean
stop_source(gpointer user_data)4638 stop_source (gpointer user_data)
4639 {
4640   GstElement *source = user_data;
4641 
4642   FAIL_UNLESS (gst_element_set_state (source,
4643           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
4644   gst_object_unref (source);
4645   return FALSE;
4646 }
4647 
4648 static gboolean
remove_source(gpointer user_data)4649 remove_source (gpointer user_data)
4650 {
4651   GstElement *source = user_data;
4652 
4653   FAIL_UNLESS (gst_element_set_state (source,
4654           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
4655   gst_bin_remove (GST_BIN (GST_ELEMENT_PARENT (source)), source);
4656   return FALSE;
4657 }
4658 
4659 static void
switch_to_aiff(GstElement * pipeline,char * name)4660 switch_to_aiff (GstElement * pipeline, char *name)
4661 {
4662   GstElement *sbin, *filesrc, *ipcpipelinesink;
4663   GError *e = NULL;
4664 
4665   sbin =
4666       gst_parse_bin_from_description ("pushfilesrc name=filesrc ! aiffparse",
4667       TRUE, &e);
4668   FAIL_IF (e || !sbin);
4669   gst_element_set_name (sbin, name);
4670   filesrc = gst_bin_get_by_name (GST_BIN (sbin), "filesrc");
4671   FAIL_UNLESS (filesrc);
4672   g_object_set (filesrc, "location", "../../tests/files/s16be-id3v2.aiff",
4673       NULL);
4674   gst_object_unref (filesrc);
4675   gst_bin_add (GST_BIN (pipeline), sbin);
4676   ipcpipelinesink = gst_bin_get_by_name (GST_BIN (pipeline), "ipcpipelinesink");
4677   FAIL_UNLESS (ipcpipelinesink);
4678   FAIL_UNLESS (gst_element_link (sbin, ipcpipelinesink));
4679   gst_object_unref (ipcpipelinesink);
4680   gst_element_sync_state_with_parent (sbin);
4681   g_free (name);
4682 }
4683 
4684 static void
switch_av(GstElement * pipeline,char * name,gboolean live,gboolean Long)4685 switch_av (GstElement * pipeline, char *name, gboolean live, gboolean Long)
4686 {
4687   GstElement *src, *ipcpipelinesink;
4688   gint L = Long ? 10 : 1;
4689 
4690   if (g_str_has_prefix (name, "videotestsrc")) {
4691     /* replace video source with audio source */
4692     src = gst_element_factory_make ("audiotestsrc", NULL);
4693     FAIL_UNLESS (src);
4694     g_object_set (src, "is-live", live, "num-buffers", live ? 27 * L : -1,
4695         NULL);
4696     gst_bin_add (GST_BIN (pipeline), src);
4697     ipcpipelinesink =
4698         gst_bin_get_by_name (GST_BIN (pipeline), "vipcpipelinesink");
4699     FAIL_UNLESS (ipcpipelinesink);
4700     FAIL_UNLESS (gst_element_link (src, ipcpipelinesink));
4701     gst_object_unref (ipcpipelinesink);
4702     gst_element_sync_state_with_parent (src);
4703   }
4704 
4705   if (g_str_has_prefix (name, "audiotestsrc")) {
4706     /* replace audio source with video source */
4707     src = gst_element_factory_make ("videotestsrc", NULL);
4708     FAIL_UNLESS (src);
4709     g_object_set (src, "is-live", live, "num-buffers", live ? 19 * L : -1,
4710         NULL);
4711     gst_bin_add (GST_BIN (pipeline), src);
4712     ipcpipelinesink =
4713         gst_bin_get_by_name (GST_BIN (pipeline), "aipcpipelinesink");
4714     FAIL_UNLESS (ipcpipelinesink);
4715     FAIL_UNLESS (gst_element_link (src, ipcpipelinesink));
4716     gst_object_unref (ipcpipelinesink);
4717     gst_element_sync_state_with_parent (src);
4718   }
4719 
4720   g_free (name);
4721 }
4722 
4723 static void
switch_live_av(GstElement * pipeline,char * name)4724 switch_live_av (GstElement * pipeline, char *name)
4725 {
4726   switch_av (pipeline, name, TRUE, FALSE);
4727 }
4728 
4729 static GstPadProbeReturn
change_source_blocked(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)4730 change_source_blocked (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4731 {
4732   test_data *td = user_data;
4733   const source_change_input_data *i = td->id;
4734   source_change_master_data *d = td->md;
4735   GstElement *source;
4736   GstPad *peer;
4737 
4738   peer = gst_pad_get_peer (pad);
4739   FAIL_UNLESS (peer);
4740   FAIL_UNLESS (gst_pad_unlink (pad, peer));
4741   gst_object_unref (peer);
4742 
4743   source = GST_ELEMENT (gst_element_get_parent (pad));
4744   FAIL_UNLESS (source);
4745   g_object_set_qdata (G_OBJECT (source), to_be_removed_quark (),
4746       GINT_TO_POINTER (1));
4747 
4748   gst_bin_remove (GST_BIN (GST_ELEMENT_PARENT (source)), source);
4749   (*i->switcher) (td->p, gst_element_get_name (source));
4750 
4751   g_idle_add (stop_source, source);
4752 
4753   d->source_changed = TRUE;
4754 
4755   gst_object_unref (td->p);
4756   return GST_PAD_PROBE_REMOVE;
4757 }
4758 
4759 static gboolean
change_source(gpointer user_data)4760 change_source (gpointer user_data)
4761 {
4762   test_data *td = user_data;
4763   GstElement *source;
4764   GstPad *pad;
4765   static const char *const names[] =
4766       { "source", "audiotestsrc", "videotestsrc" };
4767   gboolean found = FALSE;
4768   size_t n;
4769 
4770   for (n = 0; n < G_N_ELEMENTS (names); ++n) {
4771     source = gst_bin_get_by_name (GST_BIN (td->p), names[n]);
4772     if (source) {
4773       found = TRUE;
4774       pad = gst_element_get_static_pad (source, "src");
4775       FAIL_UNLESS (pad);
4776       gst_object_ref (td->p);
4777       gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_IDLE, change_source_blocked,
4778           user_data, NULL);
4779       gst_object_unref (pad);
4780       gst_object_unref (source);
4781     }
4782   }
4783   FAIL_UNLESS (found);
4784 
4785   gst_object_unref (td->p);
4786   return G_SOURCE_REMOVE;
4787 }
4788 
4789 static void
source_change_on_state_changed(gpointer user_data)4790 source_change_on_state_changed (gpointer user_data)
4791 {
4792   test_data *td = user_data;
4793   source_change_master_data *d = td->md;
4794 
4795   if (!d->source_change_scheduled) {
4796     d->source_change_scheduled = TRUE;
4797     gst_object_ref (td->p);
4798     g_timeout_add (STEP_AT, change_source, td);
4799   }
4800 }
4801 
4802 static void
source_change_source(GstElement * source,gpointer user_data)4803 source_change_source (GstElement * source, gpointer user_data)
4804 {
4805   test_data *td = user_data;
4806   GstStateChangeReturn ret;
4807 
4808   td->state_target = GST_STATE_PLAYING;
4809   td->state_changed_cb = source_change_on_state_changed;
4810   ret = gst_element_set_state (source, GST_STATE_PLAYING);
4811   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC
4812       || ret == GST_STATE_CHANGE_SUCCESS);
4813 }
4814 
4815 static int
scppad2idx(GstPad * pad,gboolean two_streams,GstCaps * newcaps)4816 scppad2idx (GstPad * pad, gboolean two_streams, GstCaps * newcaps)
4817 {
4818   static GQuark scpidx = 0;
4819   gpointer p;
4820   int idx;
4821   GstCaps *caps;
4822 
4823   if (!scpidx)
4824     scpidx = g_quark_from_static_string ("scpidx");
4825 
4826   if (!two_streams)
4827     return 0;
4828 
4829   p = g_object_get_qdata (G_OBJECT (pad), scpidx);
4830   if (p)
4831     return GPOINTER_TO_INT (p) - 1;
4832 
4833   caps = gst_pad_get_current_caps (pad);
4834   if (!caps)
4835     caps = gst_pad_get_pad_template_caps (pad);
4836   if ((!caps || gst_caps_is_any (caps)) && newcaps)
4837     caps = gst_caps_ref (newcaps);
4838   FAIL_UNLESS (caps);
4839   idx = caps2idx (caps, two_streams);
4840   gst_caps_unref (caps);
4841   g_object_set_qdata (G_OBJECT (pad), scpidx, GINT_TO_POINTER (idx + 1));
4842   return idx;
4843 }
4844 
4845 static GstPadProbeReturn
source_change_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)4846 source_change_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4847 {
4848   test_data *td = user_data;
4849   source_change_slave_data *d = td->sd;
4850   GstCaps *caps;
4851   int idx;
4852 
4853   if (GST_IS_BUFFER (info->data)) {
4854     idx = scppad2idx (pad, td->two_streams, NULL);
4855     if (d->got_caps[idx][1])
4856       d->got_buffer[idx][1] = TRUE;
4857     else if (d->got_caps[idx][0])
4858       d->got_buffer[idx][0] = TRUE;
4859   } else if (GST_IS_EVENT (info->data)) {
4860     if (GST_EVENT_TYPE (info->data) == GST_EVENT_CAPS) {
4861       gst_event_parse_caps (info->data, &caps);
4862       idx = scppad2idx (pad, td->two_streams, caps);
4863       if (!d->got_caps[idx][0]) {
4864         FAIL_IF (d->caps[idx]);
4865         d->got_caps[idx][0] = TRUE;
4866         d->caps[idx] = gst_caps_ref (caps);
4867       } else {
4868         FAIL_UNLESS (d->caps);
4869         if (gst_caps_is_equal (caps, d->caps[idx])) {
4870           FAIL ();
4871         } else {
4872           gst_caps_replace (&d->caps[idx], NULL);
4873           d->got_caps[idx][1] = TRUE;
4874         }
4875       }
4876     }
4877   }
4878   return GST_PAD_PROBE_OK;
4879 }
4880 
4881 static void
hook_source_change_probe(const GValue * v,gpointer user_data)4882 hook_source_change_probe (const GValue * v, gpointer user_data)
4883 {
4884   hook_probe (v, source_change_probe, user_data);
4885 }
4886 
4887 static void
setup_sink_source_change(GstElement * sink,gpointer user_data)4888 setup_sink_source_change (GstElement * sink, gpointer user_data)
4889 {
4890   GstIterator *it;
4891 
4892   it = gst_bin_iterate_sinks (GST_BIN (sink));
4893   while (gst_iterator_foreach (it, hook_source_change_probe, user_data))
4894     gst_iterator_resync (it);
4895   gst_iterator_free (it);
4896 }
4897 
4898 static void
check_success_source_source_change(gpointer user_data)4899 check_success_source_source_change (gpointer user_data)
4900 {
4901   test_data *td = user_data;
4902   source_change_master_data *d = td->md;
4903 
4904   FAIL_UNLESS (d->source_change_scheduled);
4905   FAIL_UNLESS (d->source_changed);
4906 }
4907 
4908 static void
check_success_sink_source_change(gpointer user_data)4909 check_success_sink_source_change (gpointer user_data)
4910 {
4911   test_data *td = user_data;
4912   source_change_slave_data *d = td->sd;
4913   int idx;
4914 
4915   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
4916     FAIL_UNLESS (d->got_caps[idx][0]);
4917     FAIL_UNLESS (d->got_buffer[idx][0]);
4918     FAIL_UNLESS (d->got_caps[idx][1]);
4919     FAIL_UNLESS (d->got_buffer[idx][1]);
4920   }
4921 }
4922 
GST_START_TEST(test_non_live_source_change)4923 GST_START_TEST (test_non_live_source_change)
4924 {
4925   source_change_input_data id = { switch_to_aiff };
4926   source_change_master_data md = { 0 };
4927   source_change_slave_data sd = { {{0}
4928       }
4929   };
4930 
4931   TEST_BASE (TEST_FEATURE_WAV_SOURCE, source_change_source,
4932       setup_sink_source_change, check_success_source_source_change,
4933       check_success_sink_source_change, &id, &md, &sd);
4934 }
4935 
4936 GST_END_TEST;
4937 
GST_START_TEST(test_live_av_source_change)4938 GST_START_TEST (test_live_av_source_change)
4939 {
4940   source_change_input_data id = { switch_live_av };
4941   source_change_master_data md = { 0 };
4942   source_change_slave_data sd = { {{0}
4943       }
4944   };
4945 
4946   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, source_change_source,
4947       setup_sink_source_change, check_success_source_source_change,
4948       check_success_sink_source_change, &id, &md, &sd);
4949 }
4950 
4951 GST_END_TEST;
4952 
GST_START_TEST(test_live_av_2_source_change)4953 GST_START_TEST (test_live_av_2_source_change)
4954 {
4955   source_change_input_data id = { switch_live_av };
4956   source_change_master_data md = { 0 };
4957   source_change_slave_data sd = { {{0}
4958       }
4959   };
4960 
4961   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
4962       source_change_source, setup_sink_source_change,
4963       check_success_source_source_change, check_success_sink_source_change,
4964       &id, &md, &sd);
4965 }
4966 
4967 GST_END_TEST;
4968 
4969 /**** dynamic pipeline change stress test ****/
4970 
/* Input data for the dynamic pipeline change stress tests: two kinds of
 * pipeline change, each with a budget of how many times it still has to
 * run. dynamic_pipeline_change_stress_step() picks among the remaining
 * ones at random and decrements the matching counter. */
typedef struct
{
  /* remaining invocations of switcher0 */
  guint n_switches_0;
  void (*switcher0) (test_data *);
  /* remaining invocations of switcher1 */
  guint n_switches_1;
  void (*switcher1) (test_data *);
} dynamic_pipeline_change_stress_input_data;
4978 
/* Master-side state for the dynamic pipeline change stress tests. */
typedef struct
{
  /* mutex/cond pair used by the blocked-pad callbacks to wait until all
   * expected pads are blocked; initialised in
   * dynamic_pipeline_change_stress() and cleared in the success check */
  GMutex mutex;
  GCond cond;
  /* pads that still have to reach the barrier; decremented under mutex */
  guint n_blocks_left;
  /* callbacks that still have to finish; decremented atomically, and the
   * callback that brings it to zero schedules the next step */
  guint n_blocks_done;
  /* TRUE while probes are being added, so that probe callbacks invoked in
   * that window return GST_PAD_PROBE_PASS without doing anything */
  gboolean adding_probes;
  /* set once the first step has been scheduled */
  gboolean dynamic_pipeline_change_stress_scheduled;
} dynamic_pipeline_change_stress_master_data;
4988 
/* Forward declaration: the blocked-pad callbacks below schedule the next
 * step before the function is defined. */
static gboolean dynamic_pipeline_change_stress_step (gpointer user_data);
4990 
4991 static GstPadProbeReturn
dynamic_pipeline_change_stress_source_blocked_switch_av(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)4992 dynamic_pipeline_change_stress_source_blocked_switch_av (GstPad * pad,
4993     GstPadProbeInfo * info, gpointer user_data)
4994 {
4995   test_data *td = user_data;
4996   dynamic_pipeline_change_stress_master_data *d = td->md;
4997   GstElement *source;
4998   GstPad *peer;
4999 
5000   /* An idle pad probe could be called directly from the gst_pad_add_probe call
5001      if the pad happens to be idle right now. This would deadlock us though, as
5002      we need all pads to be blocked at the same time, so we need the iteration
5003      over all pads to be done before the pad probes execute. So we keep track of
5004      whether we're iterating to add the probes, and pass if so. */
5005   if (d->adding_probes) {
5006     return GST_PAD_PROBE_PASS;
5007   }
5008 
5009   peer = gst_pad_get_peer (pad);
5010   FAIL_UNLESS (peer);
5011   FAIL_UNLESS (gst_pad_unlink (pad, peer));
5012   gst_object_unref (peer);
5013 
5014   source = GST_ELEMENT (gst_element_get_parent (pad));
5015   FAIL_UNLESS (source);
5016   g_object_set_qdata (G_OBJECT (source), to_be_removed_quark (),
5017       GINT_TO_POINTER (1));
5018 
5019   /* we want all pads to be blocked before we proceed */
5020   g_mutex_lock (&d->mutex);
5021   d->n_blocks_left--;
5022   while (d->n_blocks_left > 0)
5023     g_cond_wait (&d->cond, &d->mutex);
5024   g_mutex_unlock (&d->mutex);
5025   g_cond_broadcast (&d->cond);
5026 
5027   g_mutex_lock (&d->mutex);
5028   switch_av (td->p, gst_element_get_name (source),
5029       ! !(td->features & TEST_FEATURE_LIVE), TRUE);
5030   g_mutex_unlock (&d->mutex);
5031 
5032   g_idle_add_full (G_PRIORITY_HIGH, remove_source, source, g_object_unref);
5033 
5034   if (g_atomic_int_dec_and_test (&d->n_blocks_done))
5035     g_timeout_add (STEP_AT, dynamic_pipeline_change_stress_step, td);
5036 
5037   return GST_PAD_PROBE_REMOVE;
5038 }
5039 
5040 static void
change_audio_channel(GstElement * pipeline,char * name,const char * ipcpipelinesink_name,gboolean live)5041 change_audio_channel (GstElement * pipeline, char *name,
5042     const char *ipcpipelinesink_name, gboolean live)
5043 {
5044   GstElement *src, *ipcpipelinesink;
5045 
5046   /* replace audio source with video source */
5047   src = gst_element_factory_make ("audiotestsrc", NULL);
5048   FAIL_UNLESS (src);
5049   g_object_set (src, "is-live", live, "num-buffers", live ? 190 : -1, NULL);
5050 
5051   gst_bin_add (GST_BIN (pipeline), src);
5052   ipcpipelinesink =
5053       gst_bin_get_by_name (GST_BIN (pipeline), ipcpipelinesink_name);
5054   FAIL_UNLESS (ipcpipelinesink);
5055   FAIL_UNLESS (gst_element_link (src, ipcpipelinesink));
5056   gst_object_unref (ipcpipelinesink);
5057   gst_element_sync_state_with_parent (src);
5058 
5059   g_free (name);
5060 }
5061 
5062 static GstPadProbeReturn
dynamic_pipeline_change_stress_source_blocked_change_audio_channel(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)5063 dynamic_pipeline_change_stress_source_blocked_change_audio_channel (GstPad *
5064     pad, GstPadProbeInfo * info, gpointer user_data)
5065 {
5066   test_data *td = user_data;
5067   dynamic_pipeline_change_stress_master_data *d = td->md;
5068   GstElement *source;
5069   GstPad *peer;
5070   const char *ipcpipelinesink_name;
5071 
5072   /* An idle pad probe could be called directly from the gst_pad_add_probe call
5073      if the pad happens to be idle right now. This would deadlock us though, as
5074      we need all pads to be blocked at the same time, so we need the iteration
5075      over all pads to be done before the pad probes execute. So we keep track of
5076      whether we're iterating to add the probes, and pass if so. */
5077   if (d->adding_probes) {
5078     return GST_PAD_PROBE_PASS;
5079   }
5080 
5081   peer = gst_pad_get_peer (pad);
5082   FAIL_UNLESS (peer);
5083   ipcpipelinesink_name = GST_ELEMENT_NAME (GST_PAD_PARENT (peer));
5084   FAIL_UNLESS (gst_pad_unlink (pad, peer));
5085   gst_object_unref (peer);
5086 
5087   source = GST_ELEMENT (gst_element_get_parent (pad));
5088   FAIL_UNLESS (source);
5089   g_object_set_qdata (G_OBJECT (source), to_be_removed_quark (),
5090       GINT_TO_POINTER (1));
5091 
5092   /* we want all pads to be blocked before we proceed */
5093   g_mutex_lock (&d->mutex);
5094   d->n_blocks_left--;
5095   while (d->n_blocks_left > 0)
5096     g_cond_wait (&d->cond, &d->mutex);
5097   g_cond_broadcast (&d->cond);
5098   g_mutex_unlock (&d->mutex);
5099 
5100   g_mutex_lock (&d->mutex);
5101   change_audio_channel (td->p, gst_element_get_name (source),
5102       ipcpipelinesink_name, ! !(td->features & TEST_FEATURE_LIVE));
5103   g_mutex_unlock (&d->mutex);
5104 
5105   g_idle_add_full (G_PRIORITY_HIGH, remove_source, source, g_object_unref);
5106 
5107   if (g_atomic_int_dec_and_test (&d->n_blocks_done))
5108     g_timeout_add (STEP_AT, dynamic_pipeline_change_stress_step, td);
5109 
5110   return GST_PAD_PROBE_REMOVE;
5111 }
5112 
/* Parameters for block_if_named(). */
typedef struct
{
  /* name prefixes an element has to match to get a probe */
  const char *const *names;
  /* number of entries in names */
  size_t n_names;
  /* idle probe callback to install on the "src" pad of matching elements */
    GstPadProbeReturn (*f) (GstPad * pad, GstPadProbeInfo * info,
      gpointer user_data);
  /* passed to f as its user data */
  test_data *td;
} block_if_named_data;
5121 
5122 static void
block_if_named(const GValue * v,gpointer user_data)5123 block_if_named (const GValue * v, gpointer user_data)
5124 {
5125   block_if_named_data *bind = user_data;
5126   GstElement *e;
5127   GstPad *pad;
5128   size_t n;
5129 
5130   e = g_value_get_object (v);
5131   FAIL_UNLESS (e);
5132   for (n = 0; n < bind->n_names; ++n) {
5133     if (g_str_has_prefix (GST_ELEMENT_NAME (e), bind->names[n])) {
5134       pad = gst_element_get_static_pad (e, "src");
5135       FAIL_UNLESS (pad);
5136 
5137       if (!g_object_get_qdata (G_OBJECT (e), to_be_removed_quark ()))
5138         gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_IDLE, bind->f, bind->td,
5139             NULL);
5140       gst_object_unref (pad);
5141     }
5142   }
5143 }
5144 
5145 static void
count_audio_sources(const GValue * v,gpointer user_data)5146 count_audio_sources (const GValue * v, gpointer user_data)
5147 {
5148   GstElement *e;
5149 
5150   e = g_value_get_object (v);
5151   FAIL_UNLESS (e);
5152 
5153   // we don't want to count the sources that are in the process
5154   // of being removed asynchronously
5155   if (g_object_get_qdata (G_OBJECT (e), to_be_removed_quark ()))
5156     return;
5157 
5158   if (g_str_has_prefix (GST_ELEMENT_NAME (e), "audiotestsrc"))
5159     ++ * (guint *) user_data;
5160 }
5161 
5162 static void
dynamic_pipeline_change_stress_swap_source(test_data * td)5163 dynamic_pipeline_change_stress_swap_source (test_data * td)
5164 {
5165   dynamic_pipeline_change_stress_master_data *d = td->md;
5166   static const char *const names[] =
5167       { "source", "audiotestsrc", "videotestsrc" };
5168   block_if_named_data bind = { names, sizeof (names) / sizeof (names[0]),
5169     dynamic_pipeline_change_stress_source_blocked_switch_av, td
5170   };
5171   GstIterator *it;
5172 
5173   /* we have two sources, we need to wait for both */
5174   d->n_blocks_left = d->n_blocks_done = 2;
5175 
5176   it = gst_bin_iterate_sources (GST_BIN (td->p));
5177   d->adding_probes = TRUE;
5178   while (gst_iterator_foreach (it, block_if_named, &bind)) {
5179     GST_INFO_OBJECT (td->p, "Resync");
5180     gst_iterator_resync (it);
5181   }
5182   d->adding_probes = FALSE;
5183   gst_iterator_free (it);
5184 }
5185 
5186 static void
dynamic_pipeline_change_stress_change_audio_channel(test_data * td)5187 dynamic_pipeline_change_stress_change_audio_channel (test_data * td)
5188 {
5189   dynamic_pipeline_change_stress_master_data *d = td->md;
5190   static const char *const names[] = { "audiotestsrc" };
5191   block_if_named_data bind = { names, sizeof (names) / sizeof (names[0]),
5192     dynamic_pipeline_change_stress_source_blocked_change_audio_channel, td
5193   };
5194   GstIterator *it;
5195   guint audio_sources;
5196 
5197   /* we have either zero or one audio source */
5198   it = gst_bin_iterate_sources (GST_BIN (td->p));
5199   audio_sources = 0;
5200   while (gst_iterator_foreach (it, count_audio_sources, &audio_sources)) {
5201     GST_INFO_OBJECT (td->p, "Resync");
5202     gst_iterator_resync (it);
5203   }
5204   gst_iterator_free (it);
5205   d->n_blocks_left = d->n_blocks_done = audio_sources;
5206 
5207   it = gst_bin_iterate_sources (GST_BIN (td->p));
5208   d->adding_probes = TRUE;
5209   while (gst_iterator_foreach (it, block_if_named, &bind)) {
5210     GST_INFO_OBJECT (td->p, "Resync");
5211     gst_iterator_resync (it);
5212   }
5213   d->adding_probes = FALSE;
5214   gst_iterator_free (it);
5215 }
5216 
5217 static gboolean
dynamic_pipeline_change_stress_step(gpointer user_data)5218 dynamic_pipeline_change_stress_step (gpointer user_data)
5219 {
5220   test_data *td = user_data;
5221   dynamic_pipeline_change_stress_input_data *i = td->id;
5222   guint available, idx;
5223 
5224   /* pick a random action among the ones we have left */
5225   available = i->n_switches_0 + i->n_switches_1;
5226   if (available == 0) {
5227     GST_INFO_OBJECT (td->p, "Destroying pipeline");
5228     FAIL_UNLESS (gst_element_set_state (td->p,
5229             GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE);
5230     g_timeout_add (STEP_AT, stop_pipeline, td->p);
5231     return G_SOURCE_REMOVE;
5232   }
5233 
5234   idx = rand () % available;
5235   if (idx < i->n_switches_0) {
5236     (*i->switcher0) (td);
5237     --i->n_switches_0;
5238     return G_SOURCE_REMOVE;
5239   }
5240   idx -= i->n_switches_0;
5241 
5242   if (idx < i->n_switches_1) {
5243     (*i->switcher1) (td);
5244     --i->n_switches_1;
5245     return G_SOURCE_REMOVE;
5246   }
5247   idx -= i->n_switches_1;
5248 
5249   return G_SOURCE_REMOVE;
5250 }
5251 
5252 static void
dynamic_pipeline_change_stress_on_state_changed(gpointer user_data)5253 dynamic_pipeline_change_stress_on_state_changed (gpointer user_data)
5254 {
5255   test_data *td = user_data;
5256   dynamic_pipeline_change_stress_master_data *d = td->md;
5257 
5258   if (!d->dynamic_pipeline_change_stress_scheduled) {
5259     d->dynamic_pipeline_change_stress_scheduled = TRUE;
5260     gst_object_ref (td->p);
5261     g_timeout_add (STEP_AT, dynamic_pipeline_change_stress_step, td);
5262   }
5263 }
5264 
5265 static void
dynamic_pipeline_change_stress(GstElement * source,gpointer user_data)5266 dynamic_pipeline_change_stress (GstElement * source, gpointer user_data)
5267 {
5268   test_data *td = user_data;
5269   dynamic_pipeline_change_stress_master_data *d = td->md;
5270   GstStateChangeReturn ret;
5271 
5272   g_mutex_init (&d->mutex);
5273   g_cond_init (&d->cond);
5274 
5275   td->state_target = GST_STATE_PLAYING;
5276   td->state_changed_cb = dynamic_pipeline_change_stress_on_state_changed;
5277   ret = gst_element_set_state (source, GST_STATE_PLAYING);
5278   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC
5279       || ret == GST_STATE_CHANGE_SUCCESS);
5280 }
5281 
5282 static void
check_success_source_dynamic_pipeline_change_stress(gpointer user_data)5283 check_success_source_dynamic_pipeline_change_stress (gpointer user_data)
5284 {
5285   test_data *td = user_data;
5286   dynamic_pipeline_change_stress_input_data *i = td->id;
5287   dynamic_pipeline_change_stress_master_data *d = td->md;
5288 
5289   FAIL_UNLESS (d->dynamic_pipeline_change_stress_scheduled);
5290   FAIL_UNLESS_EQUALS_INT (i->n_switches_0, 0);
5291   FAIL_UNLESS_EQUALS_INT (i->n_switches_1, 0);
5292 
5293   g_cond_clear (&d->cond);
5294   g_mutex_clear (&d->mutex);
5295 }
5296 
GST_START_TEST(test_non_live_av_dynamic_pipeline_change_stress)5297 GST_START_TEST (test_non_live_av_dynamic_pipeline_change_stress)
5298 {
5299   dynamic_pipeline_change_stress_input_data id = { 100,
5300     dynamic_pipeline_change_stress_swap_source, 100,
5301     dynamic_pipeline_change_stress_change_audio_channel
5302   };
5303   dynamic_pipeline_change_stress_master_data md = { {0} };
5304 
5305   TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_HAS_VIDEO,
5306       dynamic_pipeline_change_stress, NULL,
5307       check_success_source_dynamic_pipeline_change_stress, NULL, &id, &md,
5308       NULL);
5309 }
5310 
5311 GST_END_TEST;
5312 
GST_START_TEST(test_non_live_av_2_dynamic_pipeline_change_stress)5313 GST_START_TEST (test_non_live_av_2_dynamic_pipeline_change_stress)
5314 {
5315   dynamic_pipeline_change_stress_input_data id = { 100,
5316     dynamic_pipeline_change_stress_swap_source, 100,
5317     dynamic_pipeline_change_stress_change_audio_channel
5318   };
5319   dynamic_pipeline_change_stress_master_data md = { {0} };
5320 
5321   TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_HAS_VIDEO |
5322       TEST_FEATURE_SPLIT_SINKS, dynamic_pipeline_change_stress, NULL,
5323       check_success_source_dynamic_pipeline_change_stress, NULL, &id, &md,
5324       NULL);
5325 }
5326 
5327 GST_END_TEST;
5328 
GST_START_TEST(test_live_av_dynamic_pipeline_change_stress)5329 GST_START_TEST (test_live_av_dynamic_pipeline_change_stress)
5330 {
5331   dynamic_pipeline_change_stress_input_data id = { 100,
5332     dynamic_pipeline_change_stress_swap_source, 100,
5333     dynamic_pipeline_change_stress_change_audio_channel
5334   };
5335   dynamic_pipeline_change_stress_master_data md = { {0} };
5336 
5337   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, dynamic_pipeline_change_stress, NULL,
5338       check_success_source_dynamic_pipeline_change_stress, NULL, &id, &md,
5339       NULL);
5340 }
5341 
5342 GST_END_TEST;
5343 
GST_START_TEST(test_live_av_2_dynamic_pipeline_change_stress)5344 GST_START_TEST (test_live_av_2_dynamic_pipeline_change_stress)
5345 {
5346   dynamic_pipeline_change_stress_input_data id = { 100,
5347     dynamic_pipeline_change_stress_swap_source, 100,
5348     dynamic_pipeline_change_stress_change_audio_channel
5349   };
5350   dynamic_pipeline_change_stress_master_data md = { {0} };
5351 
5352   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
5353       dynamic_pipeline_change_stress, NULL,
5354       check_success_source_dynamic_pipeline_change_stress, NULL, &id, &md,
5355       NULL);
5356 }
5357 
5358 GST_END_TEST;
5359 
5360 /**** error from slave test ****/
5361 
/* Input data for the "error from slave" tests. */
typedef struct
{
  /* when set, bump_through_NULL() tolerates a non-successful switch to
   * NULL, recreates the slave process and reconnects the ipcpipelinesink
   * to it before restarting the pipeline */
  gboolean crash;
} error_from_slave_input_data;
5366 
/* Master-side state for the "error from slave" tests. */
typedef struct
{
  /* set by bump_through_NULL() once the pipeline has been taken to NULL,
   * i.e. before it is started for the second time */
  gboolean second_pass;
  /* NOTE(review): no write to this field is visible in this part of the
   * file; presumably set by a bus/state handler elsewhere */
  gboolean got_state_changed_to_playing_on_first_pass;
  /* set by the bus handler on the first error message seen before
   * second_pass */
  gboolean got_error_on_first_pass;
  /* NOTE(review): the writers of the two fields below are not visible in
   * this part of the file */
  gboolean got_state_changed_to_playing_on_second_pass;
  gboolean got_error_on_second_pass;
} error_from_slave_master_data;
5375 
5376 static gboolean
bump_through_NULL(gpointer user_data)5377 bump_through_NULL (gpointer user_data)
5378 {
5379   test_data *td = user_data;
5380   error_from_slave_input_data *i = td->id;
5381   error_from_slave_master_data *d = td->md;
5382   GstStateChangeReturn ret;
5383   GstElement *sink;
5384 
5385   ret = gst_element_set_state (td->p, GST_STATE_NULL);
5386   if (!i->crash) {
5387     FAIL_UNLESS (ret == GST_STATE_CHANGE_SUCCESS);
5388   }
5389   FAIL_UNLESS (gst_element_get_state (td->p, NULL, NULL,
5390           GST_CLOCK_TIME_NONE) == GST_STATE_CHANGE_SUCCESS);
5391 
5392   d->second_pass = TRUE;
5393 
5394   if (i->crash) {
5395     recreate_crashed_slave_process ();
5396     /* give the process time to be created in the other process */
5397     g_usleep (500 * 1000);
5398 
5399     /* reconnect to to slave process */
5400     sink = gst_bin_get_by_name (GST_BIN (td->p), "ipcpipelinesink");
5401     FAIL_UNLESS (sink);
5402     g_object_set (sink, "fdin", pipesba[0], "fdout", pipesfa[1], NULL);
5403     gst_object_unref (sink);
5404   }
5405 
5406   ret = gst_element_set_state (td->p, GST_STATE_PLAYING);
5407   FAIL_UNLESS (ret == GST_STATE_CHANGE_SUCCESS
5408       || ret == GST_STATE_CHANGE_ASYNC);
5409 
5410   g_timeout_add (STOP_AT, (GSourceFunc) stop_pipeline, td->p);
5411   return G_SOURCE_REMOVE;
5412 }
5413 
5414 static void
disconnect(const GValue * v,gpointer user_data)5415 disconnect (const GValue * v, gpointer user_data)
5416 {
5417   GstElement *e;
5418 
5419   e = g_value_get_object (v);
5420   FAIL_UNLESS (e);
5421   g_signal_emit_by_name (G_OBJECT (e), "disconnect", NULL);
5422 }
5423 
5424 static gboolean
error_from_slave_source_bus_msg(GstBus * bus,GstMessage * message,gpointer user_data)5425 error_from_slave_source_bus_msg (GstBus * bus, GstMessage * message,
5426     gpointer user_data)
5427 {
5428   test_data *td = user_data;
5429   error_from_slave_input_data *i = td->id;
5430   error_from_slave_master_data *d = td->md;
5431 
5432   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
5433     if (!d->second_pass) {
5434       if (!d->got_error_on_first_pass) {
5435         GstIterator *it;
5436 
5437         d->got_error_on_first_pass = TRUE;
5438 
5439         if (i->crash) {
5440           it = gst_bin_iterate_sinks (GST_BIN (td->p));
5441           while (gst_iterator_foreach (it, disconnect, NULL))
5442             gst_iterator_resync (it);
5443           gst_iterator_free (it);
5444         }
5445 
5446         gst_object_ref (td->p);
5447         g_timeout_add (STEP_AT, bump_through_NULL, td);
5448       }
5449 
5450       /* don't pass the expected error */
5451       return TRUE;
5452     }
5453   } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS) {
5454     if (!d->second_pass) {
5455       /* We'll get an expected EOS as the source reacts to the error */
5456       return TRUE;
5457     }
5458   }
5459   return master_bus_msg (bus, message, user_data);
5460 }
5461 
5462 static void
error_from_slave_on_state_changed(gpointer user_data)5463 error_from_slave_on_state_changed (gpointer user_data)
5464 {
5465   test_data *td = user_data;
5466   error_from_slave_master_data *d = td->md;
5467 
5468   if (d->second_pass)
5469     d->got_state_changed_to_playing_on_second_pass = TRUE;
5470   else
5471     d->got_state_changed_to_playing_on_first_pass = TRUE;
5472 }
5473 
5474 static gboolean
error_from_slave_position_getter(GstElement * element)5475 error_from_slave_position_getter (GstElement * element)
5476 {
5477   gint64 pos;
5478 
5479   /* we do not care about the result */
5480   gst_element_query_position (element, GST_FORMAT_TIME, &pos);
5481 
5482   return TRUE;
5483 }
5484 
5485 static void
error_from_slave_source(GstElement * source,gpointer user_data)5486 error_from_slave_source (GstElement * source, gpointer user_data)
5487 {
5488   test_data *td = user_data;
5489   GstStateChangeReturn ret;
5490 
5491   /* we're on the source, there's already the basic master_bus_msg watch,
5492      and gst doesn't want more than one watch, so we remove the watch and
5493      call it directly when done in the new watch */
5494   gst_bus_remove_watch (GST_ELEMENT_BUS (source));
5495   gst_bus_add_watch (GST_ELEMENT_BUS (source), error_from_slave_source_bus_msg,
5496       user_data);
5497   g_timeout_add (STEP_AT, (GSourceFunc) error_from_slave_position_getter,
5498       source);
5499 
5500   td->state_changed_cb = error_from_slave_on_state_changed;
5501   td->state_target = GST_STATE_PLAYING;
5502   ret = gst_element_set_state (source, GST_STATE_PLAYING);
5503   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
5504 }
5505 
5506 static gboolean
error_from_slave_sink_bus_msg(GstBus * bus,GstMessage * message,gpointer user_data)5507 error_from_slave_sink_bus_msg (GstBus * bus, GstMessage * message,
5508     gpointer user_data)
5509 {
5510   test_data *td = user_data;
5511   error_from_slave_input_data *i = td->id;
5512 
5513   switch (GST_MESSAGE_TYPE (message)) {
5514     case GST_MESSAGE_ERROR:
5515       if (!strcmp (GST_ELEMENT_NAME (GST_MESSAGE_SRC (message)),
5516               "error-element"))
5517         g_object_set (GST_MESSAGE_SRC (message), "error-after", -1, NULL);
5518       break;
5519     case GST_MESSAGE_ASYNC_DONE:
5520       if (GST_IS_PIPELINE (GST_MESSAGE_SRC (message))) {
5521         /* We have two identical processes, and only one must crash. They can
5522            be distinguished by recovery_pid, however. */
5523         if (i->crash && recovery_pid)
5524           g_timeout_add (CRASH_AT, (GSourceFunc) crash, NULL);
5525       }
5526       break;
5527     default:
5528       break;
5529   }
5530   return TRUE;
5531 }
5532 
5533 static void
setup_sink_error_from_slave(GstElement * sink,gpointer user_data)5534 setup_sink_error_from_slave (GstElement * sink, gpointer user_data)
5535 {
5536   gst_bus_add_watch (GST_ELEMENT_BUS (sink), error_from_slave_sink_bus_msg,
5537       user_data);
5538 }
5539 
5540 static void
check_success_source_error_from_slave(gpointer user_data)5541 check_success_source_error_from_slave (gpointer user_data)
5542 {
5543   test_data *td = user_data;
5544   error_from_slave_master_data *d = td->md;
5545 
5546   FAIL_UNLESS (d->second_pass);
5547   FAIL_UNLESS (d->got_state_changed_to_playing_on_first_pass);
5548   FAIL_UNLESS (d->got_state_changed_to_playing_on_second_pass);
5549   FAIL_UNLESS (d->got_error_on_first_pass);
5550   FAIL_IF (d->got_error_on_second_pass);
5551 }
5552 
GST_START_TEST(test_empty_error_from_slave)5553 GST_START_TEST (test_empty_error_from_slave)
5554 {
5555   error_from_slave_input_data id = { FALSE };
5556   error_from_slave_master_data md = { 0 };
5557 
5558   TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_ERROR_SINK,
5559       error_from_slave_source, setup_sink_error_from_slave,
5560       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5561 }
5562 
5563 GST_END_TEST;
5564 
GST_START_TEST(test_wavparse_error_from_slave)5565 GST_START_TEST (test_wavparse_error_from_slave)
5566 {
5567   error_from_slave_input_data id = { FALSE };
5568   error_from_slave_master_data md = { 0 };
5569 
5570   TEST_BASE (TEST_FEATURE_WAV_SOURCE | TEST_FEATURE_ERROR_SINK,
5571       error_from_slave_source, setup_sink_error_from_slave,
5572       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5573 }
5574 
5575 GST_END_TEST;
5576 
GST_START_TEST(test_mpegts_error_from_slave)5577 GST_START_TEST (test_mpegts_error_from_slave)
5578 {
5579   error_from_slave_input_data id = { FALSE };
5580   error_from_slave_master_data md = { 0 };
5581 
5582   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_ERROR_SINK,
5583       error_from_slave_source, setup_sink_error_from_slave,
5584       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5585 }
5586 
5587 GST_END_TEST;
5588 
GST_START_TEST(test_mpegts_2_error_from_slave)5589 GST_START_TEST (test_mpegts_2_error_from_slave)
5590 {
5591   error_from_slave_input_data id = { FALSE };
5592   error_from_slave_master_data md = { 0 };
5593 
5594   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_ERROR_SINK |
5595       TEST_FEATURE_SPLIT_SINKS,
5596       error_from_slave_source, setup_sink_error_from_slave,
5597       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5598 }
5599 
5600 GST_END_TEST;
5601 
GST_START_TEST(test_live_a_error_from_slave)5602 GST_START_TEST (test_live_a_error_from_slave)
5603 {
5604   error_from_slave_input_data id = { FALSE };
5605   error_from_slave_master_data md = { 0 };
5606 
5607   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE | TEST_FEATURE_ERROR_SINK,
5608       error_from_slave_source, setup_sink_error_from_slave,
5609       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5610 }
5611 
5612 GST_END_TEST;
5613 
GST_START_TEST(test_live_av_error_from_slave)5614 GST_START_TEST (test_live_av_error_from_slave)
5615 {
5616   error_from_slave_input_data id = { FALSE };
5617   error_from_slave_master_data md = { 0 };
5618 
5619   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_ERROR_SINK,
5620       error_from_slave_source, setup_sink_error_from_slave,
5621       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5622 }
5623 
5624 GST_END_TEST;
5625 
GST_START_TEST(test_live_av_2_error_from_slave)5626 GST_START_TEST (test_live_av_2_error_from_slave)
5627 {
5628   error_from_slave_input_data id = { FALSE };
5629   error_from_slave_master_data md = { 0 };
5630 
5631   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_ERROR_SINK |
5632       TEST_FEATURE_SPLIT_SINKS,
5633       error_from_slave_source, setup_sink_error_from_slave,
5634       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5635 }
5636 
5637 GST_END_TEST;
5638 
GST_START_TEST(test_wavparse_slave_process_crash)5639 GST_START_TEST (test_wavparse_slave_process_crash)
5640 {
5641   error_from_slave_input_data id = { TRUE };
5642   error_from_slave_master_data md = { 0 };
5643 
5644   TEST_BASE (TEST_FEATURE_WAV_SOURCE | TEST_FEATURE_RECOVERY_SLAVE_PROCESS,
5645       error_from_slave_source, setup_sink_error_from_slave,
5646       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5647 }
5648 
5649 GST_END_TEST;
5650 
5651 /**** master process crash test ****/
5652 
5653 typedef struct
5654 {
5655   gboolean got_state_changed_to_playing;
5656 } master_process_crash_master_data;
5657 
5658 typedef struct
5659 {
5660   gboolean got_error;
5661   gboolean got_eos;
5662 } master_process_crash_slave_data;
5663 
5664 static void
master_process_crash_on_state_changed(gpointer user_data)5665 master_process_crash_on_state_changed (gpointer user_data)
5666 {
5667   test_data *td = user_data;
5668   master_process_crash_master_data *d = td->md;
5669 
5670   if (!d->got_state_changed_to_playing) {
5671     d->got_state_changed_to_playing = TRUE;
5672 
5673     /* We have two identical processes, and only one must crash. They can
5674        be distinguished by recovery_pid, however. */
5675     if (!recovery_pid)
5676       g_timeout_add (CRASH_AT, (GSourceFunc) crash, NULL);
5677   }
5678 }
5679 
5680 static void
master_process_crash_source(GstElement * source,gpointer user_data)5681 master_process_crash_source (GstElement * source, gpointer user_data)
5682 {
5683   test_data *td = user_data;
5684   GstStateChangeReturn ret;
5685 
5686   td->state_target = GST_STATE_PLAYING;
5687   td->state_changed_cb = master_process_crash_on_state_changed;
5688   ret = gst_element_set_state (source, GST_STATE_PLAYING);
5689   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC
5690       || ret == GST_STATE_CHANGE_SUCCESS);
5691 }
5692 
5693 static GstPadProbeReturn
master_process_crash_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)5694 master_process_crash_probe (GstPad * pad, GstPadProbeInfo * info,
5695     gpointer user_data)
5696 {
5697   test_data *td = user_data;
5698   master_process_crash_slave_data *d = td->sd;
5699 
5700   if (GST_IS_EVENT (info->data)) {
5701     if (GST_EVENT_TYPE (info->data) == GST_EVENT_EOS) {
5702       d->got_eos = TRUE;
5703     }
5704   }
5705 
5706   return GST_PAD_PROBE_OK;
5707 }
5708 
5709 static void
hook_master_process_crash_probe(const GValue * v,gpointer user_data)5710 hook_master_process_crash_probe (const GValue * v, gpointer user_data)
5711 {
5712   hook_probe (v, master_process_crash_probe, user_data);
5713 }
5714 
5715 static gboolean
go_to_NULL_and_reconnect(gpointer user_data)5716 go_to_NULL_and_reconnect (gpointer user_data)
5717 {
5718   GstElement *pipeline = user_data;
5719   GstStateChangeReturn ret;
5720   GstElement *src;
5721 
5722   ret = gst_element_set_state (pipeline, GST_STATE_NULL);
5723   FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
5724 
5725   /* reconnect to to master process */
5726   src = gst_bin_get_by_name (GST_BIN (pipeline), "ipcpipelinesrc0");
5727   FAIL_UNLESS (src);
5728   g_object_set (src, "fdin", pipesfa[0], "fdout", pipesba[1], NULL);
5729   gst_object_unref (src);
5730 
5731   gst_object_unref (pipeline);
5732   return G_SOURCE_REMOVE;
5733 }
5734 
5735 static gboolean
master_process_crash_bus_msg(GstBus * bus,GstMessage * message,gpointer user_data)5736 master_process_crash_bus_msg (GstBus * bus, GstMessage * message,
5737     gpointer user_data)
5738 {
5739   test_data *td = user_data;
5740   master_process_crash_slave_data *d = td->sd;
5741   GstIterator *it;
5742 
5743   switch (GST_MESSAGE_TYPE (message)) {
5744     case GST_MESSAGE_ERROR:
5745       if (!d->got_error) {
5746         it = gst_bin_iterate_sources (GST_BIN (td->p));
5747         while (gst_iterator_foreach (it, disconnect, NULL))
5748           gst_iterator_resync (it);
5749         gst_iterator_free (it);
5750         g_timeout_add (10, (GSourceFunc) go_to_NULL_and_reconnect,
5751             gst_object_ref (td->p));
5752         d->got_error = TRUE;
5753       }
5754       break;
5755     default:
5756       break;
5757   }
5758   return TRUE;
5759 }
5760 
5761 static void
setup_sink_master_process_crash(GstElement * sink,gpointer user_data)5762 setup_sink_master_process_crash (GstElement * sink, gpointer user_data)
5763 {
5764   GstIterator *it;
5765 
5766   it = gst_bin_iterate_sinks (GST_BIN (sink));
5767   while (gst_iterator_foreach (it, hook_master_process_crash_probe, user_data))
5768     gst_iterator_resync (it);
5769   gst_iterator_free (it);
5770 
5771   gst_bus_add_watch (GST_ELEMENT_BUS (sink), master_process_crash_bus_msg,
5772       user_data);
5773 }
5774 
5775 static void
check_success_source_master_process_crash(gpointer user_data)5776 check_success_source_master_process_crash (gpointer user_data)
5777 {
5778   test_data *td = user_data;
5779   master_process_crash_master_data *d = td->md;
5780 
5781   FAIL_UNLESS (d->got_state_changed_to_playing);
5782 }
5783 
5784 static void
check_success_sink_master_process_crash(gpointer user_data)5785 check_success_sink_master_process_crash (gpointer user_data)
5786 {
5787   test_data *td = user_data;
5788   master_process_crash_slave_data *d = td->sd;
5789 
5790   FAIL_UNLESS (d->got_error);
5791   FAIL_UNLESS (d->got_eos);
5792 }
5793 
GST_START_TEST(test_wavparse_master_process_crash)5794 GST_START_TEST (test_wavparse_master_process_crash)
5795 {
5796   master_process_crash_master_data md = { 0 };
5797   master_process_crash_slave_data sd = { 0 };
5798 
5799   TEST_BASE (TEST_FEATURE_WAV_SOURCE | TEST_FEATURE_RECOVERY_MASTER_PROCESS,
5800       master_process_crash_source, setup_sink_master_process_crash,
5801       check_success_source_master_process_crash,
5802       check_success_sink_master_process_crash, NULL, &md, &sd);
5803 }
5804 
5805 GST_END_TEST;
5806 
5807 static Suite *
ipcpipeline_suite(void)5808 ipcpipeline_suite (void)
5809 {
5810   Suite *s = suite_create ("ipcpipeline");
5811   TCase *tc_chain = tcase_create ("general");
5812 
5813   setup_lock ();
5814 
5815   suite_add_tcase (s, tc_chain);
5816   tcase_set_timeout (tc_chain, 180);
5817 
5818   /* play_pause tests put the pipeline in PLAYING state, then in
5819      PAUSED state, then in PLAYING state again. The sink expects
5820      async-done messages or state change successes. */
5821   if (1) {
5822     tcase_add_test (tc_chain, test_empty_play_pause);
5823     tcase_add_test (tc_chain, test_wavparse_play_pause);
5824     tcase_add_test (tc_chain, test_mpegts_play_pause);
5825     tcase_add_test (tc_chain, test_mpegts_2_play_pause);
5826     tcase_add_test (tc_chain, test_live_a_play_pause);
5827     tcase_add_test (tc_chain, test_live_av_play_pause);
5828     tcase_add_test (tc_chain, test_live_av_2_play_pause);
5829   }
5830 
5831   /* flushing_seek tests perform a flushing seek in PLAYING
5832      state. The sinks check a buffer with the target timestamp
5833      is received after the seek. */
5834   if (1) {
5835     tcase_add_test (tc_chain, test_empty_flushing_seek);
5836     tcase_add_test (tc_chain, test_wavparse_flushing_seek);
5837     tcase_add_test (tc_chain, test_mpegts_flushing_seek);
5838     tcase_add_test (tc_chain, test_mpegts_2_flushing_seek);
5839     tcase_add_test (tc_chain, test_live_a_flushing_seek);
5840     tcase_add_test (tc_chain, test_live_av_flushing_seek);
5841     tcase_add_test (tc_chain, test_live_av_2_flushing_seek);
5842   }
5843 
5844   /* flushing_seek_in_pause tests perform a flushing seek in
5845      PAUSED state. These are disabled for live pipelines since
5846      those will not generate data in PAUSED, so we won't get
5847      a buffer. */
5848   if (1) {
5849     tcase_add_test (tc_chain, test_empty_flushing_seek_in_pause);
5850     tcase_add_test (tc_chain, test_wavparse_flushing_seek_in_pause);
5851     tcase_add_test (tc_chain, test_mpegts_flushing_seek_in_pause);
5852     tcase_add_test (tc_chain, test_mpegts_2_flushing_seek_in_pause);
5853     /* live scenarios skipped: live sources do not generate buffers
5854      * when paused */
5855   }
5856 
5857   /* segment_seek tests perform a segment seek in PLAYING
5858      state. The sinks check a buffer with the target timestamp
5859      is received after the seek, and that a SEGMENT_DONE is
5860      received at the end of the segment. */
5861   if (1) {
5862     tcase_add_test (tc_chain, test_empty_segment_seek);
5863     tcase_add_test (tc_chain, test_wavparse_segment_seek);
5864     /* mpegts skipped: tsdemux does not support segment seeks */
5865     tcase_add_test (tc_chain, test_live_a_segment_seek);
5866     tcase_add_test (tc_chain, test_live_av_segment_seek);
5867     tcase_add_test (tc_chain, test_live_av_2_segment_seek);
5868   }
5869 
5870   /* seek_stress tests perform stress testing on seeks, then waits
5871      in PLAYING for EOS or segment-done. */
5872   if (1) {
5873     tcase_add_test (tc_chain, test_empty_seek_stress);
5874     tcase_add_test (tc_chain, test_wavparse_seek_stress);
5875     tcase_add_test (tc_chain, test_mpegts_seek_stress);
5876     tcase_add_test (tc_chain, test_mpegts_2_seek_stress);
5877     tcase_add_test (tc_chain, test_live_a_seek_stress);
5878     tcase_add_test (tc_chain, test_live_av_seek_stress);
5879     tcase_add_test (tc_chain, test_live_av_2_seek_stress);
5880   }
5881 
5882   /* upstream_query tests send position and duration queries, and
5883      checks the results are as expected. */
5884   if (1) {
5885     tcase_add_test (tc_chain, test_empty_upstream_query);
5886     tcase_add_test (tc_chain, test_wavparse_upstream_query);
5887     tcase_add_test (tc_chain, test_mpegts_upstream_query);
5888     tcase_add_test (tc_chain, test_mpegts_2_upstream_query);
5889     tcase_add_test (tc_chain, test_live_a_upstream_query);
5890     tcase_add_test (tc_chain, test_live_av_upstream_query);
5891     tcase_add_test (tc_chain, test_live_av_2_upstream_query);
5892   }
5893 
5894   /* message tests send a sink message downstream, which causes
5895      the sinks to reply with the embedded event, which is checked.
5896      This is not possible when elements go into pull mode. */
5897   if (1) {
5898     tcase_add_test (tc_chain, test_empty_message);
5899     tcase_add_test (tc_chain, test_wavparse_message);
5900     /* mpegts skipped because it goes into pull mode:
5901        https://bugzilla.gnome.org/show_bug.cgi?id=751637 */
5902     tcase_add_test (tc_chain, test_live_a_message);
5903     tcase_add_test (tc_chain, test_live_av_message);
5904     tcase_add_test (tc_chain, test_live_av_2_message);
5905   }
5906 
5907   /* end_of_stream tests check the EOS event and message are
5908      properly received when the stream reaches its end. */
5909   if (1) {
5910     tcase_add_test (tc_chain, test_empty_end_of_stream);
5911     tcase_add_test (tc_chain, test_wavparse_end_of_stream);
5912     tcase_add_test (tc_chain, test_mpegts_end_of_stream);
5913     tcase_add_test (tc_chain, test_mpegts_2_end_of_stream);
5914     tcase_add_test (tc_chain, test_live_a_end_of_stream);
5915     tcase_add_test (tc_chain, test_live_av_end_of_stream);
5916     tcase_add_test (tc_chain, test_live_av_2_end_of_stream);
5917   }
5918 
5919   /* reverse_playback tests issue a seek with negative rate,
5920      and check buffers timestamp are in decreasing order.
5921      This does not work with sources which do not support
5922      negative playback rate (live ones, and some demuxers). */
5923   if (1) {
5924     /* wavparse and tsdemux does not support backward playback */
5925     tcase_add_test (tc_chain, test_a_reverse_playback);
5926     tcase_add_test (tc_chain, test_av_reverse_playback);
5927     tcase_add_test (tc_chain, test_av_2_reverse_playback);
5928   }
5929 
5930   /* tags tests check tags are carried to the slave. */
5931   if (1) {
5932     tcase_add_test (tc_chain, test_empty_tags);
5933     tcase_add_test (tc_chain, test_wavparse_tags);
5934     tcase_add_test (tc_chain, test_mpegts_tags);
5935     tcase_add_test (tc_chain, test_mpegts_2_tags);
5936     tcase_add_test (tc_chain, test_live_a_tags);
5937     tcase_add_test (tc_chain, test_live_av_tags);
5938     tcase_add_test (tc_chain, test_live_av_2_tags);
5939   }
5940 
5941   /* reconfigure tests that pipeline reconfiguration via
5942      the reconfigure event works */
5943   if (1) {
5944     tcase_add_test (tc_chain, test_non_live_a_reconfigure);
5945     tcase_add_test (tc_chain, test_non_live_av_reconfigure);
5946     tcase_add_test (tc_chain, test_live_a_reconfigure);
5947     tcase_add_test (tc_chain, test_live_av_reconfigure);
5948   }
5949 
5950   /* state_change tests issue a number of state changes in
5951      (hopefully) all interesting configurations, and checks
5952      the state changes occured on the slave pipeline. The links
5953      are disconnected and reconnected to check it all still
5954      works after this. */
5955   if (1) {
5956     tcase_add_test (tc_chain, test_empty_state_changes);
5957     tcase_add_test (tc_chain, test_wavparse_state_changes);
5958     tcase_add_test (tc_chain, test_mpegts_state_changes);
5959     tcase_add_test (tc_chain, test_mpegts_2_state_changes);
5960     /* live scenarios skipped: live sources will cause no buffer
5961      * to flow in PAUSED, so the pipeline will only finish READY->PAUSED
5962      * once switching to PLAYING */
5963   }
5964 
5965   /* state_changes_stress tests change state randomly and rapidly. */
5966   if (1) {
5967     tcase_add_test (tc_chain, test_empty_state_changes_stress);
5968     tcase_add_test (tc_chain, test_wavparse_state_changes_stress);
5969     tcase_add_test (tc_chain, test_mpegts_state_changes_stress);
5970     tcase_add_test (tc_chain, test_mpegts_2_state_changes_stress);
5971     tcase_add_test (tc_chain, test_live_a_state_changes_stress);
5972     tcase_add_test (tc_chain, test_live_av_state_changes_stress);
5973     tcase_add_test (tc_chain, test_live_av_2_state_changes_stress);
5974   }
5975 
5976   /* serialized_query tests checks that a serialized query is
5977      handled by the slave pipeline. */
5978   if (1) {
5979     tcase_add_test (tc_chain, test_empty_serialized_query);
5980     tcase_add_test (tc_chain, test_wavparse_serialized_query);
5981     tcase_add_test (tc_chain, test_mpegts_serialized_query);
5982     tcase_add_test (tc_chain, test_mpegts_2_serialized_query);
5983     tcase_add_test (tc_chain, test_live_a_serialized_query);
5984     tcase_add_test (tc_chain, test_live_av_serialized_query);
5985     tcase_add_test (tc_chain, test_live_av_2_serialized_query);
5986   }
5987 
5988   /* non_serialized_event tests checks that a non serialized event
5989      is handled by the slave pipeline. */
5990   if (1) {
5991     tcase_add_test (tc_chain, test_empty_non_serialized_event);
5992     tcase_add_test (tc_chain, test_wavparse_non_serialized_event);
5993     tcase_add_test (tc_chain, test_mpegts_non_serialized_event);
5994     tcase_add_test (tc_chain, test_mpegts_2_non_serialized_event);
5995     tcase_add_test (tc_chain, test_live_a_non_serialized_event);
5996     tcase_add_test (tc_chain, test_live_av_non_serialized_event);
5997     tcase_add_test (tc_chain, test_live_av_2_non_serialized_event);
5998   }
5999 
6000   /* meta tests checks that GstMeta on buffers are correctly
6001      received by the slave pipeline. */
6002   if (1) {
6003     tcase_add_test (tc_chain, test_empty_meta);
6004     tcase_add_test (tc_chain, test_wavparse_meta);
6005     tcase_add_test (tc_chain, test_mpegts_meta);
6006     tcase_add_test (tc_chain, test_mpegts_2_meta);
6007     tcase_add_test (tc_chain, test_live_a_meta);
6008     tcase_add_test (tc_chain, test_live_av_meta);
6009     tcase_add_test (tc_chain, test_live_av_2_meta);
6010   }
6011 
6012   /* source_change tests checks that the pipelines can handle a
6013      change of source/caps. */
6014   if (1) {
6015     tcase_add_test (tc_chain, test_non_live_source_change);
6016     tcase_add_test (tc_chain, test_live_av_source_change);
6017     tcase_add_test (tc_chain, test_live_av_2_source_change);
6018   }
6019 
6020   /* navigation tests checks that navigation events from the slave
6021      are received by the master. */
6022   if (1) {
6023     tcase_add_test (tc_chain, test_non_live_av_navigation);
6024     tcase_add_test (tc_chain, test_non_live_av_2_navigation);
6025     tcase_add_test (tc_chain, test_live_av_navigation);
6026     tcase_add_test (tc_chain, test_live_av_2_navigation);
6027   }
6028 
6029   /* dynamic_pipeline_change_stress tests stress tests dynamic
6030      pipeline changes. */
6031   if (1) {
6032     tcase_add_test (tc_chain, test_non_live_av_dynamic_pipeline_change_stress);
6033     tcase_add_test (tc_chain,
6034         test_non_live_av_2_dynamic_pipeline_change_stress);
6035     tcase_add_test (tc_chain, test_live_av_dynamic_pipeline_change_stress);
6036     tcase_add_test (tc_chain, test_live_av_2_dynamic_pipeline_change_stress);
6037   }
6038 
6039   /* error_from_slave tests checks an error message issued
6040      by the slave pipeline is received by the master pipeline. */
6041   if (1) {
6042     tcase_add_test (tc_chain, test_empty_error_from_slave);
6043     tcase_add_test (tc_chain, test_wavparse_error_from_slave);
6044     tcase_add_test (tc_chain, test_mpegts_error_from_slave);
6045     tcase_add_test (tc_chain, test_mpegts_2_error_from_slave);
6046     tcase_add_test (tc_chain, test_live_a_error_from_slave);
6047     tcase_add_test (tc_chain, test_live_av_error_from_slave);
6048     tcase_add_test (tc_chain, test_live_av_2_error_from_slave);
6049   }
6050 
6051   /* slave_process_crash tests test that a crash of the slave
6052      process can be recovered from by the master, which can
6053      replace the slave process and continue. */
6054   tcase_add_test (tc_chain, test_wavparse_slave_process_crash);
6055 
6056   /* master_process_crash tests test that a crash of the master
6057      process can be recovered from by the slave. I don't recall
6058      how the recovery from that works, but it does! A watchdog
6059      process replaces the master process, and the slave will
6060      go to NULL and reconnect after it gets a timeout talking
6061      with the master pipeline. */
6062   tcase_add_test (tc_chain, test_wavparse_master_process_crash);
6063 
6064   return s;
6065 }
6066 
/* Program entry point: GST_CHECK_MAIN is the gstcheck macro that supplies
 * main() for the suite named here, i.e. the one built by
 * ipcpipeline_suite(). */
GST_CHECK_MAIN (ipcpipeline);
6068