1 /* A generic test engine for elements based upon GstAdaptiveDemux
2 *
3 * Copyright (c) <2015> YouView TV Ltd
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20
21 #include <gst/check/gstcheck.h>
22 #include <gst/check/gsttestclock.h>
23 #include "adaptive_demux_engine.h"
24
25 typedef struct _GstAdaptiveDemuxTestEnginePrivate
26 {
27 GstAdaptiveDemuxTestEngine engine;
28 const GstAdaptiveDemuxTestCallbacks *callbacks;
29 gpointer user_data;
30 guint clock_update_id;
31 } GstAdaptiveDemuxTestEnginePrivate;
32
33
/* Helpers to take/release the engine mutex.
 * (d) is a pointer to the GstAdaptiveDemuxTestEnginePrivate of the test run.
 */
#define GST_TEST_GET_LOCK(d) \
  (&((GstAdaptiveDemuxTestEnginePrivate *) (d))->engine.lock)
#define GST_TEST_LOCK(d)   g_mutex_lock (GST_TEST_GET_LOCK (d))
#define GST_TEST_UNLOCK(d) g_mutex_unlock (GST_TEST_GET_LOCK (d))
37
38 static void
adaptive_demux_engine_stream_state_finalize(gpointer data)39 adaptive_demux_engine_stream_state_finalize (gpointer data)
40 {
41 GstAdaptiveDemuxTestOutputStream *stream =
42 (GstAdaptiveDemuxTestOutputStream *) data;
43 g_free (stream->name);
44 if (stream->appsink)
45 gst_object_unref (stream->appsink);
46 if (stream->pad)
47 gst_object_unref (stream->pad);
48 if (stream->internal_pad)
49 gst_object_unref (stream->internal_pad);
50 g_slice_free (GstAdaptiveDemuxTestOutputStream, stream);
51 }
52
53 /* get the testOutput entry in testData corresponding to the given AppSink */
54 static GstAdaptiveDemuxTestOutputStream *
getTestOutputDataByAppsink(GstAdaptiveDemuxTestEnginePrivate * priv,GstAppSink * appsink)55 getTestOutputDataByAppsink (GstAdaptiveDemuxTestEnginePrivate * priv,
56 GstAppSink * appsink)
57 {
58 guint i;
59
60 for (i = 0; i < priv->engine.output_streams->len; ++i) {
61 GstAdaptiveDemuxTestOutputStream *state;
62 state = g_ptr_array_index (priv->engine.output_streams, i);
63 if (state->appsink == appsink) {
64 return state;
65 }
66 }
67 ck_abort_msg ("cannot find appsink %p in the output data", appsink);
68 return NULL;
69 }
70
71 /* get the output stream entry in corresponding to the given Pad */
72 static GstAdaptiveDemuxTestOutputStream *
getTestOutputDataByPad(GstAdaptiveDemuxTestEnginePrivate * priv,GstPad * pad,gboolean abort_if_not_found)73 getTestOutputDataByPad (GstAdaptiveDemuxTestEnginePrivate * priv,
74 GstPad * pad, gboolean abort_if_not_found)
75 {
76 guint i;
77
78 for (i = 0; i < priv->engine.output_streams->len; ++i) {
79 GstAdaptiveDemuxTestOutputStream *stream;
80 stream = g_ptr_array_index (priv->engine.output_streams, i);
81 if (stream->internal_pad == pad || stream->pad == pad) {
82 return stream;
83 }
84 }
85 if (abort_if_not_found)
86 ck_abort_msg ("cannot find pad %p in the output data", pad);
87 return NULL;
88 }
89
90 /* get the output stream entry in corresponding to the given Pad */
91 static GstAdaptiveDemuxTestOutputStream *
getTestOutputDataByName(GstAdaptiveDemuxTestEnginePrivate * priv,const gchar * name,gboolean abort_if_not_found)92 getTestOutputDataByName (GstAdaptiveDemuxTestEnginePrivate * priv,
93 const gchar * name, gboolean abort_if_not_found)
94 {
95 guint i;
96
97 for (i = 0; i < priv->engine.output_streams->len; ++i) {
98 GstAdaptiveDemuxTestOutputStream *stream;
99 stream = g_ptr_array_index (priv->engine.output_streams, i);
100 if (strstr (stream->name, name) != NULL) {
101 return stream;
102 }
103 }
104 if (abort_if_not_found)
105 ck_abort_msg ("cannot find pad %s in the output data", name);
106 return NULL;
107 }
108
109 /* callback called when AppSink receives data */
110
111 /* callback called when AppSink receives data */
112 static GstFlowReturn
on_appSinkNewSample(GstAppSink * appsink,gpointer user_data)113 on_appSinkNewSample (GstAppSink * appsink, gpointer user_data)
114 {
115 GstAdaptiveDemuxTestEnginePrivate *priv =
116 (GstAdaptiveDemuxTestEnginePrivate *) user_data;
117 GstAdaptiveDemuxTestEngine *engine;
118 GstAdaptiveDemuxTestOutputStream *testOutputStream = NULL;
119 GstSample *sample;
120 GstBuffer *buffer;
121 gboolean ret = TRUE;
122
123 fail_unless (priv != NULL);
124 GST_TEST_LOCK (priv);
125 engine = &priv->engine;
126 testOutputStream = getTestOutputDataByAppsink (priv, appsink);
127
128 sample = gst_app_sink_pull_sample (appsink);
129 fail_unless (sample != NULL);
130 buffer = gst_sample_get_buffer (sample);
131 fail_unless (buffer != NULL);
132
133 /* call the test callback, if registered */
134 if (priv->callbacks->appsink_received_data)
135 ret = priv->callbacks->appsink_received_data (engine,
136 testOutputStream, buffer, priv->user_data);
137
138 testOutputStream->segment_received_size += gst_buffer_get_size (buffer);
139
140 gst_sample_unref (sample);
141
142 GST_TEST_UNLOCK (priv);
143
144 if (!ret)
145 return GST_FLOW_EOS;
146
147 return GST_FLOW_OK;
148 }
149
150 /* callback called when AppSink receives eos */
151 static void
on_appSinkEOS(GstAppSink * appsink,gpointer user_data)152 on_appSinkEOS (GstAppSink * appsink, gpointer user_data)
153 {
154 GstAdaptiveDemuxTestEnginePrivate *priv =
155 (GstAdaptiveDemuxTestEnginePrivate *) user_data;
156 GstAdaptiveDemuxTestOutputStream *testOutputStream = NULL;
157
158 fail_unless (priv != NULL);
159 GST_TEST_LOCK (priv);
160 testOutputStream = getTestOutputDataByAppsink (priv, appsink);
161
162 testOutputStream->total_received_size +=
163 testOutputStream->segment_received_size;
164 testOutputStream->segment_received_size = 0;
165
166 if (priv->callbacks->appsink_eos)
167 priv->callbacks->appsink_eos (&priv->engine,
168 testOutputStream, priv->user_data);
169
170 GST_TEST_UNLOCK (priv);
171 }
172
173 static GstPadProbeReturn
on_appsink_event(GstPad * pad,GstPadProbeInfo * info,gpointer data)174 on_appsink_event (GstPad * pad, GstPadProbeInfo * info, gpointer data)
175 {
176 GstAdaptiveDemuxTestEnginePrivate *priv =
177 (GstAdaptiveDemuxTestEnginePrivate *) data;
178 GstAdaptiveDemuxTestOutputStream *stream = NULL;
179 GstEvent *event;
180
181 event = GST_PAD_PROBE_INFO_EVENT (info);
182 GST_DEBUG ("Received event %" GST_PTR_FORMAT " on pad %" GST_PTR_FORMAT,
183 event, pad);
184
185 if (priv->callbacks->appsink_event) {
186 GstPad *stream_pad = gst_pad_get_peer (pad);
187 fail_unless (stream_pad != NULL);
188
189 GST_TEST_LOCK (priv);
190 stream = getTestOutputDataByPad (priv, stream_pad, TRUE);
191 GST_TEST_UNLOCK (priv);
192 gst_object_unref (stream_pad);
193 priv->callbacks->appsink_event (&priv->engine, stream, event,
194 priv->user_data);
195 }
196
197 return GST_PAD_PROBE_OK;
198 }
199
200
201 /* callback called when demux sends data to AppSink */
202 static GstPadProbeReturn
on_demux_sent_data(GstPad * pad,GstPadProbeInfo * info,gpointer data)203 on_demux_sent_data (GstPad * pad, GstPadProbeInfo * info, gpointer data)
204 {
205 GstAdaptiveDemuxTestEnginePrivate *priv =
206 (GstAdaptiveDemuxTestEnginePrivate *) data;
207 GstAdaptiveDemuxTestOutputStream *stream = NULL;
208 GstBuffer *buffer;
209
210 buffer = GST_PAD_PROBE_INFO_BUFFER (info);
211
212 GST_TEST_LOCK (priv);
213 stream = getTestOutputDataByPad (priv, pad, TRUE);
214 GST_TEST_UNLOCK (priv);
215
216 if (priv->callbacks->demux_sent_data) {
217 (*priv->callbacks->demux_sent_data) (&priv->engine,
218 stream, buffer, priv->user_data);
219 }
220
221 return GST_PAD_PROBE_OK;
222 }
223
224 /* callback called when dash sends event to AppSink */
225 static GstPadProbeReturn
on_demux_sent_event(GstPad * pad,GstPadProbeInfo * info,gpointer data)226 on_demux_sent_event (GstPad * pad, GstPadProbeInfo * info, gpointer data)
227 {
228 GstAdaptiveDemuxTestEnginePrivate *priv =
229 (GstAdaptiveDemuxTestEnginePrivate *) data;
230 GstAdaptiveDemuxTestOutputStream *stream = NULL;
231 GstEvent *event;
232
233 event = GST_PAD_PROBE_INFO_EVENT (info);
234
235 GST_TEST_LOCK (&priv->engine);
236
237 if (priv->callbacks->demux_sent_event) {
238 stream = getTestOutputDataByPad (priv, pad, TRUE);
239 (*priv->callbacks->demux_sent_event) (&priv->engine,
240 stream, event, priv->user_data);
241 }
242
243 GST_TEST_UNLOCK (&priv->engine);
244 return GST_PAD_PROBE_OK;
245 }
246
247 /* callback called when demux receives events from GstFakeSoupHTTPSrc */
248 static GstPadProbeReturn
on_demuxReceivesEvent(GstPad * pad,GstPadProbeInfo * info,gpointer data)249 on_demuxReceivesEvent (GstPad * pad, GstPadProbeInfo * info, gpointer data)
250 {
251 GstAdaptiveDemuxTestEnginePrivate *priv =
252 (GstAdaptiveDemuxTestEnginePrivate *) data;
253 GstAdaptiveDemuxTestOutputStream *stream = NULL;
254 GstEvent *event;
255 const GstSegment *segment;
256
257 event = GST_PAD_PROBE_INFO_EVENT (info);
258 GST_DEBUG ("Received event %" GST_PTR_FORMAT " on pad %" GST_PTR_FORMAT,
259 event, pad);
260
261 if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
262 /* a new segment will start arriving
263 * update segment_start used by pattern validation
264 */
265 gst_event_parse_segment (event, &segment);
266
267 GST_TEST_LOCK (priv);
268 stream = getTestOutputDataByPad (priv, pad, TRUE);
269 stream->total_received_size += stream->segment_received_size;
270 stream->segment_received_size = 0;
271 stream->segment_start = segment->start;
272 GST_TEST_UNLOCK (priv);
273 }
274
275 return GST_PAD_PROBE_OK;
276 }
277
278
279 static void
on_demuxElementAdded(GstBin * demux,GstElement * element,gpointer user_data)280 on_demuxElementAdded (GstBin * demux, GstElement * element, gpointer user_data)
281 {
282 GstAdaptiveDemuxTestEnginePrivate *priv =
283 (GstAdaptiveDemuxTestEnginePrivate *) user_data;
284 GstAdaptiveDemuxTestOutputStream *stream = NULL;
285 GstPad *internal_pad;
286 gchar *srcbin_name;
287
288 srcbin_name = GST_ELEMENT_NAME (element);
289 GST_TEST_LOCK (priv);
290
291 stream = getTestOutputDataByName (priv, srcbin_name, FALSE);
292 if (stream == NULL) {
293 /* Pad wasn't exposed yet, create the stream */
294 stream = g_slice_new0 (GstAdaptiveDemuxTestOutputStream);
295 stream->name = g_strdup (srcbin_name);
296 g_ptr_array_add (priv->engine.output_streams, stream);
297 }
298
299 /* keep the reference to the internal_pad.
300 * We will need it to identify the stream in the on_demuxReceivesEvent callback
301 */
302 if (stream->internal_pad) {
303 gst_pad_remove_probe (stream->internal_pad, stream->internal_pad_probe);
304 gst_object_unref (stream->internal_pad);
305 }
306 internal_pad = gst_element_get_static_pad (element, "src");
307 stream->internal_pad_probe =
308 gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
309 (GstPadProbeCallback) on_demuxReceivesEvent, priv, NULL);
310 stream->internal_pad = internal_pad;
311 GST_TEST_UNLOCK (priv);
312
313 }
314
315
316 /* callback called when demux creates a src pad.
317 * We will create an AppSink to get the data
318 */
319 static void
on_demuxNewPad(GstElement * demux,GstPad * pad,gpointer user_data)320 on_demuxNewPad (GstElement * demux, GstPad * pad, gpointer user_data)
321 {
322 GstAdaptiveDemuxTestEnginePrivate *priv =
323 (GstAdaptiveDemuxTestEnginePrivate *) user_data;
324 GstElement *pipeline;
325 GstElement *sink;
326 gboolean ret;
327 gchar *name;
328 GstPad *appsink_pad;
329 GstAppSinkCallbacks appSinkCallbacks;
330 GstAdaptiveDemuxTestOutputStream *stream;
331 GObjectClass *gobject_class;
332
333 fail_unless (priv != NULL);
334 name = gst_pad_get_name (pad);
335
336 GST_DEBUG ("demux created pad %p", pad);
337
338 stream = getTestOutputDataByName (priv, name, FALSE);
339 if (stream == NULL) {
340 stream = g_slice_new0 (GstAdaptiveDemuxTestOutputStream);
341 stream->name = g_strdup (name);
342 g_ptr_array_add (priv->engine.output_streams, stream);
343 }
344
345 sink = gst_element_factory_make ("appsink", name);
346 g_free (name);
347 fail_unless (sink != NULL);
348
349 GST_TEST_LOCK (priv);
350
351 /* register the AppSink pointer in the test output data */
352 gst_object_ref (sink);
353 stream->appsink = GST_APP_SINK (sink);
354
355 appSinkCallbacks.eos = on_appSinkEOS;
356 appSinkCallbacks.new_preroll = NULL;
357 appSinkCallbacks.new_sample = on_appSinkNewSample;
358
359 gst_app_sink_set_callbacks (GST_APP_SINK (sink), &appSinkCallbacks, priv,
360 NULL);
361 appsink_pad = gst_element_get_static_pad (sink, "sink");
362 gst_pad_add_probe (appsink_pad,
363 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
364 (GstPadProbeCallback) on_appsink_event, priv, NULL);
365 gst_object_unref (appsink_pad);
366
367 gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER,
368 (GstPadProbeCallback) on_demux_sent_data, priv, NULL);
369 gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
370 GST_PAD_PROBE_TYPE_EVENT_FLUSH,
371 (GstPadProbeCallback) on_demux_sent_event, priv, NULL);
372 gobject_class = G_OBJECT_GET_CLASS (sink);
373 if (g_object_class_find_property (gobject_class, "sync")) {
374 GST_DEBUG ("Setting sync=FALSE on AppSink");
375 g_object_set (G_OBJECT (sink), "sync", FALSE, NULL);
376 }
377 stream->pad = gst_object_ref (pad);
378
379 GST_TEST_UNLOCK (priv);
380
381 pipeline = GST_ELEMENT (gst_element_get_parent (demux));
382 fail_unless (pipeline != NULL);
383 ret = gst_bin_add (GST_BIN (pipeline), sink);
384 fail_unless_equals_int (ret, TRUE);
385 gst_object_unref (pipeline);
386 ret = gst_element_link (demux, sink);
387 fail_unless_equals_int (ret, TRUE);
388 ret = gst_element_sync_state_with_parent (sink);
389 fail_unless_equals_int (ret, TRUE);
390 GST_TEST_LOCK (priv);
391 if (priv->callbacks->demux_pad_added) {
392 priv->callbacks->demux_pad_added (&priv->engine, stream, priv->user_data);
393 }
394 GST_TEST_UNLOCK (priv);
395 }
396
397 /* callback called when demux removes a src pad.
398 * We remove the AppSink associated with this pad
399 */
400 static void
on_demuxPadRemoved(GstElement * demux,GstPad * pad,gpointer user_data)401 on_demuxPadRemoved (GstElement * demux, GstPad * pad, gpointer user_data)
402 {
403 GstAdaptiveDemuxTestEnginePrivate *priv =
404 (GstAdaptiveDemuxTestEnginePrivate *) user_data;
405 GstAdaptiveDemuxTestOutputStream *stream = NULL;
406 GstStateChangeReturn ret;
407 GstState currentState, pending;
408 GstElement *appSink;
409
410 fail_unless (priv != NULL);
411
412 GST_DEBUG ("Pad removed: %" GST_PTR_FORMAT, pad);
413
414 GST_TEST_LOCK (priv);
415 stream = getTestOutputDataByPad (priv, pad, TRUE);
416 if (priv->callbacks->demux_pad_removed) {
417 priv->callbacks->demux_pad_removed (&priv->engine, stream, priv->user_data);
418 }
419 fail_unless (stream->appsink != NULL);
420 if (stream->internal_pad) {
421 gst_object_unref (stream->internal_pad);
422 stream->internal_pad = NULL;
423 }
424 appSink = GST_ELEMENT (stream->appsink);
425 ret = gst_element_get_state (appSink, ¤tState, &pending, 0);
426 if ((ret == GST_STATE_CHANGE_SUCCESS && currentState == GST_STATE_PLAYING)
427 || (ret == GST_STATE_CHANGE_ASYNC && pending == GST_STATE_PLAYING)) {
428 GST_DEBUG ("Changing AppSink element to PAUSED");
429 gst_element_set_state (appSink, GST_STATE_PAUSED);
430 }
431 GST_TEST_UNLOCK (priv);
432 }
433
434 /* callback called when main_loop detects an error message
435 * We will signal main loop to quit
436 */
437 static void
on_ErrorMessageOnBus(GstBus * bus,GstMessage * msg,gpointer user_data)438 on_ErrorMessageOnBus (GstBus * bus, GstMessage * msg, gpointer user_data)
439 {
440 GstAdaptiveDemuxTestEnginePrivate *priv =
441 (GstAdaptiveDemuxTestEnginePrivate *) user_data;
442 GError *err = NULL;
443 gchar *dbg_info = NULL;
444
445 gst_message_parse_error (msg, &err, &dbg_info);
446 GST_DEBUG ("ERROR from element %s: '%s'. Debugging info: %s",
447 GST_OBJECT_NAME (msg->src), err->message, (dbg_info) ? dbg_info : "none");
448 g_error_free (err);
449 g_free (dbg_info);
450
451 GST_TEST_LOCK (priv);
452
453 fail_unless (priv->callbacks->bus_error_message,
454 "unexpected error detected on bus");
455
456 priv->callbacks->bus_error_message (&priv->engine, msg, priv->user_data);
457
458 GST_TEST_UNLOCK (priv);
459 }
460
461 static gboolean
gst_adaptive_demux_update_test_clock(gpointer user_data)462 gst_adaptive_demux_update_test_clock (gpointer user_data)
463 {
464 GstAdaptiveDemuxTestEnginePrivate *priv =
465 (GstAdaptiveDemuxTestEnginePrivate *) user_data;
466 GstClockID id;
467 GstClockTime next_entry;
468 GstTestClock *clock = GST_TEST_CLOCK (priv->engine.clock);
469
470 fail_unless (clock != NULL);
471 next_entry = gst_test_clock_get_next_entry_time (clock);
472 if (next_entry != GST_CLOCK_TIME_NONE) {
473 /* tests that do not want the manifest to update will set the update period
474 * to a big value, eg 500s. The manifest update task will register an alarm
475 * for that value.
476 * We do not want the clock to jump to that. If it does, the manifest update
477 * task will keep scheduling and use all the cpu power, starving the other
478 * threads.
479 * Usually the test require the clock to update with approx 3s, so we will
480 * allow only updates smaller than 100s
481 */
482 GstClockTime curr_time = gst_clock_get_time (GST_CLOCK (clock));
483 if (next_entry - curr_time < 100 * GST_SECOND) {
484 gst_test_clock_set_time (clock, next_entry);
485 id = gst_test_clock_process_next_clock_id (clock);
486 fail_unless (id != NULL);
487 gst_clock_id_unref (id);
488 }
489 }
490 return TRUE;
491 }
492
493 static gboolean
start_pipeline_playing(gpointer user_data)494 start_pipeline_playing (gpointer user_data)
495 {
496 GstAdaptiveDemuxTestEnginePrivate *priv =
497 (GstAdaptiveDemuxTestEnginePrivate *) user_data;
498 GstStateChangeReturn stateChange;
499
500 GST_DEBUG ("Moving pipeline to PLAYING state");
501 stateChange =
502 gst_element_set_state (priv->engine.pipeline, GST_STATE_PLAYING);
503 fail_unless (stateChange != GST_STATE_CHANGE_FAILURE);
504 GST_DEBUG ("PLAYING stateChange = %d", stateChange);
505 return FALSE;
506 }
507
508 /*
509 * Create a demux element, run a test using the input data and check
510 * the output data
511 */
512 void
gst_adaptive_demux_test_run(const gchar * element_name,const gchar * manifest_uri,const GstAdaptiveDemuxTestCallbacks * callbacks,gpointer user_data)513 gst_adaptive_demux_test_run (const gchar * element_name,
514 const gchar * manifest_uri,
515 const GstAdaptiveDemuxTestCallbacks * callbacks, gpointer user_data)
516 {
517 GstBus *bus;
518 GstElement *demux;
519 GstElement *manifest_source;
520 gboolean ret;
521 GstStateChangeReturn stateChange;
522 GstAdaptiveDemuxTestEnginePrivate *priv;
523
524 priv = g_slice_new0 (GstAdaptiveDemuxTestEnginePrivate);
525 priv->engine.output_streams =
526 g_ptr_array_new_with_free_func
527 (adaptive_demux_engine_stream_state_finalize);
528 g_mutex_init (&priv->engine.lock);
529 priv->callbacks = callbacks;
530 priv->user_data = user_data;
531 priv->engine.loop = g_main_loop_new (NULL, TRUE);
532 fail_unless (priv->engine.loop != NULL);
533 GST_TEST_LOCK (priv);
534 priv->engine.pipeline = gst_pipeline_new ("pipeline");
535 fail_unless (priv->engine.pipeline != NULL);
536 GST_DEBUG ("created pipeline %" GST_PTR_FORMAT, priv->engine.pipeline);
537
538 /* register a callback to listen for error messages */
539 bus = gst_pipeline_get_bus (GST_PIPELINE (priv->engine.pipeline));
540 gst_bus_add_signal_watch_full (bus, G_PRIORITY_HIGH);
541 g_signal_connect (bus, "message::error",
542 G_CALLBACK (on_ErrorMessageOnBus), priv);
543
544 manifest_source =
545 gst_element_make_from_uri (GST_URI_SRC, manifest_uri, NULL, NULL);
546 fail_unless (manifest_source != NULL);
547 priv->engine.manifest_source = manifest_source;
548
549 demux = gst_check_setup_element (element_name);
550 fail_unless (demux != NULL);
551 priv->engine.demux = demux;
552 GST_DEBUG ("created demux %" GST_PTR_FORMAT, demux);
553
554 g_signal_connect (demux, "element-added", G_CALLBACK (on_demuxElementAdded),
555 priv);
556 g_signal_connect (demux, "pad-added", G_CALLBACK (on_demuxNewPad), priv);
557 g_signal_connect (demux, "pad-removed",
558 G_CALLBACK (on_demuxPadRemoved), priv);
559
560 gst_bin_add_many (GST_BIN (priv->engine.pipeline), manifest_source, demux,
561 NULL);
562 ASSERT_OBJECT_REFCOUNT (manifest_source, element_name, 1);
563 ASSERT_OBJECT_REFCOUNT (demux, element_name, 1);
564
565 ret = gst_element_link (manifest_source, demux);
566 fail_unless_equals_int (ret, TRUE);
567
568 priv->engine.clock = gst_system_clock_obtain ();
569 if (GST_IS_TEST_CLOCK (priv->engine.clock)) {
570 /*
571 * live tests will want to manipulate the clock, so they will register a
572 * gst_test_clock as the system clock.
573 * The on demand tests do not care about the clock, so they will let the
574 * system clock to the default one.
575 * If a gst_test_clock was installed as system clock, we register a
576 * periodic callback to update its value.
577 */
578 priv->clock_update_id =
579 g_timeout_add (100, gst_adaptive_demux_update_test_clock, priv);
580 }
581
582 /* call a test callback before we start the pipeline */
583 if (callbacks->pre_test)
584 (*callbacks->pre_test) (&priv->engine, priv->user_data);
585
586 GST_TEST_UNLOCK (priv);
587
588 GST_DEBUG ("Starting pipeline");
589 stateChange = gst_element_set_state (priv->engine.pipeline, GST_STATE_PAUSED);
590 fail_unless (stateChange != GST_STATE_CHANGE_FAILURE);
591 /* wait for completion of the move to PAUSED */
592 stateChange = gst_element_get_state (priv->engine.pipeline, NULL, NULL,
593 GST_CLOCK_TIME_NONE);
594 fail_unless (stateChange != GST_STATE_CHANGE_FAILURE);
595
596 g_idle_add ((GSourceFunc) start_pipeline_playing, priv);
597
598 /* block until a callback calls g_main_loop_quit (engine.loop) */
599 GST_DEBUG ("main thread waiting for streams to finish");
600 g_main_loop_run (priv->engine.loop);
601 GST_DEBUG ("main thread finished. Stopping pipeline");
602
603 /* no need to use gst_element_get_state as the move the GST_STATE_NULL
604 is always synchronous */
605 stateChange = gst_element_set_state (priv->engine.pipeline, GST_STATE_NULL);
606 fail_unless (stateChange != GST_STATE_CHANGE_FAILURE);
607
608 GST_TEST_LOCK (priv);
609
610 /* call a test callback after the stop of the pipeline */
611 if (callbacks->post_test)
612 (*callbacks->post_test) (&priv->engine, priv->user_data);
613
614 g_signal_handlers_disconnect_by_func (bus,
615 G_CALLBACK (on_ErrorMessageOnBus), priv);
616 gst_bus_remove_signal_watch (bus);
617 g_signal_handlers_disconnect_by_func (demux, G_CALLBACK (on_demuxNewPad),
618 priv);
619 g_signal_handlers_disconnect_by_func (demux, G_CALLBACK (on_demuxPadRemoved),
620 priv);
621
622 GST_DEBUG ("main thread pipeline stopped");
623 if (priv->clock_update_id != 0)
624 g_source_remove (priv->clock_update_id);
625 gst_object_unref (priv->engine.clock);
626 gst_object_unref (priv->engine.pipeline);
627 priv->engine.pipeline = NULL;
628 g_main_loop_unref (priv->engine.loop);
629 g_ptr_array_unref (priv->engine.output_streams);
630 gst_object_unref (bus);
631
632 GST_TEST_UNLOCK (priv);
633 g_mutex_clear (&priv->engine.lock);
634 g_slice_free (GstAdaptiveDemuxTestEnginePrivate, priv);
635 }
636