1 /* GStreamer unit tests for multiqueue
2  *
3  * Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23 
24 #include <gst/check/gstcheck.h>
25 
26 static GMutex _check_lock;
27 
28 static GstElement *
setup_multiqueue(GstElement * pipe,GstElement * inputs[],GstElement * outputs[],guint num)29 setup_multiqueue (GstElement * pipe, GstElement * inputs[],
30     GstElement * outputs[], guint num)
31 {
32   GstElement *mq;
33   guint i;
34 
35   mq = gst_element_factory_make ("multiqueue", NULL);
36   fail_unless (mq != NULL, "failed to create 'multiqueue' element");
37 
38   gst_bin_add (GST_BIN (pipe), mq);
39 
40   for (i = 0; i < num; ++i) {
41     GstPad *sinkpad = NULL;
42     GstPad *srcpad = NULL;
43 
44     /* create multiqueue sink (and source) pad */
45     sinkpad = gst_element_get_request_pad (mq, "sink_%u");
46     fail_unless (sinkpad != NULL,
47         "failed to create multiqueue request pad #%u", i);
48 
49     /* link input element N to the N-th multiqueue sink pad we just created */
50     if (inputs != NULL && inputs[i] != NULL) {
51       gst_bin_add (GST_BIN (pipe), inputs[i]);
52 
53       srcpad = gst_element_get_static_pad (inputs[i], "src");
54       fail_unless (srcpad != NULL, "failed to find src pad for input #%u", i);
55 
56       fail_unless_equals_int (GST_PAD_LINK_OK, gst_pad_link (srcpad, sinkpad));
57 
58       gst_object_unref (srcpad);
59       srcpad = NULL;
60     }
61     gst_object_unref (sinkpad);
62     sinkpad = NULL;
63 
64     /* link output element N to the N-th multiqueue src pad */
65     if (outputs != NULL && outputs[i] != NULL) {
66       gchar padname[10];
67 
68       /* only the sink pads are by request, the source pads are sometimes pads,
69        * so this should return NULL */
70       srcpad = gst_element_get_request_pad (mq, "src_%u");
71       fail_unless (srcpad == NULL);
72 
73       g_snprintf (padname, sizeof (padname), "src_%u", i);
74       srcpad = gst_element_get_static_pad (mq, padname);
75       fail_unless (srcpad != NULL, "failed to get multiqueue src pad #%u", i);
76       fail_unless (GST_PAD_IS_SRC (srcpad),
77           "%s:%s is not a source pad?!", GST_DEBUG_PAD_NAME (srcpad));
78 
79       gst_bin_add (GST_BIN (pipe), outputs[i]);
80 
81       sinkpad = gst_element_get_static_pad (outputs[i], "sink");
82       fail_unless (sinkpad != NULL, "failed to find sink pad of output #%u", i);
83       fail_unless (GST_PAD_IS_SINK (sinkpad));
84 
85       fail_unless_equals_int (GST_PAD_LINK_OK, gst_pad_link (srcpad, sinkpad));
86 
87       gst_object_unref (srcpad);
88       gst_object_unref (sinkpad);
89     }
90   }
91 
92   return mq;
93 }
94 
GST_START_TEST(test_simple_pipeline)95 GST_START_TEST (test_simple_pipeline)
96 {
97   GstElement *pipe;
98   GstElement *inputs[1];
99   GstElement *outputs[1];
100   GstMessage *msg;
101 
102   pipe = gst_pipeline_new ("pipeline");
103 
104   inputs[0] = gst_element_factory_make ("fakesrc", NULL);
105   fail_unless (inputs[0] != NULL, "failed to create 'fakesrc' element");
106   g_object_set (inputs[0], "num-buffers", 256, NULL);
107 
108   outputs[0] = gst_element_factory_make ("fakesink", NULL);
109   fail_unless (outputs[0] != NULL, "failed to create 'fakesink' element");
110 
111   setup_multiqueue (pipe, inputs, outputs, 1);
112 
113   gst_element_set_state (pipe, GST_STATE_PLAYING);
114 
115   msg = gst_bus_poll (GST_ELEMENT_BUS (pipe),
116       GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
117 
118   fail_if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR,
119       "Expected EOS message, got ERROR message");
120   gst_message_unref (msg);
121 
122   GST_LOG ("Got EOS, cleaning up");
123 
124   gst_element_set_state (pipe, GST_STATE_NULL);
125   gst_object_unref (pipe);
126 }
127 
128 GST_END_TEST;
129 
GST_START_TEST(test_simple_shutdown_while_running)130 GST_START_TEST (test_simple_shutdown_while_running)
131 {
132   GstElement *pipe;
133   GstElement *inputs[1];
134   GstElement *outputs[1];
135   GstMessage *msg;
136 
137   pipe = gst_pipeline_new ("pipeline");
138 
139   inputs[0] = gst_element_factory_make ("fakesrc", NULL);
140   fail_unless (inputs[0] != NULL, "failed to create 'fakesrc' element");
141 
142   outputs[0] = gst_element_factory_make ("fakesink", NULL);
143   fail_unless (outputs[0] != NULL, "failed to create 'fakesink' element");
144 
145   setup_multiqueue (pipe, inputs, outputs, 1);
146 
147   gst_element_set_state (pipe, GST_STATE_PAUSED);
148 
149   /* wait until pipeline is up and running */
150   msg = gst_bus_poll (GST_ELEMENT_BUS (pipe),
151       GST_MESSAGE_ERROR | GST_MESSAGE_ASYNC_DONE, -1);
152   fail_if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR, "Got ERROR message");
153   gst_message_unref (msg);
154 
155   GST_LOG ("pipeline is running now");
156   gst_element_set_state (pipe, GST_STATE_PAUSED);
157 
158   /* wait a bit to accumulate some buffers in the queue (while it's blocking
159    * in the sink) */
160   msg =
161       gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_ERROR, GST_SECOND / 4);
162   if (msg)
163     g_error ("Got ERROR message");
164 
165   /* now shut down only the sink, so the queue gets a wrong-state flow return */
166   gst_element_set_state (outputs[0], GST_STATE_NULL);
167   msg =
168       gst_bus_poll (GST_ELEMENT_BUS (pipe), GST_MESSAGE_ERROR, GST_SECOND / 2);
169   if (msg)
170     g_error ("Got ERROR message");
171 
172   GST_LOG ("Cleaning up");
173 
174   gst_element_set_state (pipe, GST_STATE_NULL);
175   gst_object_unref (pipe);
176 }
177 
178 GST_END_TEST;
179 
GST_START_TEST(test_simple_create_destroy)180 GST_START_TEST (test_simple_create_destroy)
181 {
182   GstElement *mq;
183 
184   mq = gst_element_factory_make ("multiqueue", NULL);
185   gst_object_unref (mq);
186 }
187 
188 GST_END_TEST;
189 
GST_START_TEST(test_request_pads)190 GST_START_TEST (test_request_pads)
191 {
192   GstElement *mq;
193   GstPad *sink1, *sink2;
194 
195   mq = gst_element_factory_make ("multiqueue", NULL);
196 
197   sink1 = gst_element_get_request_pad (mq, "foo_%u");
198   fail_unless (sink1 == NULL,
199       "Expected NULL pad, as there is no request pad template for 'foo_%%u'");
200 
201   sink1 = gst_element_get_request_pad (mq, "src_%u");
202   fail_unless (sink1 == NULL,
203       "Expected NULL pad, as there is no request pad template for 'src_%%u'");
204 
205   sink1 = gst_element_get_request_pad (mq, "sink_%u");
206   fail_unless (sink1 != NULL);
207   fail_unless (GST_IS_PAD (sink1));
208   fail_unless (GST_PAD_IS_SINK (sink1));
209   GST_LOG ("Got pad %s:%s", GST_DEBUG_PAD_NAME (sink1));
210 
211   sink2 = gst_element_get_request_pad (mq, "sink_%u");
212   fail_unless (sink2 != NULL);
213   fail_unless (GST_IS_PAD (sink2));
214   fail_unless (GST_PAD_IS_SINK (sink2));
215   GST_LOG ("Got pad %s:%s", GST_DEBUG_PAD_NAME (sink2));
216 
217   fail_unless (sink1 != sink2);
218 
219   GST_LOG ("Cleaning up");
220   gst_object_unref (sink1);
221   gst_object_unref (sink2);
222   gst_object_unref (mq);
223 }
224 
225 GST_END_TEST;
226 
227 static GstPad *
mq_sinkpad_to_srcpad(GstElement * mq,GstPad * sink)228 mq_sinkpad_to_srcpad (GstElement * mq, GstPad * sink)
229 {
230   GstPad *srcpad = NULL;
231 
232   gchar *mq_sinkpad_name;
233   gchar *mq_srcpad_name;
234 
235   mq_sinkpad_name = gst_pad_get_name (sink);
236   fail_unless (g_str_has_prefix (mq_sinkpad_name, "sink"));
237   mq_srcpad_name = g_strdup_printf ("src_%s", mq_sinkpad_name + 5);
238   srcpad = gst_element_get_static_pad (mq, mq_srcpad_name);
239   fail_unless (srcpad != NULL);
240 
241   g_free (mq_sinkpad_name);
242   g_free (mq_srcpad_name);
243 
244   return srcpad;
245 }
246 
GST_START_TEST(test_request_pads_named)247 GST_START_TEST (test_request_pads_named)
248 {
249   GstElement *mq;
250   GstPad *sink1, *sink2, *sink3, *sink4;
251 
252   mq = gst_element_factory_make ("multiqueue", NULL);
253 
254   sink1 = gst_element_get_request_pad (mq, "sink_1");
255   fail_unless (sink1 != NULL);
256   fail_unless (GST_IS_PAD (sink1));
257   fail_unless (GST_PAD_IS_SINK (sink1));
258   fail_unless_equals_string (GST_PAD_NAME (sink1), "sink_1");
259   GST_LOG ("Got pad %s:%s", GST_DEBUG_PAD_NAME (sink1));
260 
261   sink3 = gst_element_get_request_pad (mq, "sink_3");
262   fail_unless (sink3 != NULL);
263   fail_unless (GST_IS_PAD (sink3));
264   fail_unless (GST_PAD_IS_SINK (sink3));
265   fail_unless_equals_string (GST_PAD_NAME (sink3), "sink_3");
266   GST_LOG ("Got pad %s:%s", GST_DEBUG_PAD_NAME (sink3));
267 
268   sink2 = gst_element_get_request_pad (mq, "sink_2");
269   fail_unless (sink2 != NULL);
270   fail_unless (GST_IS_PAD (sink2));
271   fail_unless (GST_PAD_IS_SINK (sink2));
272   fail_unless_equals_string (GST_PAD_NAME (sink2), "sink_2");
273   GST_LOG ("Got pad %s:%s", GST_DEBUG_PAD_NAME (sink2));
274 
275   /* This gets us the first unused id, sink0 */
276   sink4 = gst_element_get_request_pad (mq, "sink_%u");
277   fail_unless (sink4 != NULL);
278   fail_unless (GST_IS_PAD (sink4));
279   fail_unless (GST_PAD_IS_SINK (sink4));
280   fail_unless_equals_string (GST_PAD_NAME (sink4), "sink_0");
281   GST_LOG ("Got pad %s:%s", GST_DEBUG_PAD_NAME (sink4));
282 
283   GST_LOG ("Cleaning up");
284   gst_object_unref (sink1);
285   gst_object_unref (sink2);
286   gst_object_unref (sink3);
287   gst_object_unref (sink4);
288   gst_object_unref (mq);
289 }
290 
291 GST_END_TEST;
292 
293 static gboolean
mq_dummypad_query(GstPad * sinkpad,GstObject * parent,GstQuery * query)294 mq_dummypad_query (GstPad * sinkpad, GstObject * parent, GstQuery * query)
295 {
296   gboolean res = TRUE;
297 
298   switch (GST_QUERY_TYPE (query)) {
299     case GST_QUERY_CAPS:
300     {
301       GstCaps *filter, *caps;
302 
303       gst_query_parse_caps (query, &filter);
304       caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
305       gst_query_set_caps_result (query, caps);
306       gst_caps_unref (caps);
307       break;
308     }
309     default:
310       res = gst_pad_query_default (sinkpad, parent, query);
311       break;
312   }
313   return res;
314 }
315 
316 struct PadData
317 {
318   GstPad *input_pad;
319   GstPad *out_pad;
320   guint8 pad_num;
321   guint32 *max_linked_id_ptr;
322   guint32 *eos_count_ptr;
323   gboolean is_linked;
324   gboolean first_buf;
325   gint n_linked;
326 
327   GMutex *mutex;
328   GCond *cond;
329 
330   /* used by initial_events_nodelay */
331   gint event_count;
332 };
333 
334 static GstFlowReturn
mq_dummypad_chain(GstPad * sinkpad,GstObject * parent,GstBuffer * buf)335 mq_dummypad_chain (GstPad * sinkpad, GstObject * parent, GstBuffer * buf)
336 {
337   guint32 cur_id;
338   struct PadData *pad_data;
339   GstMapInfo info;
340 
341   pad_data = gst_pad_get_element_private (sinkpad);
342 
343   g_mutex_lock (&_check_lock);
344   fail_if (pad_data == NULL);
345   /* Read an ID from the first 4 bytes of the buffer data and check it's
346    * what we expect */
347   fail_unless (gst_buffer_map (buf, &info, GST_MAP_READ));
348   fail_unless (info.size >= 4);
349   g_mutex_unlock (&_check_lock);
350   cur_id = GST_READ_UINT32_BE (info.data);
351   gst_buffer_unmap (buf, &info);
352 
353   g_mutex_lock (pad_data->mutex);
354 
355   /* For not-linked pads, ensure that we're not running ahead of the 'linked'
356    * pads. The first buffer is allowed to get ahead, because otherwise things can't
357    * always pre-roll correctly */
358   if (pad_data->max_linked_id_ptr) {
359     if (!pad_data->is_linked) {
360       /* If there are no linked pads, we can't track a max_id for them :) */
361       if (pad_data->n_linked > 0 && !pad_data->first_buf) {
362         g_mutex_lock (&_check_lock);
363         fail_unless (cur_id <= *(pad_data->max_linked_id_ptr) + 1,
364             "Got buffer %u on pad %u before buffer %u was seen on a "
365             "linked pad (max: %u)", cur_id, pad_data->pad_num, cur_id - 1,
366             *(pad_data->max_linked_id_ptr));
367         g_mutex_unlock (&_check_lock);
368       }
369     } else {
370       /* Update the max_id value */
371       if (cur_id > *(pad_data->max_linked_id_ptr))
372         *(pad_data->max_linked_id_ptr) = cur_id;
373     }
374   }
375   pad_data->first_buf = FALSE;
376 
377   g_mutex_unlock (pad_data->mutex);
378 
379   /* Unref the buffer */
380   gst_buffer_unref (buf);
381 
382   /* Return OK or not-linked as indicated */
383   return pad_data->is_linked ? GST_FLOW_OK : GST_FLOW_NOT_LINKED;
384 }
385 
386 static gboolean
mq_dummypad_event(GstPad * sinkpad,GstObject * parent,GstEvent * event)387 mq_dummypad_event (GstPad * sinkpad, GstObject * parent, GstEvent * event)
388 {
389   struct PadData *pad_data;
390 
391   pad_data = gst_pad_get_element_private (sinkpad);
392   g_mutex_lock (&_check_lock);
393   fail_if (pad_data == NULL);
394   g_mutex_unlock (&_check_lock);
395 
396   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
397     g_mutex_lock (pad_data->mutex);
398 
399     /* Accumulate that we've seen the EOS and signal the main thread */
400     if (pad_data->eos_count_ptr)
401       *(pad_data->eos_count_ptr) += 1;
402 
403     GST_DEBUG ("EOS on pad %u", pad_data->pad_num);
404 
405     g_cond_broadcast (pad_data->cond);
406     g_mutex_unlock (pad_data->mutex);
407   }
408 
409   gst_event_unref (event);
410   return TRUE;
411 }
412 
413 static void
construct_n_pads(GstElement * mq,struct PadData * pad_data,gint n_pads,gint n_linked)414 construct_n_pads (GstElement * mq, struct PadData *pad_data, gint n_pads,
415     gint n_linked)
416 {
417   gint i;
418   GstSegment segment;
419 
420   gst_segment_init (&segment, GST_FORMAT_BYTES);
421 
422   /* Construct NPADS dummy output pads. The first 'n_linked' return FLOW_OK, the rest
423    * return NOT_LINKED. The not-linked ones check the expected ordering of
424    * output buffers */
425   for (i = 0; i < n_pads; i++) {
426     GstPad *mq_srcpad, *mq_sinkpad, *inpad, *outpad;
427     gchar *name;
428 
429     name = g_strdup_printf ("dummysrc%d", i);
430     inpad = gst_pad_new (name, GST_PAD_SRC);
431     g_free (name);
432     gst_pad_set_query_function (inpad, mq_dummypad_query);
433 
434     mq_sinkpad = gst_element_get_request_pad (mq, "sink_%u");
435     fail_unless (mq_sinkpad != NULL);
436     fail_unless (gst_pad_link (inpad, mq_sinkpad) == GST_PAD_LINK_OK);
437 
438     gst_pad_set_active (inpad, TRUE);
439 
440     gst_pad_push_event (inpad, gst_event_new_stream_start ("test"));
441     gst_pad_push_event (inpad, gst_event_new_segment (&segment));
442 
443     mq_srcpad = mq_sinkpad_to_srcpad (mq, mq_sinkpad);
444 
445     name = g_strdup_printf ("dummysink%d", i);
446     outpad = gst_pad_new (name, GST_PAD_SINK);
447     g_free (name);
448     gst_pad_set_chain_function (outpad, mq_dummypad_chain);
449     gst_pad_set_event_function (outpad, mq_dummypad_event);
450     gst_pad_set_query_function (outpad, mq_dummypad_query);
451 
452     pad_data[i].pad_num = i;
453     pad_data[i].input_pad = inpad;
454     pad_data[i].out_pad = outpad;
455     pad_data[i].max_linked_id_ptr = NULL;
456     pad_data[i].eos_count_ptr = NULL;
457     pad_data[i].is_linked = (i < n_linked ? TRUE : FALSE);
458     pad_data[i].n_linked = n_linked;
459     pad_data[i].cond = NULL;
460     pad_data[i].mutex = NULL;
461     pad_data[i].first_buf = TRUE;
462     gst_pad_set_element_private (outpad, pad_data + i);
463 
464     fail_unless (gst_pad_link (mq_srcpad, outpad) == GST_PAD_LINK_OK);
465     gst_pad_set_active (outpad, TRUE);
466 
467     gst_object_unref (mq_sinkpad);
468     gst_object_unref (mq_srcpad);
469   }
470 }
471 
472 static void
push_n_buffers(struct PadData * pad_data,gint num_buffers,const guint8 * pad_pattern,guint pattern_size)473 push_n_buffers (struct PadData *pad_data, gint num_buffers,
474     const guint8 * pad_pattern, guint pattern_size)
475 {
476   gint i;
477 
478   for (i = 0; i < num_buffers; i++) {
479     guint8 cur_pad;
480     GstBuffer *buf;
481     GstFlowReturn ret;
482     GstMapInfo info;
483 
484     cur_pad = pad_pattern[i % pattern_size];
485 
486     buf = gst_buffer_new_and_alloc (4);
487     g_mutex_lock (&_check_lock);
488     fail_if (buf == NULL);
489     g_mutex_unlock (&_check_lock);
490 
491     fail_unless (gst_buffer_map (buf, &info, GST_MAP_WRITE));
492     GST_WRITE_UINT32_BE (info.data, i + 1);
493     gst_buffer_unmap (buf, &info);
494     GST_BUFFER_TIMESTAMP (buf) = (i + 1) * GST_SECOND;
495 
496     ret = gst_pad_push (pad_data[cur_pad].input_pad, buf);
497     g_mutex_lock (&_check_lock);
498     if (pad_data[cur_pad].is_linked) {
499       fail_unless (ret == GST_FLOW_OK,
500           "Push on pad %d returned %d when FLOW_OK was expected", cur_pad, ret);
501     } else {
502       /* Expect OK initially, then NOT_LINKED when the srcpad starts pushing */
503       fail_unless (ret == GST_FLOW_OK || ret == GST_FLOW_NOT_LINKED,
504           "Push on pad %d returned %d when FLOW_OK or NOT_LINKED  was expected",
505           cur_pad, ret);
506     }
507     g_mutex_unlock (&_check_lock);
508   }
509 }
510 
511 static void
run_output_order_test(gint n_linked)512 run_output_order_test (gint n_linked)
513 {
514   /* This test creates a multiqueue with 2 linked output, and 3 outputs that
515    * return 'not-linked' when data is pushed, then verifies that all buffers
516    * are received on not-linked pads only after earlier buffers on the
517    * 'linked' pads are made */
518   GstElement *pipe;
519   GstElement *mq;
520   struct PadData pad_data[5];
521   guint32 max_linked_id;
522   guint32 eos_seen;
523   GMutex mutex;
524   GCond cond;
525   gint i;
526   const gint NPADS = 5;
527   const gint NBUFFERS = 1000;
528 
529   g_mutex_init (&mutex);
530   g_cond_init (&cond);
531 
532   pipe = gst_bin_new ("testbin");
533 
534   mq = gst_element_factory_make ("multiqueue", NULL);
535   fail_unless (mq != NULL);
536   gst_bin_add (GST_BIN (pipe), mq);
537 
538   /* No limits */
539   g_object_set (mq,
540       "max-size-bytes", (guint) 0,
541       "max-size-buffers", (guint) 0,
542       "max-size-time", (guint64) 0,
543       "extra-size-bytes", (guint) 0,
544       "extra-size-buffers", (guint) 0, "extra-size-time", (guint64) 0, NULL);
545 
546   construct_n_pads (mq, pad_data, NPADS, n_linked);
547   for (i = 0; i < NPADS; i++) {
548     pad_data[i].max_linked_id_ptr = &max_linked_id;
549     /* Only look for EOS on the linked pads */
550     pad_data[i].eos_count_ptr = (i < n_linked) ? &eos_seen : NULL;
551     pad_data[i].cond = &cond;
552     pad_data[i].mutex = &mutex;
553   }
554 
555   /* Run the test. Push 1000 buffers through the multiqueue in a pattern */
556   max_linked_id = 0;
557   eos_seen = 0;
558   gst_element_set_state (pipe, GST_STATE_PLAYING);
559 
560   {
561     const guint8 pad_pattern[] =
562         { 0, 0, 0, 0, 1, 1, 2, 1, 0, 2, 3, 2, 3, 1, 4 };
563     const guint n = sizeof (pad_pattern) / sizeof (guint8);
564     push_n_buffers (pad_data, NBUFFERS, pad_pattern, n);
565   }
566 
567   for (i = 0; i < NPADS; i++) {
568     gst_pad_push_event (pad_data[i].input_pad, gst_event_new_eos ());
569   }
570 
571   /* Wait while the buffers are processed */
572   g_mutex_lock (&mutex);
573   /* We wait until EOS has been pushed on all linked pads */
574   while (eos_seen < n_linked) {
575     g_cond_wait (&cond, &mutex);
576   }
577   g_mutex_unlock (&mutex);
578 
579   /* Clean up */
580   for (i = 0; i < NPADS; i++) {
581     GstPad *mq_input = gst_pad_get_peer (pad_data[i].input_pad);
582 
583     gst_pad_unlink (pad_data[i].input_pad, mq_input);
584     gst_element_release_request_pad (mq, mq_input);
585     gst_object_unref (mq_input);
586     gst_object_unref (pad_data[i].input_pad);
587     gst_object_unref (pad_data[i].out_pad);
588   }
589 
590   gst_element_set_state (pipe, GST_STATE_NULL);
591   gst_object_unref (pipe);
592 
593   g_cond_clear (&cond);
594   g_mutex_clear (&mutex);
595 }
596 
GST_START_TEST(test_output_order)597 GST_START_TEST (test_output_order)
598 {
599   run_output_order_test (2);
600   run_output_order_test (0);
601 }
602 
603 GST_END_TEST;
604 
GST_START_TEST(test_not_linked_eos)605 GST_START_TEST (test_not_linked_eos)
606 {
607   /* This test creates a multiqueue with 1 linked output and 1 not-linked
608    * pad. It pushes a few buffers through each, then EOS on the linked
609    * pad and waits until that arrives. After that, it pushes some more
610    * buffers on the not-linked pad and then EOS and checks that those
611    * are all output */
612   GstElement *pipe;
613   GstElement *mq;
614   struct PadData pad_data[2];
615   guint32 eos_seen;
616   GMutex mutex;
617   GCond cond;
618   gint i;
619   const gint NPADS = 2;
620   const gint NBUFFERS = 20;
621   GstSegment segment;
622 
623   gst_segment_init (&segment, GST_FORMAT_BYTES);
624 
625   g_mutex_init (&mutex);
626   g_cond_init (&cond);
627 
628   pipe = gst_bin_new ("testbin");
629 
630   mq = gst_element_factory_make ("multiqueue", NULL);
631   fail_unless (mq != NULL);
632   gst_bin_add (GST_BIN (pipe), mq);
633 
634   /* No limits */
635   g_object_set (mq,
636       "max-size-bytes", (guint) 0,
637       "max-size-buffers", (guint) 0,
638       "max-size-time", (guint64) 0,
639       "extra-size-bytes", (guint) 0,
640       "extra-size-buffers", (guint) 0, "extra-size-time", (guint64) 0, NULL);
641 
642   /* Construct NPADS dummy output pads. The first 1 returns FLOW_OK, the rest
643    * return NOT_LINKED. */
644   construct_n_pads (mq, pad_data, NPADS, 1);
645   for (i = 0; i < NPADS; i++) {
646     /* Only look for EOS on the linked pads */
647     pad_data[i].eos_count_ptr = &eos_seen;
648     pad_data[i].cond = &cond;
649     pad_data[i].mutex = &mutex;
650   }
651 
652   /* Run the test. Push 20 buffers through the multiqueue in a pattern */
653   eos_seen = 0;
654   gst_element_set_state (pipe, GST_STATE_PLAYING);
655 
656   {
657     const guint8 pad_pattern[] = { 0, 1 };
658     const guint n = sizeof (pad_pattern) / sizeof (guint8);
659     push_n_buffers (pad_data, NBUFFERS, pad_pattern, n);
660   }
661 
662   /* Make the linked pad go EOS */
663   gst_pad_push_event (pad_data[0].input_pad, gst_event_new_eos ());
664 
665   g_mutex_lock (&mutex);
666   /* Wait until EOS has been seen on the linked pad */
667   while (eos_seen == 0)
668     g_cond_wait (&cond, &mutex);
669   g_mutex_unlock (&mutex);
670 
671   /* Now push some more buffers to the not-linked pad */
672   {
673     const guint8 pad_pattern[] = { 1, 1 };
674     const guint n = sizeof (pad_pattern) / sizeof (guint8);
675     push_n_buffers (pad_data, NBUFFERS, pad_pattern, n);
676   }
677   /* And EOS on the not-linked pad */
678   gst_pad_push_event (pad_data[1].input_pad, gst_event_new_eos ());
679 
680   g_mutex_lock (&mutex);
681   while (eos_seen < NPADS)
682     g_cond_wait (&cond, &mutex);
683   g_mutex_unlock (&mutex);
684 
685   /* Clean up */
686   for (i = 0; i < NPADS; i++) {
687     GstPad *mq_input = gst_pad_get_peer (pad_data[i].input_pad);
688 
689     gst_pad_unlink (pad_data[i].input_pad, mq_input);
690     gst_element_release_request_pad (mq, mq_input);
691     gst_object_unref (mq_input);
692     gst_object_unref (pad_data[i].input_pad);
693     gst_object_unref (pad_data[i].out_pad);
694   }
695 
696   gst_element_set_state (pipe, GST_STATE_NULL);
697   gst_object_unref (pipe);
698 
699   g_cond_clear (&cond);
700   g_mutex_clear (&mutex);
701 }
702 
703 GST_END_TEST;
704 
GST_START_TEST(test_sparse_stream)705 GST_START_TEST (test_sparse_stream)
706 {
707   /* This test creates a multiqueue with 2 streams. One receives
708    * a constant flow of buffers, the other only gets one buffer, and then
709    * new-segment events, and returns not-linked. The multiqueue should not fill.
710    */
711   GstElement *pipe;
712   GstElement *mq;
713   GstPad *inputpads[2];
714   GstPad *sinkpads[2];
715   GstEvent *event;
716   struct PadData pad_data[2];
717   guint32 eos_seen, max_linked_id;
718   GMutex mutex;
719   GCond cond;
720   gint i;
721   const gint NBUFFERS = 100;
722   GstSegment segment;
723 
724   g_mutex_init (&mutex);
725   g_cond_init (&cond);
726 
727   pipe = gst_pipeline_new ("testbin");
728   mq = gst_element_factory_make ("multiqueue", NULL);
729   fail_unless (mq != NULL);
730   gst_bin_add (GST_BIN (pipe), mq);
731 
732   /* 1 second limit */
733   g_object_set (mq,
734       "max-size-bytes", (guint) 0,
735       "max-size-buffers", (guint) 0,
736       "max-size-time", (guint64) GST_SECOND,
737       "extra-size-bytes", (guint) 0,
738       "extra-size-buffers", (guint) 0, "extra-size-time", (guint64) 0, NULL);
739 
740   gst_segment_init (&segment, GST_FORMAT_TIME);
741 
742   /* Construct 2 dummy output pads. */
743   for (i = 0; i < 2; i++) {
744     GstPad *mq_srcpad, *mq_sinkpad;
745     gchar *name;
746 
747     name = g_strdup_printf ("dummysrc%d", i);
748     inputpads[i] = gst_pad_new (name, GST_PAD_SRC);
749     g_free (name);
750     gst_pad_set_query_function (inputpads[i], mq_dummypad_query);
751 
752     mq_sinkpad = gst_element_get_request_pad (mq, "sink_%u");
753     fail_unless (mq_sinkpad != NULL);
754     fail_unless (gst_pad_link (inputpads[i], mq_sinkpad) == GST_PAD_LINK_OK);
755 
756     gst_pad_set_active (inputpads[i], TRUE);
757 
758     gst_pad_push_event (inputpads[i], gst_event_new_stream_start ("test"));
759     gst_pad_push_event (inputpads[i], gst_event_new_segment (&segment));
760 
761     mq_srcpad = mq_sinkpad_to_srcpad (mq, mq_sinkpad);
762 
763     name = g_strdup_printf ("dummysink%d", i);
764     sinkpads[i] = gst_pad_new (name, GST_PAD_SINK);
765     g_free (name);
766     gst_pad_set_chain_function (sinkpads[i], mq_dummypad_chain);
767     gst_pad_set_event_function (sinkpads[i], mq_dummypad_event);
768     gst_pad_set_query_function (sinkpads[i], mq_dummypad_query);
769 
770     pad_data[i].pad_num = i;
771     pad_data[i].max_linked_id_ptr = &max_linked_id;
772     if (i == 0)
773       pad_data[i].eos_count_ptr = &eos_seen;
774     else
775       pad_data[i].eos_count_ptr = NULL;
776     pad_data[i].is_linked = (i == 0) ? TRUE : FALSE;
777     pad_data[i].n_linked = 1;
778     pad_data[i].cond = &cond;
779     pad_data[i].mutex = &mutex;
780     pad_data[i].first_buf = TRUE;
781     gst_pad_set_element_private (sinkpads[i], pad_data + i);
782 
783     fail_unless (gst_pad_link (mq_srcpad, sinkpads[i]) == GST_PAD_LINK_OK);
784     gst_pad_set_active (sinkpads[i], TRUE);
785 
786     gst_object_unref (mq_sinkpad);
787     gst_object_unref (mq_srcpad);
788   }
789 
790   /* Run the test. Push 100 buffers through the multiqueue */
791   max_linked_id = 0;
792   eos_seen = 0;
793 
794   gst_element_set_state (pipe, GST_STATE_PLAYING);
795 
796   for (i = 0; i < NBUFFERS; i++) {
797     GstBuffer *buf;
798     GstFlowReturn ret;
799     GstClockTime ts;
800     GstMapInfo info;
801 
802     ts = gst_util_uint64_scale_int (GST_SECOND, i, 10);
803 
804     buf = gst_buffer_new_and_alloc (4);
805     g_mutex_lock (&_check_lock);
806     fail_if (buf == NULL);
807     g_mutex_unlock (&_check_lock);
808 
809     fail_unless (gst_buffer_map (buf, &info, GST_MAP_WRITE));
810     GST_WRITE_UINT32_BE (info.data, i + 1);
811     gst_buffer_unmap (buf, &info);
812 
813     GST_BUFFER_TIMESTAMP (buf) = gst_util_uint64_scale_int (GST_SECOND, i, 10);
814 
815     /* If i == 0, also push the buffer to the 2nd pad */
816     if (i == 0)
817       ret = gst_pad_push (inputpads[1], gst_buffer_ref (buf));
818 
819     ret = gst_pad_push (inputpads[0], buf);
820     g_mutex_lock (&_check_lock);
821     fail_unless (ret == GST_FLOW_OK,
822         "Push on pad %d returned %d when FLOW_OK was expected", 0, ret);
823     g_mutex_unlock (&_check_lock);
824 
825     /* Push a new segment update on the 2nd pad */
826     gst_segment_init (&segment, GST_FORMAT_TIME);
827     segment.start = ts;
828     segment.time = ts;
829     event = gst_event_new_segment (&segment);
830     gst_pad_push_event (inputpads[1], event);
831   }
832 
833   event = gst_event_new_eos ();
834   gst_pad_push_event (inputpads[0], gst_event_ref (event));
835   gst_pad_push_event (inputpads[1], event);
836 
837   /* Wait while the buffers are processed */
838   g_mutex_lock (&mutex);
839   /* We wait until EOS has been pushed on pad 1 */
840   while (eos_seen < 1) {
841     g_cond_wait (&cond, &mutex);
842   }
843   g_mutex_unlock (&mutex);
844 
845   /* Clean up */
846   for (i = 0; i < 2; i++) {
847     GstPad *mq_input = gst_pad_get_peer (inputpads[i]);
848 
849     gst_pad_unlink (inputpads[i], mq_input);
850     gst_element_release_request_pad (mq, mq_input);
851     gst_object_unref (mq_input);
852     gst_object_unref (inputpads[i]);
853 
854     gst_object_unref (sinkpads[i]);
855   }
856 
857   gst_element_set_state (pipe, GST_STATE_NULL);
858   gst_object_unref (pipe);
859 
860   g_cond_clear (&cond);
861   g_mutex_clear (&mutex);
862 }
863 
864 GST_END_TEST;
865 
866 static gpointer
pad_push_datablock_thread(gpointer data)867 pad_push_datablock_thread (gpointer data)
868 {
869   GstPad *pad = data;
870   GstBuffer *buf;
871 
872   buf = gst_buffer_new_allocate (NULL, 80 * 1000, NULL);
873   gst_pad_push (pad, buf);
874 
875   return NULL;
876 }
877 
878 static GstPadProbeReturn
block_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)879 block_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
880 {
881   return GST_PAD_PROBE_OK;
882 }
883 
884 static void
check_for_buffering_msg(GstElement * pipeline,gint expected_perc)885 check_for_buffering_msg (GstElement * pipeline, gint expected_perc)
886 {
887   gint buf_perc;
888   GstMessage *msg;
889 
890   GST_LOG ("waiting for %d%% buffering message", expected_perc);
891 
892   msg = gst_bus_poll (GST_ELEMENT_BUS (pipeline),
893       GST_MESSAGE_BUFFERING | GST_MESSAGE_ERROR, -1);
894   fail_if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR,
895       "Expected BUFFERING message, got ERROR message");
896 
897   gst_message_parse_buffering (msg, &buf_perc);
898   fail_unless (buf_perc == expected_perc,
899       "Got incorrect percentage: %d%% expected: %d%%", buf_perc, expected_perc);
900 
901   gst_message_unref (msg);
902 }
903 
GST_START_TEST(test_initial_fill_above_high_threshold)904 GST_START_TEST (test_initial_fill_above_high_threshold)
905 {
906   /* This test checks what happens if the first buffer that enters
907    * the queue immediately fills it above the high-threshold. */
908   GstElement *pipe;
909   GstElement *mq, *fakesink;
910   GstPad *inputpad;
911   GstPad *mq_sinkpad;
912   GstPad *sinkpad;
913   GstSegment segment;
914   GThread *thread;
915 
916 
917   /* Setup test pipeline with one multiqueue and one fakesink */
918 
919   pipe = gst_pipeline_new ("testbin");
920   mq = gst_element_factory_make ("multiqueue", NULL);
921   fail_unless (mq != NULL);
922   gst_bin_add (GST_BIN (pipe), mq);
923 
924   fakesink = gst_element_factory_make ("fakesink", NULL);
925   fail_unless (fakesink != NULL);
926   gst_bin_add (GST_BIN (pipe), fakesink);
927 
928   /* Block fakesink sinkpad flow to ensure the queue isn't emptied
929    * by the prerolling sink */
930   sinkpad = gst_element_get_static_pad (fakesink, "sink");
931   gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BLOCK, block_probe, NULL,
932       NULL);
933   gst_object_unref (sinkpad);
934 
935   /* Set size limit to 1000000 byte, low threshold to 1%, high
936    * threshold to 5%, to make sure that even just one data push
937    * will exceed both thresholds.*/
938   g_object_set (mq,
939       "use-buffering", (gboolean) TRUE,
940       "max-size-bytes", (guint) 1000 * 1000,
941       "max-size-buffers", (guint) 0,
942       "max-size-time", (guint64) 0,
943       "extra-size-bytes", (guint) 0,
944       "extra-size-buffers", (guint) 0,
945       "extra-size-time", (guint64) 0,
946       "low-percent", (gint) 1, "high-percent", (gint) 5, NULL);
947 
948   gst_segment_init (&segment, GST_FORMAT_TIME);
949 
950   inputpad = gst_pad_new ("dummysrc", GST_PAD_SRC);
951   gst_pad_set_query_function (inputpad, mq_dummypad_query);
952 
953   mq_sinkpad = gst_element_get_request_pad (mq, "sink_%u");
954   fail_unless (mq_sinkpad != NULL);
955   fail_unless (gst_pad_link (inputpad, mq_sinkpad) == GST_PAD_LINK_OK);
956 
957   gst_pad_set_active (inputpad, TRUE);
958 
959   gst_pad_push_event (inputpad, gst_event_new_stream_start ("test"));
960   gst_pad_push_event (inputpad, gst_event_new_segment (&segment));
961 
962   gst_object_unref (mq_sinkpad);
963 
964   fail_unless (gst_element_link (mq, fakesink));
965 
966   /* Start pipeline in paused state to ensure the sink remains
967    * in preroll mode and blocks */
968   gst_element_set_state (pipe, GST_STATE_PAUSED);
969 
970   /* Feed data. queue will be filled to 8% (because it pushes 80000 bytes),
971    * which is above both the low- and the high-threshold. This should
972    * produce a 100% buffering message. */
973   thread = g_thread_new ("push1", pad_push_datablock_thread, inputpad);
974   g_thread_join (thread);
975   check_for_buffering_msg (pipe, 100);
976 
977   gst_element_set_state (pipe, GST_STATE_NULL);
978   gst_object_unref (inputpad);
979   gst_object_unref (pipe);
980 }
981 
982 GST_END_TEST;
983 
GST_START_TEST(test_watermark_and_fill_level)984 GST_START_TEST (test_watermark_and_fill_level)
985 {
986   /* This test checks the behavior of the fill level and
987    * the low/high watermarks. It also checks if the
988    * low/high-percent and low/high-watermark properties
989    * are coupled together properly. */
990   GstElement *pipe;
991   GstElement *mq, *fakesink;
992   GstPad *inputpad;
993   GstPad *mq_sinkpad;
994   GstPad *sinkpad;
995   GstSegment segment;
996   GThread *thread;
997   gint low_perc, high_perc;
998 
999 
1000   /* Setup test pipeline with one multiqueue and one fakesink */
1001 
1002   pipe = gst_pipeline_new ("testbin");
1003   mq = gst_element_factory_make ("multiqueue", NULL);
1004   fail_unless (mq != NULL);
1005   gst_bin_add (GST_BIN (pipe), mq);
1006 
1007   fakesink = gst_element_factory_make ("fakesink", NULL);
1008   fail_unless (fakesink != NULL);
1009   gst_bin_add (GST_BIN (pipe), fakesink);
1010 
1011   /* Block fakesink sinkpad flow to ensure the queue isn't emptied
1012    * by the prerolling sink */
1013   sinkpad = gst_element_get_static_pad (fakesink, "sink");
1014   gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BLOCK, block_probe, NULL,
1015       NULL);
1016   gst_object_unref (sinkpad);
1017 
1018   g_object_set (mq,
1019       "use-buffering", (gboolean) TRUE,
1020       "max-size-bytes", (guint) 1000 * 1000,
1021       "max-size-buffers", (guint) 0,
1022       "max-size-time", (guint64) 0,
1023       "extra-size-bytes", (guint) 0,
1024       "extra-size-buffers", (guint) 0,
1025       "extra-size-time", (guint64) 0,
1026       "low-watermark", (gdouble) 0.01, "high-watermark", (gdouble) 0.10, NULL);
1027 
1028   g_object_get (mq, "low-percent", &low_perc, "high-percent", &high_perc, NULL);
1029 
1030   /* Check that low/high-watermark and low/high-percent are
1031    * coupled properly. (low/high-percent are deprecated and
1032    * exist for backwards compatibility.) */
1033   fail_unless_equals_int (low_perc, 1);
1034   fail_unless_equals_int (high_perc, 10);
1035 
1036   gst_segment_init (&segment, GST_FORMAT_TIME);
1037 
1038   inputpad = gst_pad_new ("dummysrc", GST_PAD_SRC);
1039   gst_pad_set_query_function (inputpad, mq_dummypad_query);
1040 
1041   mq_sinkpad = gst_element_get_request_pad (mq, "sink_%u");
1042   fail_unless (mq_sinkpad != NULL);
1043   fail_unless (gst_pad_link (inputpad, mq_sinkpad) == GST_PAD_LINK_OK);
1044 
1045   gst_pad_set_active (inputpad, TRUE);
1046 
1047   gst_pad_push_event (inputpad, gst_event_new_stream_start ("test"));
1048   gst_pad_push_event (inputpad, gst_event_new_segment (&segment));
1049 
1050   gst_object_unref (mq_sinkpad);
1051 
1052   fail_unless (gst_element_link (mq, fakesink));
1053 
1054   /* Start pipeline in paused state to ensure the sink remains
1055    * in preroll mode and blocks */
1056   gst_element_set_state (pipe, GST_STATE_PAUSED);
1057 
1058   /* Feed data. queue will be filled to 8% (because it pushes 80000 bytes),
1059    * which is below the high-threshold, provoking a buffering message. */
1060   thread = g_thread_new ("push1", pad_push_datablock_thread, inputpad);
1061   g_thread_join (thread);
1062 
1063   /* Check for the buffering message; it should indicate 80% fill level
1064    * (Note that the percentage from the message is normalized) */
1065   check_for_buffering_msg (pipe, 80);
1066 
1067   /* Increase the buffer size and lower the watermarks to test
1068    * if <1% watermarks are supported. */
1069   g_object_set (mq,
1070       "max-size-bytes", (guint) 20 * 1000 * 1000,
1071       "low-watermark", (gdouble) 0.0001, "high-watermark", (gdouble) 0.005,
1072       NULL);
1073   /* First buffering message is posted after the max-size-bytes limit
1074    * is set to 20000000 bytes & the low-watermark is set. Since the
1075    * multiqueue contains 80000 bytes, and the high watermark still is
1076    * 0.1 at this point, and the buffer level 80000 / 20000000 = 0.004 is
1077    * normalized by 0.1: 0.004 / 0.1 => buffering percentage 4%. */
1078   check_for_buffering_msg (pipe, 4);
1079   /* Second buffering message is posted after the high-watermark limit
1080    * is set to 0.005. This time, the buffer level is normalized this way:
1081    * 0.004 / 0.005 => buffering percentage 80%. */
1082   check_for_buffering_msg (pipe, 80);
1083 
1084 
1085   gst_element_set_state (pipe, GST_STATE_NULL);
1086   gst_object_unref (inputpad);
1087   gst_object_unref (pipe);
1088 }
1089 
1090 GST_END_TEST;
1091 
GST_START_TEST(test_high_threshold_change)1092 GST_START_TEST (test_high_threshold_change)
1093 {
1094   /* This test checks what happens if the high threshold is changed to a
1095    * value below the current buffer fill level. Expected behavior is for
1096    * multiqueue to emit a 100% buffering message in that case. */
1097   GstElement *pipe;
1098   GstElement *mq, *fakesink;
1099   GstPad *inputpad;
1100   GstPad *mq_sinkpad;
1101   GstPad *sinkpad;
1102   GstSegment segment;
1103   GThread *thread;
1104 
1105 
1106   /* Setup test pipeline with one multiqueue and one fakesink */
1107 
1108   pipe = gst_pipeline_new ("testbin");
1109   mq = gst_element_factory_make ("multiqueue", NULL);
1110   fail_unless (mq != NULL);
1111   gst_bin_add (GST_BIN (pipe), mq);
1112 
1113   fakesink = gst_element_factory_make ("fakesink", NULL);
1114   fail_unless (fakesink != NULL);
1115   gst_bin_add (GST_BIN (pipe), fakesink);
1116 
1117   /* Block fakesink sinkpad flow to ensure the queue isn't emptied
1118    * by the prerolling sink */
1119   sinkpad = gst_element_get_static_pad (fakesink, "sink");
1120   gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BLOCK, block_probe, NULL,
1121       NULL);
1122   gst_object_unref (sinkpad);
1123 
1124   g_object_set (mq,
1125       "use-buffering", (gboolean) TRUE,
1126       "max-size-bytes", (guint) 1000 * 1000,
1127       "max-size-buffers", (guint) 0,
1128       "max-size-time", (guint64) 0,
1129       "extra-size-bytes", (guint) 0,
1130       "extra-size-buffers", (guint) 0,
1131       "extra-size-time", (guint64) 0,
1132       "low-percent", (gint) 1, "high-percent", (gint) 99, NULL);
1133 
1134   gst_segment_init (&segment, GST_FORMAT_TIME);
1135 
1136   inputpad = gst_pad_new ("dummysrc", GST_PAD_SRC);
1137   gst_pad_set_query_function (inputpad, mq_dummypad_query);
1138 
1139   mq_sinkpad = gst_element_get_request_pad (mq, "sink_%u");
1140   fail_unless (mq_sinkpad != NULL);
1141   fail_unless (gst_pad_link (inputpad, mq_sinkpad) == GST_PAD_LINK_OK);
1142 
1143   gst_pad_set_active (inputpad, TRUE);
1144 
1145   gst_pad_push_event (inputpad, gst_event_new_stream_start ("test"));
1146   gst_pad_push_event (inputpad, gst_event_new_segment (&segment));
1147 
1148   gst_object_unref (mq_sinkpad);
1149 
1150   fail_unless (gst_element_link (mq, fakesink));
1151 
1152   /* Start pipeline in paused state to ensure the sink remains
1153    * in preroll mode and blocks */
1154   gst_element_set_state (pipe, GST_STATE_PAUSED);
1155 
1156   /* Feed data. queue will be filled to 8% (because it pushes 80000 bytes),
1157    * which is below the high-threshold, provoking a buffering message. */
1158   thread = g_thread_new ("push1", pad_push_datablock_thread, inputpad);
1159   g_thread_join (thread);
1160 
1161   /* Check for the buffering message; it should indicate 8% fill level
1162    * (Note that the percentage from the message is normalized, but since
1163    * the high threshold is at 99%, it should still apply) */
1164   check_for_buffering_msg (pipe, 8);
1165 
1166   /* Set high threshold to half of what it was before. This means that the
1167    * relative fill level doubles. As a result, this should trigger a buffering
1168    * message with a percentage of 16%. */
1169   g_object_set (mq, "high-percent", (gint) 50, NULL);
1170   check_for_buffering_msg (pipe, 16);
1171 
1172   /* Set high threshold to a value that lies below the current fill level.
1173    * This should trigger a 100% buffering message immediately, even without
1174    * pushing in extra data. */
1175   g_object_set (mq, "high-percent", (gint) 5, NULL);
1176   check_for_buffering_msg (pipe, 100);
1177 
1178   gst_element_set_state (pipe, GST_STATE_NULL);
1179   gst_object_unref (inputpad);
1180   gst_object_unref (pipe);
1181 }
1182 
1183 GST_END_TEST;
1184 
GST_START_TEST(test_low_threshold_change)1185 GST_START_TEST (test_low_threshold_change)
1186 {
1187   /* This tests what happens if the queue isn't currently buffering and the
1188    * low-threshold is raised above the current fill level. */
1189   GstElement *pipe;
1190   GstElement *mq, *fakesink;
1191   GstPad *inputpad;
1192   GstPad *mq_sinkpad;
1193   GstPad *sinkpad;
1194   GstSegment segment;
1195   GThread *thread;
1196 
1197 
1198   /* Setup test pipeline with one multiqueue and one fakesink */
1199 
1200   pipe = gst_pipeline_new ("testbin");
1201   mq = gst_element_factory_make ("multiqueue", NULL);
1202   fail_unless (mq != NULL);
1203   gst_bin_add (GST_BIN (pipe), mq);
1204 
1205   fakesink = gst_element_factory_make ("fakesink", NULL);
1206   fail_unless (fakesink != NULL);
1207   gst_bin_add (GST_BIN (pipe), fakesink);
1208 
1209   /* Block fakesink sinkpad flow to ensure the queue isn't emptied
1210    * by the prerolling sink */
1211   sinkpad = gst_element_get_static_pad (fakesink, "sink");
1212   gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BLOCK, block_probe, NULL,
1213       NULL);
1214   gst_object_unref (sinkpad);
1215 
1216   /* Enable buffering and set the low/high thresholds to 1%/5%. This ensures
1217    * that after pushing one data block, the high threshold is reached, and
1218    * buffering ceases. */
1219   g_object_set (mq,
1220       "use-buffering", (gboolean) TRUE,
1221       "max-size-bytes", (guint) 1000 * 1000,
1222       "max-size-buffers", (guint) 0,
1223       "max-size-time", (guint64) 0,
1224       "extra-size-bytes", (guint) 0,
1225       "extra-size-buffers", (guint) 0,
1226       "extra-size-time", (guint64) 0,
1227       "low-percent", (gint) 1, "high-percent", (gint) 5, NULL);
1228 
1229   gst_segment_init (&segment, GST_FORMAT_TIME);
1230 
1231   inputpad = gst_pad_new ("dummysrc", GST_PAD_SRC);
1232   gst_pad_set_query_function (inputpad, mq_dummypad_query);
1233 
1234   mq_sinkpad = gst_element_get_request_pad (mq, "sink_%u");
1235   fail_unless (mq_sinkpad != NULL);
1236   fail_unless (gst_pad_link (inputpad, mq_sinkpad) == GST_PAD_LINK_OK);
1237 
1238   gst_pad_set_active (inputpad, TRUE);
1239 
1240   gst_pad_push_event (inputpad, gst_event_new_stream_start ("test"));
1241   gst_pad_push_event (inputpad, gst_event_new_segment (&segment));
1242 
1243   gst_object_unref (mq_sinkpad);
1244 
1245   fail_unless (gst_element_link (mq, fakesink));
1246 
1247   /* Start pipeline in paused state to ensure the sink remains
1248    * in preroll mode and blocks */
1249   gst_element_set_state (pipe, GST_STATE_PAUSED);
1250 
1251   /* Feed data. queue will be filled to 8% (because it pushes 80000 bytes),
1252    * which is above the high-threshold, ensuring that the queue disables
1253    * its buffering mode internally. */
1254   thread = g_thread_new ("push1", pad_push_datablock_thread, inputpad);
1255   g_thread_join (thread);
1256 
1257   /* Check for the buffering message; it should indicate 100% relative fill
1258    * level (Note that the percentage from the message is normalized) */
1259   check_for_buffering_msg (pipe, 100);
1260 
1261   /* Set low threshold to a 10%, which is above the current fill level of 8%.
1262    * As a result, the queue must re-enable its buffering mode, and post the
1263    * current relative fill level of 40% (since high-percent is also set to 20%
1264    * and 8%/20% = 40%). */
1265   g_object_set (mq, "high-percent", (gint) 20, "low-percent", (gint) 10, NULL);
1266   check_for_buffering_msg (pipe, 40);
1267 
1268   gst_element_set_state (pipe, GST_STATE_NULL);
1269   gst_object_unref (inputpad);
1270   gst_object_unref (pipe);
1271 }
1272 
1273 GST_END_TEST;
1274 
1275 static gpointer
pad_push_thread(gpointer data)1276 pad_push_thread (gpointer data)
1277 {
1278   GstPad *pad = data;
1279   GstBuffer *buf;
1280 
1281   buf = gst_buffer_new ();
1282   gst_pad_push (pad, buf);
1283 
1284   return NULL;
1285 }
1286 
GST_START_TEST(test_limit_changes)1287 GST_START_TEST (test_limit_changes)
1288 {
1289   /* This test creates a multiqueue with 1 stream. The limit of the queue
1290    * is two buffers, we check if we block once this is reached. Then we
1291    * change the limit to three buffers and check if this is waking up
1292    * the queue and we get the third buffer.
1293    */
1294   GstElement *pipe;
1295   GstElement *mq, *fakesink;
1296   GstPad *inputpad;
1297   GstPad *mq_sinkpad;
1298   GstSegment segment;
1299   GThread *thread;
1300 
1301   pipe = gst_pipeline_new ("testbin");
1302   mq = gst_element_factory_make ("multiqueue", NULL);
1303   fail_unless (mq != NULL);
1304   gst_bin_add (GST_BIN (pipe), mq);
1305 
1306   fakesink = gst_element_factory_make ("fakesink", NULL);
1307   fail_unless (fakesink != NULL);
1308   gst_bin_add (GST_BIN (pipe), fakesink);
1309 
1310   g_object_set (mq,
1311       "max-size-bytes", (guint) 0,
1312       "max-size-buffers", (guint) 2,
1313       "max-size-time", (guint64) 0,
1314       "extra-size-bytes", (guint) 0,
1315       "extra-size-buffers", (guint) 0, "extra-size-time", (guint64) 0, NULL);
1316 
1317   gst_segment_init (&segment, GST_FORMAT_TIME);
1318 
1319   inputpad = gst_pad_new ("dummysrc", GST_PAD_SRC);
1320   gst_pad_set_query_function (inputpad, mq_dummypad_query);
1321 
1322   mq_sinkpad = gst_element_get_request_pad (mq, "sink_%u");
1323   fail_unless (mq_sinkpad != NULL);
1324   fail_unless (gst_pad_link (inputpad, mq_sinkpad) == GST_PAD_LINK_OK);
1325 
1326   gst_pad_set_active (inputpad, TRUE);
1327 
1328   gst_pad_push_event (inputpad, gst_event_new_stream_start ("test"));
1329   gst_pad_push_event (inputpad, gst_event_new_segment (&segment));
1330 
1331   gst_object_unref (mq_sinkpad);
1332 
1333   fail_unless (gst_element_link (mq, fakesink));
1334 
1335   gst_element_set_state (pipe, GST_STATE_PAUSED);
1336 
1337   thread = g_thread_new ("push1", pad_push_thread, inputpad);
1338   g_thread_join (thread);
1339   thread = g_thread_new ("push2", pad_push_thread, inputpad);
1340   g_thread_join (thread);
1341   thread = g_thread_new ("push3", pad_push_thread, inputpad);
1342   g_thread_join (thread);
1343   thread = g_thread_new ("push4", pad_push_thread, inputpad);
1344 
1345   /* Wait until we are actually blocking... we unfortunately can't
1346    * know that without sleeping */
1347   g_usleep (G_USEC_PER_SEC);
1348   g_object_set (mq, "max-size-buffers", (guint) 3, NULL);
1349   g_thread_join (thread);
1350 
1351   g_object_set (mq, "max-size-buffers", (guint) 4, NULL);
1352   thread = g_thread_new ("push5", pad_push_thread, inputpad);
1353   g_thread_join (thread);
1354 
1355   gst_element_set_state (pipe, GST_STATE_NULL);
1356   gst_object_unref (inputpad);
1357   gst_object_unref (pipe);
1358 }
1359 
1360 GST_END_TEST;
1361 
1362 static GMutex block_mutex;
1363 static GCond block_cond;
1364 static gint unblock_count;
1365 static gboolean expect_overrun;
1366 
1367 static GstFlowReturn
pad_chain_block(GstPad * pad,GstObject * parent,GstBuffer * buffer)1368 pad_chain_block (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1369 {
1370   g_mutex_lock (&block_mutex);
1371   while (unblock_count == 0) {
1372     g_cond_wait (&block_cond, &block_mutex);
1373   }
1374   if (unblock_count > 0) {
1375     unblock_count--;
1376   }
1377   g_mutex_unlock (&block_mutex);
1378 
1379   gst_buffer_unref (buffer);
1380   return GST_FLOW_OK;
1381 }
1382 
1383 static gboolean
pad_event_always_ok(GstPad * pad,GstObject * parent,GstEvent * event)1384 pad_event_always_ok (GstPad * pad, GstObject * parent, GstEvent * event)
1385 {
1386   gst_event_unref (event);
1387   return TRUE;
1388 }
1389 
1390 static void
mq_overrun(GstElement * mq,gpointer udata)1391 mq_overrun (GstElement * mq, gpointer udata)
1392 {
1393   fail_unless (expect_overrun);
1394 
1395   /* unblock always so we don't get stuck */
1396   g_mutex_lock (&block_mutex);
1397   unblock_count = 2;            /* let the PTS=0 and PTS=none go */
1398   g_cond_signal (&block_cond);
1399   g_mutex_unlock (&block_mutex);
1400 }
1401 
GST_START_TEST(test_buffering_with_none_pts)1402 GST_START_TEST (test_buffering_with_none_pts)
1403 {
1404   /*
1405    * This test creates a multiqueue where source pushing blocks so we can check
1406    * how its buffering level is reacting to GST_CLOCK_TIME_NONE buffers
1407    * mixed with properly timestamped buffers.
1408    *
1409    * Sequence of pushing:
1410    * pts=0
1411    * pts=none
1412    * pts=1 (it gets full now)
1413    * pts=none (overrun expected)
1414    */
1415   GstElement *mq;
1416   GstPad *inputpad;
1417   GstPad *outputpad;
1418   GstPad *mq_sinkpad;
1419   GstPad *mq_srcpad;
1420   GstSegment segment;
1421   GstBuffer *buffer;
1422 
1423   g_mutex_init (&block_mutex);
1424   g_cond_init (&block_cond);
1425   unblock_count = 0;
1426   expect_overrun = FALSE;
1427 
1428   mq = gst_element_factory_make ("multiqueue", NULL);
1429   fail_unless (mq != NULL);
1430 
1431   g_object_set (mq,
1432       "max-size-bytes", (guint) 0,
1433       "max-size-buffers", (guint) 0,
1434       "max-size-time", (guint64) GST_SECOND, NULL);
1435   g_signal_connect (mq, "overrun", (GCallback) mq_overrun, NULL);
1436 
1437   gst_segment_init (&segment, GST_FORMAT_TIME);
1438 
1439   inputpad = gst_pad_new ("dummysrc", GST_PAD_SRC);
1440   outputpad = gst_pad_new ("dummysink", GST_PAD_SINK);
1441   gst_pad_set_chain_function (outputpad, pad_chain_block);
1442   gst_pad_set_event_function (outputpad, pad_event_always_ok);
1443   mq_sinkpad = gst_element_get_request_pad (mq, "sink_%u");
1444   mq_srcpad = gst_element_get_static_pad (mq, "src_0");
1445   fail_unless (mq_sinkpad != NULL);
1446   fail_unless (gst_pad_link (inputpad, mq_sinkpad) == GST_PAD_LINK_OK);
1447   fail_unless (gst_pad_link (mq_srcpad, outputpad) == GST_PAD_LINK_OK);
1448 
1449   gst_pad_set_active (inputpad, TRUE);
1450   gst_pad_set_active (outputpad, TRUE);
1451   gst_pad_push_event (inputpad, gst_event_new_stream_start ("test"));
1452   gst_pad_push_event (inputpad, gst_event_new_segment (&segment));
1453 
1454   gst_element_set_state (mq, GST_STATE_PAUSED);
1455 
1456   /* push a buffer with PTS = 0 */
1457   buffer = gst_buffer_new ();
1458   GST_BUFFER_PTS (buffer) = 0;
1459   fail_unless (gst_pad_push (inputpad, buffer) == GST_FLOW_OK);
1460 
1461   /* push a buffer with PTS = NONE */
1462   buffer = gst_buffer_new ();
1463   GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
1464   fail_unless (gst_pad_push (inputpad, buffer) == GST_FLOW_OK);
1465 
1466   /* push a buffer with PTS = 1s, so we have 1s of data in multiqueue, we are
1467    * full */
1468   buffer = gst_buffer_new ();
1469   GST_BUFFER_PTS (buffer) = GST_SECOND;
1470   fail_unless (gst_pad_push (inputpad, buffer) == GST_FLOW_OK);
1471 
1472   /* push a buffer with PTS = NONE, the queue is full so it should overrun */
1473   expect_overrun = TRUE;
1474   buffer = gst_buffer_new ();
1475   GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
1476   fail_unless (gst_pad_push (inputpad, buffer) == GST_FLOW_OK);
1477 
1478   g_mutex_lock (&block_mutex);
1479   unblock_count = -1;
1480   g_cond_signal (&block_cond);
1481   g_mutex_unlock (&block_mutex);
1482 
1483   gst_element_set_state (mq, GST_STATE_NULL);
1484   gst_object_unref (inputpad);
1485   gst_object_unref (outputpad);
1486   gst_object_unref (mq_sinkpad);
1487   gst_object_unref (mq_srcpad);
1488   gst_object_unref (mq);
1489   g_mutex_clear (&block_mutex);
1490   g_cond_clear (&block_cond);
1491 }
1492 
1493 GST_END_TEST;
1494 
1495 static gboolean
event_func_signal(GstPad * sinkpad,GstObject * parent,GstEvent * event)1496 event_func_signal (GstPad * sinkpad, GstObject * parent, GstEvent * event)
1497 {
1498   struct PadData *pad_data;
1499 
1500   GST_LOG_OBJECT (sinkpad, "%s event", GST_EVENT_TYPE_NAME (event));
1501 
1502   pad_data = gst_pad_get_element_private (sinkpad);
1503 
1504   g_mutex_lock (pad_data->mutex);
1505   ++pad_data->event_count;
1506   g_cond_broadcast (pad_data->cond);
1507   g_mutex_unlock (pad_data->mutex);
1508 
1509   gst_event_unref (event);
1510   return TRUE;
1511 }
1512 
GST_START_TEST(test_initial_events_nodelay)1513 GST_START_TEST (test_initial_events_nodelay)
1514 {
1515   struct PadData pad_data = { 0, };
1516   GstElement *pipe;
1517   GstElement *mq;
1518   GstPad *inputpad;
1519   GstPad *sinkpad;
1520   GstSegment segment;
1521   GstCaps *caps;
1522   GMutex mutex;
1523   GCond cond;
1524 
1525   g_mutex_init (&mutex);
1526   g_cond_init (&cond);
1527 
1528   pipe = gst_pipeline_new ("testbin");
1529 
1530   mq = gst_element_factory_make ("multiqueue", NULL);
1531   fail_unless (mq != NULL);
1532   gst_bin_add (GST_BIN (pipe), mq);
1533 
1534   {
1535     GstPad *mq_srcpad, *mq_sinkpad;
1536 
1537     inputpad = gst_pad_new ("dummysrc", GST_PAD_SRC);
1538 
1539     mq_sinkpad = gst_element_get_request_pad (mq, "sink_%u");
1540     fail_unless (mq_sinkpad != NULL);
1541     fail_unless (gst_pad_link (inputpad, mq_sinkpad) == GST_PAD_LINK_OK);
1542 
1543     gst_pad_set_active (inputpad, TRUE);
1544 
1545     mq_srcpad = mq_sinkpad_to_srcpad (mq, mq_sinkpad);
1546 
1547     sinkpad = gst_pad_new ("dummysink", GST_PAD_SINK);
1548     gst_pad_set_event_function (sinkpad, event_func_signal);
1549 
1550     pad_data.event_count = 0;
1551     pad_data.cond = &cond;
1552     pad_data.mutex = &mutex;
1553     gst_pad_set_element_private (sinkpad, &pad_data);
1554 
1555     fail_unless (gst_pad_link (mq_srcpad, sinkpad) == GST_PAD_LINK_OK);
1556     gst_pad_set_active (sinkpad, TRUE);
1557 
1558     gst_object_unref (mq_sinkpad);
1559     gst_object_unref (mq_srcpad);
1560   }
1561 
1562   /* Run the test: push events through multiqueue */
1563   gst_element_set_state (pipe, GST_STATE_PLAYING);
1564 
1565   gst_pad_push_event (inputpad, gst_event_new_stream_start ("test"));
1566 
1567   caps = gst_caps_new_empty_simple ("foo/x-bar");
1568   gst_pad_push_event (inputpad, gst_event_new_caps (caps));
1569   gst_caps_unref (caps);
1570 
1571   gst_segment_init (&segment, GST_FORMAT_TIME);
1572   gst_pad_push_event (inputpad, gst_event_new_segment (&segment));
1573 
1574   g_mutex_lock (&mutex);
1575   while (pad_data.event_count < 3) {
1576     GST_LOG ("%d events so far, waiting for more", pad_data.event_count);
1577     g_cond_wait (&cond, &mutex);
1578   }
1579   g_mutex_unlock (&mutex);
1580 
1581   /* Clean up */
1582   {
1583     GstPad *mq_input = gst_pad_get_peer (inputpad);
1584 
1585     gst_pad_unlink (inputpad, mq_input);
1586     gst_element_release_request_pad (mq, mq_input);
1587     gst_object_unref (mq_input);
1588     gst_object_unref (inputpad);
1589 
1590     gst_object_unref (sinkpad);
1591   }
1592 
1593   gst_element_set_state (pipe, GST_STATE_NULL);
1594   gst_object_unref (pipe);
1595 
1596   g_cond_clear (&cond);
1597   g_mutex_clear (&mutex);
1598 }
1599 
1600 GST_END_TEST;
1601 
1602 static void
check_for_stream_status_msg(GstElement * pipeline,GstElement * multiqueue,GstStreamStatusType expected_type)1603 check_for_stream_status_msg (GstElement * pipeline, GstElement * multiqueue,
1604     GstStreamStatusType expected_type)
1605 {
1606   GEnumClass *klass;
1607   const gchar *expected_nick, *nick;
1608   GstMessage *msg;
1609   GstStreamStatusType type;
1610   GstElement *owner;
1611 
1612   klass = g_type_class_ref (GST_TYPE_STREAM_STATUS_TYPE);
1613   expected_nick = g_enum_get_value (klass, expected_type)->value_nick;
1614 
1615   GST_LOG ("waiting for stream-status %s message", expected_nick);
1616 
1617   msg = gst_bus_poll (GST_ELEMENT_BUS (pipeline),
1618       GST_MESSAGE_STREAM_STATUS | GST_MESSAGE_ERROR, -1);
1619   fail_if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR,
1620       "Expected stream-status message, got error message");
1621 
1622   gst_message_parse_stream_status (msg, &type, &owner);
1623   nick = g_enum_get_value (klass, type)->value_nick;
1624   fail_unless (owner == multiqueue,
1625       "Got incorrect owner: %" GST_PTR_FORMAT " expected: %" GST_PTR_FORMAT,
1626       owner, multiqueue);
1627   fail_unless (type == expected_type,
1628       "Got incorrect type: %s expected: %s", nick, expected_nick);
1629 
1630   gst_message_unref (msg);
1631   g_type_class_unref (klass);
1632 }
1633 
GST_START_TEST(test_stream_status_messages)1634 GST_START_TEST (test_stream_status_messages)
1635 {
1636   GstElement *pipe, *mq;
1637   GstPad *pad;
1638 
1639   pipe = gst_pipeline_new ("pipeline");
1640   mq = gst_element_factory_make ("multiqueue", NULL);
1641 
1642   gst_bin_add (GST_BIN (pipe), mq);
1643 
1644   pad = gst_element_get_request_pad (mq, "sink_%u");
1645   gst_object_unref (pad);
1646 
1647   gst_element_set_state (pipe, GST_STATE_PAUSED);
1648 
1649   check_for_stream_status_msg (pipe, mq, GST_STREAM_STATUS_TYPE_CREATE);
1650   check_for_stream_status_msg (pipe, mq, GST_STREAM_STATUS_TYPE_ENTER);
1651 
1652   pad = gst_element_get_request_pad (mq, "sink_%u");
1653   gst_object_unref (pad);
1654 
1655   check_for_stream_status_msg (pipe, mq, GST_STREAM_STATUS_TYPE_CREATE);
1656   check_for_stream_status_msg (pipe, mq, GST_STREAM_STATUS_TYPE_ENTER);
1657 
1658   gst_element_set_state (pipe, GST_STATE_NULL);
1659   gst_object_unref (pipe);
1660 }
1661 
1662 GST_END_TEST;
1663 
1664 static Suite *
multiqueue_suite(void)1665 multiqueue_suite (void)
1666 {
1667   Suite *s = suite_create ("multiqueue");
1668   TCase *tc_chain = tcase_create ("general");
1669 
1670   suite_add_tcase (s, tc_chain);
1671   tcase_add_test (tc_chain, test_simple_create_destroy);
1672   tcase_add_test (tc_chain, test_simple_pipeline);
1673   tcase_add_test (tc_chain, test_simple_shutdown_while_running);
1674 
1675   tcase_add_test (tc_chain, test_request_pads);
1676   tcase_add_test (tc_chain, test_request_pads_named);
1677 
1678   /* Disabled, The test (and not multiqueue itself) is racy.
1679    * See https://bugzilla.gnome.org/show_bug.cgi?id=708661 */
1680   tcase_skip_broken_test (tc_chain, test_output_order);
1681 
1682   tcase_add_test (tc_chain, test_not_linked_eos);
1683 
1684   tcase_add_test (tc_chain, test_sparse_stream);
1685   tcase_add_test (tc_chain, test_initial_fill_above_high_threshold);
1686   tcase_add_test (tc_chain, test_watermark_and_fill_level);
1687   tcase_add_test (tc_chain, test_high_threshold_change);
1688   tcase_add_test (tc_chain, test_low_threshold_change);
1689   tcase_add_test (tc_chain, test_limit_changes);
1690 
1691   tcase_add_test (tc_chain, test_buffering_with_none_pts);
1692   tcase_add_test (tc_chain, test_initial_events_nodelay);
1693 
1694   tcase_add_test (tc_chain, test_stream_status_messages);
1695 
1696   return s;
1697 }
1698 
1699 GST_CHECK_MAIN (multiqueue)
1700