1 /* A generic test engine for elements based upon GstAdaptiveDemux
2  *
3  * Copyright (c) <2015> YouView TV Ltd
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 
21 #include <gst/check/gstcheck.h>
22 #include <gst/check/gsttestclock.h>
23 #include "adaptive_demux_engine.h"
24 
25 typedef struct _GstAdaptiveDemuxTestEnginePrivate
26 {
27   GstAdaptiveDemuxTestEngine engine;
28   const GstAdaptiveDemuxTestCallbacks *callbacks;
29   gpointer user_data;
30   guint clock_update_id;
31 } GstAdaptiveDemuxTestEnginePrivate;
32 
33 
34 #define GST_TEST_GET_LOCK(d)  (&(((GstAdaptiveDemuxTestEnginePrivate*)(d))->engine.lock))
35 #define GST_TEST_LOCK(d)      g_mutex_lock (GST_TEST_GET_LOCK (d))
36 #define GST_TEST_UNLOCK(d)    g_mutex_unlock (GST_TEST_GET_LOCK (d))
37 
38 static void
adaptive_demux_engine_stream_state_finalize(gpointer data)39 adaptive_demux_engine_stream_state_finalize (gpointer data)
40 {
41   GstAdaptiveDemuxTestOutputStream *stream =
42       (GstAdaptiveDemuxTestOutputStream *) data;
43   g_free (stream->name);
44   if (stream->appsink)
45     gst_object_unref (stream->appsink);
46   if (stream->pad)
47     gst_object_unref (stream->pad);
48   if (stream->internal_pad)
49     gst_object_unref (stream->internal_pad);
50   g_slice_free (GstAdaptiveDemuxTestOutputStream, stream);
51 }
52 
53 /* get the testOutput entry in testData corresponding to the given AppSink */
54 static GstAdaptiveDemuxTestOutputStream *
getTestOutputDataByAppsink(GstAdaptiveDemuxTestEnginePrivate * priv,GstAppSink * appsink)55 getTestOutputDataByAppsink (GstAdaptiveDemuxTestEnginePrivate * priv,
56     GstAppSink * appsink)
57 {
58   guint i;
59 
60   for (i = 0; i < priv->engine.output_streams->len; ++i) {
61     GstAdaptiveDemuxTestOutputStream *state;
62     state = g_ptr_array_index (priv->engine.output_streams, i);
63     if (state->appsink == appsink) {
64       return state;
65     }
66   }
67   ck_abort_msg ("cannot find appsink %p in the output data", appsink);
68   return NULL;
69 }
70 
71 /* get the output stream entry in corresponding to the given Pad */
72 static GstAdaptiveDemuxTestOutputStream *
getTestOutputDataByPad(GstAdaptiveDemuxTestEnginePrivate * priv,GstPad * pad,gboolean abort_if_not_found)73 getTestOutputDataByPad (GstAdaptiveDemuxTestEnginePrivate * priv,
74     GstPad * pad, gboolean abort_if_not_found)
75 {
76   guint i;
77 
78   for (i = 0; i < priv->engine.output_streams->len; ++i) {
79     GstAdaptiveDemuxTestOutputStream *stream;
80     stream = g_ptr_array_index (priv->engine.output_streams, i);
81     if (stream->internal_pad == pad || stream->pad == pad) {
82       return stream;
83     }
84   }
85   if (abort_if_not_found)
86     ck_abort_msg ("cannot find pad %p in the output data", pad);
87   return NULL;
88 }
89 
90 /* get the output stream entry in corresponding to the given Pad */
91 static GstAdaptiveDemuxTestOutputStream *
getTestOutputDataByName(GstAdaptiveDemuxTestEnginePrivate * priv,const gchar * name,gboolean abort_if_not_found)92 getTestOutputDataByName (GstAdaptiveDemuxTestEnginePrivate * priv,
93     const gchar * name, gboolean abort_if_not_found)
94 {
95   guint i;
96 
97   for (i = 0; i < priv->engine.output_streams->len; ++i) {
98     GstAdaptiveDemuxTestOutputStream *stream;
99     stream = g_ptr_array_index (priv->engine.output_streams, i);
100     if (strstr (stream->name, name) != NULL) {
101       return stream;
102     }
103   }
104   if (abort_if_not_found)
105     ck_abort_msg ("cannot find pad %s in the output data", name);
106   return NULL;
107 }
108 
109 /* callback called when AppSink receives data */
110 
111 /* callback called when AppSink receives data */
112 static GstFlowReturn
on_appSinkNewSample(GstAppSink * appsink,gpointer user_data)113 on_appSinkNewSample (GstAppSink * appsink, gpointer user_data)
114 {
115   GstAdaptiveDemuxTestEnginePrivate *priv =
116       (GstAdaptiveDemuxTestEnginePrivate *) user_data;
117   GstAdaptiveDemuxTestEngine *engine;
118   GstAdaptiveDemuxTestOutputStream *testOutputStream = NULL;
119   GstSample *sample;
120   GstBuffer *buffer;
121   gboolean ret = TRUE;
122 
123   fail_unless (priv != NULL);
124   GST_TEST_LOCK (priv);
125   engine = &priv->engine;
126   testOutputStream = getTestOutputDataByAppsink (priv, appsink);
127 
128   sample = gst_app_sink_pull_sample (appsink);
129   fail_unless (sample != NULL);
130   buffer = gst_sample_get_buffer (sample);
131   fail_unless (buffer != NULL);
132 
133   /* call the test callback, if registered */
134   if (priv->callbacks->appsink_received_data)
135     ret = priv->callbacks->appsink_received_data (engine,
136         testOutputStream, buffer, priv->user_data);
137 
138   testOutputStream->segment_received_size += gst_buffer_get_size (buffer);
139 
140   gst_sample_unref (sample);
141 
142   GST_TEST_UNLOCK (priv);
143 
144   if (!ret)
145     return GST_FLOW_EOS;
146 
147   return GST_FLOW_OK;
148 }
149 
150 /* callback called when AppSink receives eos */
151 static void
on_appSinkEOS(GstAppSink * appsink,gpointer user_data)152 on_appSinkEOS (GstAppSink * appsink, gpointer user_data)
153 {
154   GstAdaptiveDemuxTestEnginePrivate *priv =
155       (GstAdaptiveDemuxTestEnginePrivate *) user_data;
156   GstAdaptiveDemuxTestOutputStream *testOutputStream = NULL;
157 
158   fail_unless (priv != NULL);
159   GST_TEST_LOCK (priv);
160   testOutputStream = getTestOutputDataByAppsink (priv, appsink);
161 
162   testOutputStream->total_received_size +=
163       testOutputStream->segment_received_size;
164   testOutputStream->segment_received_size = 0;
165 
166   if (priv->callbacks->appsink_eos)
167     priv->callbacks->appsink_eos (&priv->engine,
168         testOutputStream, priv->user_data);
169 
170   GST_TEST_UNLOCK (priv);
171 }
172 
173 static GstPadProbeReturn
on_appsink_event(GstPad * pad,GstPadProbeInfo * info,gpointer data)174 on_appsink_event (GstPad * pad, GstPadProbeInfo * info, gpointer data)
175 {
176   GstAdaptiveDemuxTestEnginePrivate *priv =
177       (GstAdaptiveDemuxTestEnginePrivate *) data;
178   GstAdaptiveDemuxTestOutputStream *stream = NULL;
179   GstEvent *event;
180 
181   event = GST_PAD_PROBE_INFO_EVENT (info);
182   GST_DEBUG ("Received event %" GST_PTR_FORMAT " on pad %" GST_PTR_FORMAT,
183       event, pad);
184 
185   if (priv->callbacks->appsink_event) {
186     GstPad *stream_pad = gst_pad_get_peer (pad);
187     fail_unless (stream_pad != NULL);
188 
189     GST_TEST_LOCK (priv);
190     stream = getTestOutputDataByPad (priv, stream_pad, TRUE);
191     GST_TEST_UNLOCK (priv);
192     gst_object_unref (stream_pad);
193     priv->callbacks->appsink_event (&priv->engine, stream, event,
194         priv->user_data);
195   }
196 
197   return GST_PAD_PROBE_OK;
198 }
199 
200 
201 /* callback called when demux sends data to AppSink */
202 static GstPadProbeReturn
on_demux_sent_data(GstPad * pad,GstPadProbeInfo * info,gpointer data)203 on_demux_sent_data (GstPad * pad, GstPadProbeInfo * info, gpointer data)
204 {
205   GstAdaptiveDemuxTestEnginePrivate *priv =
206       (GstAdaptiveDemuxTestEnginePrivate *) data;
207   GstAdaptiveDemuxTestOutputStream *stream = NULL;
208   GstBuffer *buffer;
209 
210   buffer = GST_PAD_PROBE_INFO_BUFFER (info);
211 
212   GST_TEST_LOCK (priv);
213   stream = getTestOutputDataByPad (priv, pad, TRUE);
214   GST_TEST_UNLOCK (priv);
215 
216   if (priv->callbacks->demux_sent_data) {
217     (*priv->callbacks->demux_sent_data) (&priv->engine,
218         stream, buffer, priv->user_data);
219   }
220 
221   return GST_PAD_PROBE_OK;
222 }
223 
224 /* callback called when dash sends event to AppSink */
225 static GstPadProbeReturn
on_demux_sent_event(GstPad * pad,GstPadProbeInfo * info,gpointer data)226 on_demux_sent_event (GstPad * pad, GstPadProbeInfo * info, gpointer data)
227 {
228   GstAdaptiveDemuxTestEnginePrivate *priv =
229       (GstAdaptiveDemuxTestEnginePrivate *) data;
230   GstAdaptiveDemuxTestOutputStream *stream = NULL;
231   GstEvent *event;
232 
233   event = GST_PAD_PROBE_INFO_EVENT (info);
234 
235   GST_TEST_LOCK (&priv->engine);
236 
237   if (priv->callbacks->demux_sent_event) {
238     stream = getTestOutputDataByPad (priv, pad, TRUE);
239     (*priv->callbacks->demux_sent_event) (&priv->engine,
240         stream, event, priv->user_data);
241   }
242 
243   GST_TEST_UNLOCK (&priv->engine);
244   return GST_PAD_PROBE_OK;
245 }
246 
247 /* callback called when demux receives events from GstFakeSoupHTTPSrc */
248 static GstPadProbeReturn
on_demuxReceivesEvent(GstPad * pad,GstPadProbeInfo * info,gpointer data)249 on_demuxReceivesEvent (GstPad * pad, GstPadProbeInfo * info, gpointer data)
250 {
251   GstAdaptiveDemuxTestEnginePrivate *priv =
252       (GstAdaptiveDemuxTestEnginePrivate *) data;
253   GstAdaptiveDemuxTestOutputStream *stream = NULL;
254   GstEvent *event;
255   const GstSegment *segment;
256 
257   event = GST_PAD_PROBE_INFO_EVENT (info);
258   GST_DEBUG ("Received event %" GST_PTR_FORMAT " on pad %" GST_PTR_FORMAT,
259       event, pad);
260 
261   if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
262     /* a new segment will start arriving
263      * update segment_start used by pattern validation
264      */
265     gst_event_parse_segment (event, &segment);
266 
267     GST_TEST_LOCK (priv);
268     stream = getTestOutputDataByPad (priv, pad, TRUE);
269     stream->total_received_size += stream->segment_received_size;
270     stream->segment_received_size = 0;
271     stream->segment_start = segment->start;
272     GST_TEST_UNLOCK (priv);
273   }
274 
275   return GST_PAD_PROBE_OK;
276 }
277 
278 
279 static void
on_demuxElementAdded(GstBin * demux,GstElement * element,gpointer user_data)280 on_demuxElementAdded (GstBin * demux, GstElement * element, gpointer user_data)
281 {
282   GstAdaptiveDemuxTestEnginePrivate *priv =
283       (GstAdaptiveDemuxTestEnginePrivate *) user_data;
284   GstAdaptiveDemuxTestOutputStream *stream = NULL;
285   GstPad *internal_pad;
286   gchar *srcbin_name;
287 
288   srcbin_name = GST_ELEMENT_NAME (element);
289   GST_TEST_LOCK (priv);
290 
291   stream = getTestOutputDataByName (priv, srcbin_name, FALSE);
292   if (stream == NULL) {
293     /* Pad wasn't exposed yet, create the stream */
294     stream = g_slice_new0 (GstAdaptiveDemuxTestOutputStream);
295     stream->name = g_strdup (srcbin_name);
296     g_ptr_array_add (priv->engine.output_streams, stream);
297   }
298 
299   /* keep the reference to the internal_pad.
300    * We will need it to identify the stream in the on_demuxReceivesEvent callback
301    */
302   if (stream->internal_pad) {
303     gst_pad_remove_probe (stream->internal_pad, stream->internal_pad_probe);
304     gst_object_unref (stream->internal_pad);
305   }
306   internal_pad = gst_element_get_static_pad (element, "src");
307   stream->internal_pad_probe =
308       gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
309       (GstPadProbeCallback) on_demuxReceivesEvent, priv, NULL);
310   stream->internal_pad = internal_pad;
311   GST_TEST_UNLOCK (priv);
312 
313 }
314 
315 
316 /* callback called when demux creates a src pad.
317  * We will create an AppSink to get the data
318  */
319 static void
on_demuxNewPad(GstElement * demux,GstPad * pad,gpointer user_data)320 on_demuxNewPad (GstElement * demux, GstPad * pad, gpointer user_data)
321 {
322   GstAdaptiveDemuxTestEnginePrivate *priv =
323       (GstAdaptiveDemuxTestEnginePrivate *) user_data;
324   GstElement *pipeline;
325   GstElement *sink;
326   gboolean ret;
327   gchar *name;
328   GstPad *appsink_pad;
329   GstAppSinkCallbacks appSinkCallbacks;
330   GstAdaptiveDemuxTestOutputStream *stream;
331   GObjectClass *gobject_class;
332 
333   fail_unless (priv != NULL);
334   name = gst_pad_get_name (pad);
335 
336   GST_DEBUG ("demux created pad %p", pad);
337 
338   stream = getTestOutputDataByName (priv, name, FALSE);
339   if (stream == NULL) {
340     stream = g_slice_new0 (GstAdaptiveDemuxTestOutputStream);
341     stream->name = g_strdup (name);
342     g_ptr_array_add (priv->engine.output_streams, stream);
343   }
344 
345   sink = gst_element_factory_make ("appsink", name);
346   g_free (name);
347   fail_unless (sink != NULL);
348 
349   GST_TEST_LOCK (priv);
350 
351   /* register the AppSink pointer in the test output data */
352   gst_object_ref (sink);
353   stream->appsink = GST_APP_SINK (sink);
354 
355   appSinkCallbacks.eos = on_appSinkEOS;
356   appSinkCallbacks.new_preroll = NULL;
357   appSinkCallbacks.new_sample = on_appSinkNewSample;
358 
359   gst_app_sink_set_callbacks (GST_APP_SINK (sink), &appSinkCallbacks, priv,
360       NULL);
361   appsink_pad = gst_element_get_static_pad (sink, "sink");
362   gst_pad_add_probe (appsink_pad,
363       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
364       (GstPadProbeCallback) on_appsink_event, priv, NULL);
365   gst_object_unref (appsink_pad);
366 
367   gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER,
368       (GstPadProbeCallback) on_demux_sent_data, priv, NULL);
369   gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
370       GST_PAD_PROBE_TYPE_EVENT_FLUSH,
371       (GstPadProbeCallback) on_demux_sent_event, priv, NULL);
372   gobject_class = G_OBJECT_GET_CLASS (sink);
373   if (g_object_class_find_property (gobject_class, "sync")) {
374     GST_DEBUG ("Setting sync=FALSE on AppSink");
375     g_object_set (G_OBJECT (sink), "sync", FALSE, NULL);
376   }
377   stream->pad = gst_object_ref (pad);
378 
379   GST_TEST_UNLOCK (priv);
380 
381   pipeline = GST_ELEMENT (gst_element_get_parent (demux));
382   fail_unless (pipeline != NULL);
383   ret = gst_bin_add (GST_BIN (pipeline), sink);
384   fail_unless_equals_int (ret, TRUE);
385   gst_object_unref (pipeline);
386   ret = gst_element_link (demux, sink);
387   fail_unless_equals_int (ret, TRUE);
388   ret = gst_element_sync_state_with_parent (sink);
389   fail_unless_equals_int (ret, TRUE);
390   GST_TEST_LOCK (priv);
391   if (priv->callbacks->demux_pad_added) {
392     priv->callbacks->demux_pad_added (&priv->engine, stream, priv->user_data);
393   }
394   GST_TEST_UNLOCK (priv);
395 }
396 
397 /* callback called when demux removes a src pad.
398  * We remove the AppSink associated with this pad
399  */
400 static void
on_demuxPadRemoved(GstElement * demux,GstPad * pad,gpointer user_data)401 on_demuxPadRemoved (GstElement * demux, GstPad * pad, gpointer user_data)
402 {
403   GstAdaptiveDemuxTestEnginePrivate *priv =
404       (GstAdaptiveDemuxTestEnginePrivate *) user_data;
405   GstAdaptiveDemuxTestOutputStream *stream = NULL;
406   GstStateChangeReturn ret;
407   GstState currentState, pending;
408   GstElement *appSink;
409 
410   fail_unless (priv != NULL);
411 
412   GST_DEBUG ("Pad removed: %" GST_PTR_FORMAT, pad);
413 
414   GST_TEST_LOCK (priv);
415   stream = getTestOutputDataByPad (priv, pad, TRUE);
416   if (priv->callbacks->demux_pad_removed) {
417     priv->callbacks->demux_pad_removed (&priv->engine, stream, priv->user_data);
418   }
419   fail_unless (stream->appsink != NULL);
420   if (stream->internal_pad) {
421     gst_object_unref (stream->internal_pad);
422     stream->internal_pad = NULL;
423   }
424   appSink = GST_ELEMENT (stream->appsink);
425   ret = gst_element_get_state (appSink, &currentState, &pending, 0);
426   if ((ret == GST_STATE_CHANGE_SUCCESS && currentState == GST_STATE_PLAYING)
427       || (ret == GST_STATE_CHANGE_ASYNC && pending == GST_STATE_PLAYING)) {
428     GST_DEBUG ("Changing AppSink element to PAUSED");
429     gst_element_set_state (appSink, GST_STATE_PAUSED);
430   }
431   GST_TEST_UNLOCK (priv);
432 }
433 
434 /* callback called when main_loop detects an error message
435  * We will signal main loop to quit
436  */
437 static void
on_ErrorMessageOnBus(GstBus * bus,GstMessage * msg,gpointer user_data)438 on_ErrorMessageOnBus (GstBus * bus, GstMessage * msg, gpointer user_data)
439 {
440   GstAdaptiveDemuxTestEnginePrivate *priv =
441       (GstAdaptiveDemuxTestEnginePrivate *) user_data;
442   GError *err = NULL;
443   gchar *dbg_info = NULL;
444 
445   gst_message_parse_error (msg, &err, &dbg_info);
446   GST_DEBUG ("ERROR from element %s: '%s'. Debugging info: %s",
447       GST_OBJECT_NAME (msg->src), err->message, (dbg_info) ? dbg_info : "none");
448   g_error_free (err);
449   g_free (dbg_info);
450 
451   GST_TEST_LOCK (priv);
452 
453   fail_unless (priv->callbacks->bus_error_message,
454       "unexpected error detected on bus");
455 
456   priv->callbacks->bus_error_message (&priv->engine, msg, priv->user_data);
457 
458   GST_TEST_UNLOCK (priv);
459 }
460 
461 static gboolean
gst_adaptive_demux_update_test_clock(gpointer user_data)462 gst_adaptive_demux_update_test_clock (gpointer user_data)
463 {
464   GstAdaptiveDemuxTestEnginePrivate *priv =
465       (GstAdaptiveDemuxTestEnginePrivate *) user_data;
466   GstClockID id;
467   GstClockTime next_entry;
468   GstTestClock *clock = GST_TEST_CLOCK (priv->engine.clock);
469 
470   fail_unless (clock != NULL);
471   next_entry = gst_test_clock_get_next_entry_time (clock);
472   if (next_entry != GST_CLOCK_TIME_NONE) {
473     /* tests that do not want the manifest to update will set the update period
474      * to a big value, eg 500s. The manifest update task will register an alarm
475      * for that value.
476      * We do not want the clock to jump to that. If it does, the manifest update
477      * task will keep scheduling and use all the cpu power, starving the other
478      * threads.
479      * Usually the test require the clock to update with approx 3s, so we will
480      * allow only updates smaller than 100s
481      */
482     GstClockTime curr_time = gst_clock_get_time (GST_CLOCK (clock));
483     if (next_entry - curr_time < 100 * GST_SECOND) {
484       gst_test_clock_set_time (clock, next_entry);
485       id = gst_test_clock_process_next_clock_id (clock);
486       fail_unless (id != NULL);
487       gst_clock_id_unref (id);
488     }
489   }
490   return TRUE;
491 }
492 
493 static gboolean
start_pipeline_playing(gpointer user_data)494 start_pipeline_playing (gpointer user_data)
495 {
496   GstAdaptiveDemuxTestEnginePrivate *priv =
497       (GstAdaptiveDemuxTestEnginePrivate *) user_data;
498   GstStateChangeReturn stateChange;
499 
500   GST_DEBUG ("Moving pipeline to PLAYING state");
501   stateChange =
502       gst_element_set_state (priv->engine.pipeline, GST_STATE_PLAYING);
503   fail_unless (stateChange != GST_STATE_CHANGE_FAILURE);
504   GST_DEBUG ("PLAYING stateChange = %d", stateChange);
505   return FALSE;
506 }
507 
508 /*
509  * Create a demux element, run a test using the input data and check
510  * the output data
511  */
512 void
gst_adaptive_demux_test_run(const gchar * element_name,const gchar * manifest_uri,const GstAdaptiveDemuxTestCallbacks * callbacks,gpointer user_data)513 gst_adaptive_demux_test_run (const gchar * element_name,
514     const gchar * manifest_uri,
515     const GstAdaptiveDemuxTestCallbacks * callbacks, gpointer user_data)
516 {
517   GstBus *bus;
518   GstElement *demux;
519   GstElement *manifest_source;
520   gboolean ret;
521   GstStateChangeReturn stateChange;
522   GstAdaptiveDemuxTestEnginePrivate *priv;
523 
524   priv = g_slice_new0 (GstAdaptiveDemuxTestEnginePrivate);
525   priv->engine.output_streams =
526       g_ptr_array_new_with_free_func
527       (adaptive_demux_engine_stream_state_finalize);
528   g_mutex_init (&priv->engine.lock);
529   priv->callbacks = callbacks;
530   priv->user_data = user_data;
531   priv->engine.loop = g_main_loop_new (NULL, TRUE);
532   fail_unless (priv->engine.loop != NULL);
533   GST_TEST_LOCK (priv);
534   priv->engine.pipeline = gst_pipeline_new ("pipeline");
535   fail_unless (priv->engine.pipeline != NULL);
536   GST_DEBUG ("created pipeline %" GST_PTR_FORMAT, priv->engine.pipeline);
537 
538   /* register a callback to listen for error messages */
539   bus = gst_pipeline_get_bus (GST_PIPELINE (priv->engine.pipeline));
540   gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH);
541   g_signal_connect (bus, "message::error",
542       G_CALLBACK (on_ErrorMessageOnBus), priv);
543 
544   manifest_source =
545       gst_element_make_from_uri (GST_URI_SRC, manifest_uri, NULL, NULL);
546   fail_unless (manifest_source != NULL);
547   priv->engine.manifest_source = manifest_source;
548 
549   demux = gst_check_setup_element (element_name);
550   fail_unless (demux != NULL);
551   priv->engine.demux = demux;
552   GST_DEBUG ("created demux %" GST_PTR_FORMAT, demux);
553 
554   g_signal_connect (demux, "element-added", G_CALLBACK (on_demuxElementAdded),
555       priv);
556   g_signal_connect (demux, "pad-added", G_CALLBACK (on_demuxNewPad), priv);
557   g_signal_connect (demux, "pad-removed",
558       G_CALLBACK (on_demuxPadRemoved), priv);
559 
560   gst_bin_add_many (GST_BIN (priv->engine.pipeline), manifest_source, demux,
561       NULL);
562   ASSERT_OBJECT_REFCOUNT (manifest_source, element_name, 1);
563   ASSERT_OBJECT_REFCOUNT (demux, element_name, 1);
564 
565   ret = gst_element_link (manifest_source, demux);
566   fail_unless_equals_int (ret, TRUE);
567 
568   priv->engine.clock = gst_system_clock_obtain ();
569   if (GST_IS_TEST_CLOCK (priv->engine.clock)) {
570     /*
571      * live tests will want to manipulate the clock, so they will register a
572      * gst_test_clock as the system clock.
573      * The on demand tests do not care about the clock, so they will let the
574      * system clock to the default one.
575      * If a gst_test_clock was installed as system clock, we register a
576      * periodic callback to update its value.
577      */
578     priv->clock_update_id =
579         g_timeout_add (100, gst_adaptive_demux_update_test_clock, priv);
580   }
581 
582   /* call a test callback before we start the pipeline */
583   if (callbacks->pre_test)
584     (*callbacks->pre_test) (&priv->engine, priv->user_data);
585 
586   GST_TEST_UNLOCK (priv);
587 
588   GST_DEBUG ("Starting pipeline");
589   stateChange = gst_element_set_state (priv->engine.pipeline, GST_STATE_PAUSED);
590   fail_unless (stateChange != GST_STATE_CHANGE_FAILURE);
591   /* wait for completion of the move to PAUSED */
592   stateChange = gst_element_get_state (priv->engine.pipeline, NULL, NULL,
593       GST_CLOCK_TIME_NONE);
594   fail_unless (stateChange != GST_STATE_CHANGE_FAILURE);
595 
596   g_idle_add ((GSourceFunc) start_pipeline_playing, priv);
597 
598   /* block until a callback calls g_main_loop_quit (engine.loop) */
599   GST_DEBUG ("main thread waiting for streams to finish");
600   g_main_loop_run (priv->engine.loop);
601   GST_DEBUG ("main thread finished. Stopping pipeline");
602 
603   /* no need to use gst_element_get_state as the move the GST_STATE_NULL
604      is always synchronous */
605   stateChange = gst_element_set_state (priv->engine.pipeline, GST_STATE_NULL);
606   fail_unless (stateChange != GST_STATE_CHANGE_FAILURE);
607 
608   GST_TEST_LOCK (priv);
609 
610   /* call a test callback after the stop of the pipeline */
611   if (callbacks->post_test)
612     (*callbacks->post_test) (&priv->engine, priv->user_data);
613 
614   g_signal_handlers_disconnect_by_func (bus,
615       G_CALLBACK (on_ErrorMessageOnBus), priv);
616   gst_bus_remove_signal_watch (bus);
617   g_signal_handlers_disconnect_by_func (demux, G_CALLBACK (on_demuxNewPad),
618       priv);
619   g_signal_handlers_disconnect_by_func (demux, G_CALLBACK (on_demuxPadRemoved),
620       priv);
621 
622   GST_DEBUG ("main thread pipeline stopped");
623   if (priv->clock_update_id != 0)
624     g_source_remove (priv->clock_update_id);
625   gst_object_unref (priv->engine.clock);
626   gst_object_unref (priv->engine.pipeline);
627   priv->engine.pipeline = NULL;
628   g_main_loop_unref (priv->engine.loop);
629   g_ptr_array_unref (priv->engine.output_streams);
630   gst_object_unref (bus);
631 
632   GST_TEST_UNLOCK (priv);
633   g_mutex_clear (&priv->engine.lock);
634   g_slice_free (GstAdaptiveDemuxTestEnginePrivate, priv);
635 }
636