/* A set of utility functions that are common between elements
 * based upon GstAdaptiveDemux
 *
 * Copyright (c) <2015> YouView TV Ltd
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */
21
22 #include <gst/check/gstcheck.h>
23 #include "adaptive_demux_engine.h"
24 #include "adaptive_demux_common.h"
25
26 #define GST_TEST_HTTP_SRC_NAME "testhttpsrc"
27
28 #define gst_adaptive_demux_test_case_parent_class parent_class
29
30 static void gst_adaptive_demux_test_case_dispose (GObject * object);
31 static void gst_adaptive_demux_test_case_finalize (GObject * object);
32 static void gst_adaptive_demux_test_case_clear (GstAdaptiveDemuxTestCase *
33 testData);
34
35 G_DEFINE_TYPE (GstAdaptiveDemuxTestCase, gst_adaptive_demux_test_case,
36 G_TYPE_OBJECT);
37
38 static void
gst_adaptive_demux_test_case_class_init(GstAdaptiveDemuxTestCaseClass * klass)39 gst_adaptive_demux_test_case_class_init (GstAdaptiveDemuxTestCaseClass * klass)
40 {
41 GObjectClass *object = G_OBJECT_CLASS (klass);
42
43 object->dispose = gst_adaptive_demux_test_case_dispose;
44 object->finalize = gst_adaptive_demux_test_case_finalize;
45 }
46
47 static void
gst_adaptive_demux_test_case_init(GstAdaptiveDemuxTestCase * testData)48 gst_adaptive_demux_test_case_init (GstAdaptiveDemuxTestCase * testData)
49 {
50 testData->output_streams = NULL;
51 testData->test_task = NULL;
52 g_rec_mutex_init (&testData->test_task_lock);
53 g_mutex_init (&testData->test_task_state_lock);
54 g_cond_init (&testData->test_task_state_cond);
55 gst_adaptive_demux_test_case_clear (testData);
56 }
57
58 static void
gst_adaptive_demux_test_case_clear(GstAdaptiveDemuxTestCase * testData)59 gst_adaptive_demux_test_case_clear (GstAdaptiveDemuxTestCase * testData)
60 {
61 if (testData->output_streams) {
62 g_list_free (testData->output_streams);
63 testData->output_streams = NULL;
64 }
65 testData->count_of_finished_streams = 0;
66 if (testData->test_task) {
67 gst_task_stop (testData->test_task);
68 gst_task_join (testData->test_task);
69 gst_object_unref (testData->test_task);
70 testData->test_task = NULL;
71 }
72 testData->signal_context = NULL;
73 testData->test_task_state = TEST_TASK_STATE_NOT_STARTED;
74 testData->threshold_for_seek = 0;
75 gst_event_replace (&testData->seek_event, NULL);
76 testData->signal_context = NULL;
77 }
78
79
80 static void
gst_adaptive_demux_test_case_dispose(GObject * object)81 gst_adaptive_demux_test_case_dispose (GObject * object)
82 {
83 GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (object);
84
85 gst_adaptive_demux_test_case_clear (testData);
86
87 GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
88 }
89
90 static void
gst_adaptive_demux_test_case_finalize(GObject * object)91 gst_adaptive_demux_test_case_finalize (GObject * object)
92 {
93 GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (object);
94
95 g_cond_clear (&testData->test_task_state_cond);
96 g_mutex_clear (&testData->test_task_state_lock);
97 g_rec_mutex_clear (&testData->test_task_lock);
98 if (testData->test_task) {
99 gst_task_stop (testData->test_task);
100 gst_task_join (testData->test_task);
101 gst_object_unref (testData->test_task);
102 testData->test_task = NULL;
103 }
104 if (testData->output_streams) {
105 g_list_free (testData->output_streams);
106 testData->output_streams = NULL;
107 }
108
109 GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
110 }
111
112 /**
113 * gst_adaptive_demux_test_case_new:
114 *
115 * Creates a new #GstAdaptiveDemuxTestCase. Free with g_object_unref().
116 *
117 * Returns: (transfer full): a new #GstAdaptiveDemuxTestCase
118 */
119 GstAdaptiveDemuxTestCase *
gst_adaptive_demux_test_case_new(void)120 gst_adaptive_demux_test_case_new (void)
121 {
122 return g_object_new (GST_TYPE_ADAPTIVE_DEMUX_TEST_CASE, NULL);
123 }
124
125
126 GstAdaptiveDemuxTestExpectedOutput *
gst_adaptive_demux_test_find_test_data_by_stream(GstAdaptiveDemuxTestCase * testData,GstAdaptiveDemuxTestOutputStream * stream,guint * index)127 gst_adaptive_demux_test_find_test_data_by_stream (GstAdaptiveDemuxTestCase *
128 testData, GstAdaptiveDemuxTestOutputStream * stream, guint * index)
129 {
130 gchar *pad_name;
131 GstAdaptiveDemuxTestExpectedOutput *ret = NULL;
132 guint count = 0;
133 GList *walk;
134
135 pad_name = gst_pad_get_name (stream->pad);
136 fail_unless (pad_name != NULL);
137 for (walk = testData->output_streams; walk; walk = g_list_next (walk)) {
138 GstAdaptiveDemuxTestExpectedOutput *td = walk->data;
139 if (strcmp (td->name, pad_name) == 0) {
140 ret = td;
141 if (index)
142 *index = count;
143 }
144 ++count;
145 }
146 g_free (pad_name);
147 return ret;
148 }
149
150 /* function to validate data received by AppSink */
151 gboolean
gst_adaptive_demux_test_check_received_data(GstAdaptiveDemuxTestEngine * engine,GstAdaptiveDemuxTestOutputStream * stream,GstBuffer * buffer,gpointer user_data)152 gst_adaptive_demux_test_check_received_data (GstAdaptiveDemuxTestEngine *
153 engine, GstAdaptiveDemuxTestOutputStream * stream, GstBuffer * buffer,
154 gpointer user_data)
155 {
156 GstMapInfo info;
157 guint pattern;
158 guint64 streamOffset;
159 GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data);
160 GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData;
161 guint64 i;
162
163 fail_unless (stream != NULL);
164 fail_unless (engine->pipeline != NULL);
165 testOutputStreamData =
166 gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, NULL);
167 fail_unless (testOutputStreamData != NULL);
168
169 GST_DEBUG
170 ("total_received_size=%" G_GUINT64_FORMAT
171 " segment_received_size = %" G_GUINT64_FORMAT
172 " buffer_size=%" G_GUINT64_FORMAT
173 " expected_size=%" G_GUINT64_FORMAT
174 " segment_start = %" G_GUINT64_FORMAT,
175 stream->total_received_size,
176 stream->segment_received_size,
177 (guint64) gst_buffer_get_size (buffer),
178 testOutputStreamData->expected_size, stream->segment_start);
179
180 /* Only verify after seeking */
181 if (testData->seek_event && testData->seeked)
182 fail_unless (stream->total_received_size +
183 stream->segment_received_size +
184 gst_buffer_get_size (buffer) <= testOutputStreamData->expected_size,
185 "Received unexpected data, please check what segments are being downloaded");
186
187 streamOffset = stream->segment_start + stream->segment_received_size;
188 if (testOutputStreamData->expected_data) {
189 gsize size = gst_buffer_get_size (buffer);
190 if (gst_buffer_memcmp (buffer, 0,
191 &testOutputStreamData->expected_data[streamOffset], size) == 0) {
192 return TRUE;
193 }
194 /* If buffers do not match, fall back to a slower byte-based check
195 so that the test can output the position where the received data
196 diverges from expected_data
197 */
198 }
199
200 gst_buffer_map (buffer, &info, GST_MAP_READ);
201
202 pattern = streamOffset - streamOffset % sizeof (pattern);
203 for (i = 0; i != info.size; ++i) {
204 guint received = info.data[i];
205 guint expected;
206
207 if (testOutputStreamData->expected_data) {
208 fail_unless (streamOffset + i < testOutputStreamData->expected_size);
209 expected = testOutputStreamData->expected_data[streamOffset + i];
210 } else {
211 gchar pattern_byte_to_read;
212
213 pattern_byte_to_read = (streamOffset + i) % sizeof (pattern);
214 if (pattern_byte_to_read == 0) {
215 pattern = streamOffset + i;
216 }
217
218 expected = (pattern >> (pattern_byte_to_read * 8)) & 0xFF;
219 #if 0
220 GST_DEBUG
221 ("received '0x%02x' expected '0x%02x' offset %" G_GUINT64_FORMAT
222 " pattern=%08x byte_to_read=%d",
223 received, expected, i, pattern, pattern_byte_to_read);
224 #endif
225 }
226
227 fail_unless (received == expected,
228 "output validation failed: received '0x%02x' expected '0x%02x' byte %"
229 G_GUINT64_FORMAT " offset=%" G_GUINT64_FORMAT "\n", received, expected,
230 i, streamOffset);
231 }
232
233 gst_buffer_unmap (buffer, &info);
234 return TRUE;
235 }
236
237 /* AppSink EOS callback.
238 * To be used by tests that don't expect AppSink to receive EOS.
239 */
240 void
gst_adaptive_demux_test_unexpected_eos(GstAdaptiveDemuxTestEngine * engine,GstAdaptiveDemuxTestOutputStream * stream,gpointer user_data)241 gst_adaptive_demux_test_unexpected_eos (GstAdaptiveDemuxTestEngine *
242 engine, GstAdaptiveDemuxTestOutputStream * stream, gpointer user_data)
243 {
244 fail_if (TRUE);
245 }
246
247 /* AppSink EOS callback.
248 * To be used by tests that expect AppSink to receive EOS.
249 * Will check total size of data received by AppSink.
250 */
251 void
gst_adaptive_demux_test_check_size_of_received_data(GstAdaptiveDemuxTestEngine * engine,GstAdaptiveDemuxTestOutputStream * stream,gpointer user_data)252 gst_adaptive_demux_test_check_size_of_received_data (GstAdaptiveDemuxTestEngine
253 * engine, GstAdaptiveDemuxTestOutputStream * stream, gpointer user_data)
254 {
255 GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data);
256 GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData;
257
258 testOutputStreamData =
259 gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, NULL);
260 fail_unless (testOutputStreamData != NULL);
261
262 fail_unless (stream->total_received_size ==
263 testOutputStreamData->expected_size,
264 "size validation failed, expected %d received %d",
265 testOutputStreamData->expected_size, stream->total_received_size);
266 testData->count_of_finished_streams++;
267 if (testData->count_of_finished_streams ==
268 g_list_length (testData->output_streams)) {
269 g_main_loop_quit (engine->loop);
270 }
271 }
272
273 typedef struct _SeekTaskContext
274 {
275 GstElement *pipeline;
276 GstTask *task;
277 GstEvent *seek_event;
278 } SeekTaskContext;
279
280 /* function to generate a seek event. Will be run in a separate thread */
281 static void
testSeekTaskDoSeek(gpointer user_data)282 testSeekTaskDoSeek (gpointer user_data)
283 {
284 SeekTaskContext *context = (SeekTaskContext *) user_data;
285 GstTask *task;
286
287 GST_DEBUG ("testSeekTaskDoSeek calling seek");
288
289 fail_unless (GST_IS_EVENT (context->seek_event));
290 fail_unless (GST_EVENT_TYPE (context->seek_event) == GST_EVENT_SEEK);
291
292 if (!gst_element_send_event (GST_ELEMENT (context->pipeline),
293 context->seek_event))
294 fail ("Seek failed!\n");
295 GST_DEBUG ("seek ok");
296 task = context->task;
297 g_slice_free (SeekTaskContext, context);
298 gst_task_stop (task);
299 }
300
301 /* function to be called during seek test when demux sends data to AppSink
302 * It monitors the data sent and after a while will generate a seek request.
303 */
304 static gboolean
testSeekAdaptiveDemuxSendsData(GstAdaptiveDemuxTestEngine * engine,GstAdaptiveDemuxTestOutputStream * stream,GstBuffer * buffer,gpointer user_data)305 testSeekAdaptiveDemuxSendsData (GstAdaptiveDemuxTestEngine * engine,
306 GstAdaptiveDemuxTestOutputStream * stream,
307 GstBuffer * buffer, gpointer user_data)
308 {
309 GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data);
310 SeekTaskContext *seekContext;
311 GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData;
312 guint index = 0;
313
314 testOutputStreamData =
315 gst_adaptive_demux_test_find_test_data_by_stream (testData, stream,
316 &index);
317 fail_unless (testOutputStreamData != NULL);
318 /* first entry in testData->output_streams is the
319 PAD on which to perform the seek */
320 if (index == 0 &&
321 testData->test_task == NULL &&
322 (stream->total_received_size + stream->segment_received_size) >=
323 testData->threshold_for_seek) {
324 GstSeekFlags seek_flags;
325
326 testData->threshold_for_seek =
327 stream->total_received_size + stream->segment_received_size;
328
329 gst_event_parse_seek (testData->seek_event, NULL, NULL, &seek_flags, NULL,
330 NULL, NULL, NULL);
331 if (seek_flags & GST_SEEK_FLAG_FLUSH)
332 testOutputStreamData->expected_size += testData->threshold_for_seek;
333
334 GST_DEBUG ("starting seek task");
335
336 g_mutex_lock (&testData->test_task_state_lock);
337 testData->test_task_state =
338 TEST_TASK_STATE_WAITING_FOR_TESTSRC_STATE_CHANGE;
339 g_mutex_unlock (&testData->test_task_state_lock);
340
341 seekContext = g_slice_new (SeekTaskContext);
342 seekContext->pipeline = engine->pipeline;
343 seekContext->seek_event = gst_event_ref (testData->seek_event);
344 testData->test_task = seekContext->task =
345 gst_task_new ((GstTaskFunction) testSeekTaskDoSeek, seekContext, NULL);
346 gst_task_set_lock (testData->test_task, &testData->test_task_lock);
347 gst_task_start (testData->test_task);
348
349 GST_DEBUG ("seek task started");
350
351 if (seek_flags & GST_SEEK_FLAG_FLUSH) {
352 g_mutex_lock (&testData->test_task_state_lock);
353
354 GST_DEBUG ("waiting for seek task to change state on testsrc");
355
356 /* wait for test_task to run, send a flush start event to AppSink
357 * and change the testhttpsrc element state from PLAYING to PAUSED
358 */
359 while (testData->test_task_state ==
360 TEST_TASK_STATE_WAITING_FOR_TESTSRC_STATE_CHANGE) {
361 g_cond_wait (&testData->test_task_state_cond,
362 &testData->test_task_state_lock);
363 }
364 testData->seeked = TRUE;
365 g_mutex_unlock (&testData->test_task_state_lock);
366 /* we can continue now, but this buffer will be rejected by AppSink
367 * because it is in flushing mode
368 */
369 GST_DEBUG ("seek task changed state on testsrc, resuming");
370 }
371 }
372
373 return TRUE;
374 }
375
376 static void
testSeekAdaptiveAppSinkEvent(GstAdaptiveDemuxTestEngine * engine,GstAdaptiveDemuxTestOutputStream * stream,GstEvent * event,gpointer user_data)377 testSeekAdaptiveAppSinkEvent (GstAdaptiveDemuxTestEngine * engine,
378 GstAdaptiveDemuxTestOutputStream * stream,
379 GstEvent * event, gpointer user_data)
380 {
381 GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data);
382 GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData;
383 guint index = 0;
384
385 testOutputStreamData =
386 gst_adaptive_demux_test_find_test_data_by_stream (testData, stream,
387 &index);
388 fail_unless (testOutputStreamData != NULL);
389
390 if (testData->seek_event && GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT
391 && testOutputStreamData->post_seek_segment.format != GST_FORMAT_UNDEFINED
392 && gst_event_get_seqnum (event) ==
393 gst_event_get_seqnum (testData->seek_event)) {
394 const GstSegment *seek_segment;
395
396
397 gst_event_parse_segment (event, &seek_segment);
398 fail_unless (seek_segment->format ==
399 testOutputStreamData->post_seek_segment.format);
400 fail_unless (seek_segment->rate ==
401 testOutputStreamData->post_seek_segment.rate);
402 fail_unless (seek_segment->start ==
403 testOutputStreamData->post_seek_segment.start);
404 fail_unless (seek_segment->stop ==
405 testOutputStreamData->post_seek_segment.stop);
406 fail_unless (seek_segment->base ==
407 testOutputStreamData->post_seek_segment.base);
408 fail_unless (seek_segment->time ==
409 testOutputStreamData->post_seek_segment.time);
410
411 testOutputStreamData->segment_verification_needed = FALSE;
412 }
413 }
414
415 /* callback called when main_loop detects a state changed event */
416 static void
testSeekOnStateChanged(GstBus * bus,GstMessage * msg,gpointer user_data)417 testSeekOnStateChanged (GstBus * bus, GstMessage * msg, gpointer user_data)
418 {
419 GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data);
420 GstState old_state, new_state;
421 const char *srcName = GST_OBJECT_NAME (msg->src);
422
423 gst_message_parse_state_changed (msg, &old_state, &new_state, NULL);
424 GST_DEBUG ("Element %s changed state from %s to %s",
425 GST_OBJECT_NAME (msg->src),
426 gst_element_state_get_name (old_state),
427 gst_element_state_get_name (new_state));
428
429 if (strstr (srcName, "srcbin") == srcName &&
430 old_state == GST_STATE_PLAYING && new_state == GST_STATE_PAUSED) {
431 g_mutex_lock (&testData->test_task_state_lock);
432 if (testData->test_task_state ==
433 TEST_TASK_STATE_WAITING_FOR_TESTSRC_STATE_CHANGE) {
434 GST_DEBUG ("changing test_task_state");
435 testData->test_task_state = TEST_TASK_STATE_EXITING;
436 g_cond_signal (&testData->test_task_state_cond);
437 }
438 g_mutex_unlock (&testData->test_task_state_lock);
439 }
440 }
441
442 /*
443 * Issue a seek request after media segment has started to be downloaded
444 * on the first pad listed in GstAdaptiveDemuxTestOutputStreamData and the
445 * first chunk of at least one byte has already arrived in AppSink
446 */
447 static void
testSeekPreTestCallback(GstAdaptiveDemuxTestEngine * engine,gpointer user_data)448 testSeekPreTestCallback (GstAdaptiveDemuxTestEngine * engine,
449 gpointer user_data)
450 {
451 GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data);
452 GstBus *bus;
453
454 /* register a callback to listen for state change events */
455 bus = gst_pipeline_get_bus (GST_PIPELINE (engine->pipeline));
456 gst_bus_add_signal_watch (bus);
457 g_signal_connect (bus, "message::state-changed",
458 G_CALLBACK (testSeekOnStateChanged), testData);
459 gst_object_unref (bus);
460 }
461
462 static void
testSeekPostTestCallback(GstAdaptiveDemuxTestEngine * engine,gpointer user_data)463 testSeekPostTestCallback (GstAdaptiveDemuxTestEngine * engine,
464 gpointer user_data)
465 {
466 GstBus *bus;
467 GList *walk;
468
469 GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data);
470 for (walk = testData->output_streams; walk; walk = g_list_next (walk)) {
471 GstAdaptiveDemuxTestExpectedOutput *td = walk->data;
472
473 fail_if (td->segment_verification_needed);
474 }
475
476 bus = gst_pipeline_get_bus (GST_PIPELINE (engine->pipeline));
477 gst_bus_remove_signal_watch (bus);
478 gst_object_unref (bus);
479 }
480
481 /* function to check total size of data received by AppSink
482 * will be called when AppSink receives eos.
483 */
gst_adaptive_demux_test_download_error_size_of_received_data(GstAdaptiveDemuxTestEngine * engine,GstAdaptiveDemuxTestOutputStream * stream,gpointer user_data)484 void gst_adaptive_demux_test_download_error_size_of_received_data
485 (GstAdaptiveDemuxTestEngine * engine,
486 GstAdaptiveDemuxTestOutputStream * stream, gpointer user_data)
487 {
488 GstAdaptiveDemuxTestCase *testData = GST_ADAPTIVE_DEMUX_TEST_CASE (user_data);
489 GstAdaptiveDemuxTestExpectedOutput *testOutputStreamData;
490
491 testOutputStreamData =
492 gst_adaptive_demux_test_find_test_data_by_stream (testData, stream, NULL);
493 fail_unless (testOutputStreamData != NULL);
494 /* expect to receive more than 0 */
495 fail_unless (stream->total_received_size > 0,
496 "size validation failed for %s, expected > 0, received %d",
497 testOutputStreamData->name, stream->total_received_size);
498
499 /* expect to receive less than file size */
500 fail_unless (stream->total_received_size <
501 testOutputStreamData->expected_size,
502 "size validation failed for %s, expected < %d received %d",
503 testOutputStreamData->name, testOutputStreamData->expected_size,
504 stream->total_received_size);
505 if (testData->count_of_finished_streams ==
506 g_list_length (testData->output_streams)) {
507 g_main_loop_quit (engine->loop);
508 }
509 }
510
511 void
gst_adaptive_demux_test_seek(const gchar * element_name,const gchar * manifest_uri,GstAdaptiveDemuxTestCase * testData)512 gst_adaptive_demux_test_seek (const gchar * element_name,
513 const gchar * manifest_uri, GstAdaptiveDemuxTestCase * testData)
514 {
515 GstAdaptiveDemuxTestCallbacks cb = { 0 };
516 cb.appsink_received_data = gst_adaptive_demux_test_check_received_data;
517 cb.appsink_eos = gst_adaptive_demux_test_check_size_of_received_data;
518 cb.appsink_event = testSeekAdaptiveAppSinkEvent;
519 cb.pre_test = testSeekPreTestCallback;
520 cb.post_test = testSeekPostTestCallback;
521 cb.demux_sent_data = testSeekAdaptiveDemuxSendsData;
522 gst_adaptive_demux_test_run (element_name, manifest_uri, &cb, testData);
523 /* the call to g_object_unref of testData will clean up the seek task */
524 }
525
526 void
gst_adaptive_demux_test_setup(void)527 gst_adaptive_demux_test_setup (void)
528 {
529 GstRegistry *registry;
530 gboolean ret;
531
532 registry = gst_registry_get ();
533 ret = gst_test_http_src_register_plugin (registry, GST_TEST_HTTP_SRC_NAME);
534 fail_unless (ret);
535 }
536
537 void
gst_adaptive_demux_test_teardown(void)538 gst_adaptive_demux_test_teardown (void)
539 {
540 gst_test_http_src_install_callbacks (NULL, NULL);
541 gst_test_http_src_set_default_blocksize (0);
542 }
543