1 /* GStreamer
2  *
3  * Copyright (C) <2015> Centricular Ltd
4  *  @author: Edward Hervey <edward@centricular.com>
5  *  @author: Jan Schmidt <jan@centricular.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 
27 #include <glib.h>
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
30 #include <gst/gst.h>
31 #include <gst/pbutils/pbutils.h>
32 
33 #include "gstplayback.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
36 
37 /**
38  * SECTION:element-decodebin3
39  * @title: decodebin3
40  *
41  * #GstBin that auto-magically constructs a decoding pipeline using available
42  * decoders and demuxers via auto-plugging. The output is raw audio, video
43  * or subtitle streams.
44  *
45  * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
46  *
47  * * supports publication and selection of stream information via
48  * GstStreamCollection messages and #GST_EVENT_SELECT_STREAM events.
49  *
50  * * dynamically switches stream connections internally, and
51  * reuses decoder elements when stream selections change, so that in
52  * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53  * and only creates new elements when streams change and an existing decoder
54  * is not capable of handling the new format.
55  *
 * * supports multiple input pads for the parallel decoding of auxiliary streams
57  * not muxed with the primary stream.
58  *
59  * * does not handle network stream buffering. decodebin3 expects that network stream
60  * buffering is handled upstream, before data is passed to it.
61  *
62  * <emphasis>decodebin3 is still experimental API and a technology preview.
63  * Its behaviour and exposed API is subject to change.</emphasis>
64  *
65  */
66 
67 /**
68  * Global design
69  *
70  * 1) From sink pad to elementary streams (GstParseBin)
71  *
72  * The input sink pads are fed to GstParseBin. GstParseBin will feed them
73  * through typefind. When the caps are detected (or changed) we recursively
74  * figure out which demuxer, parser or depayloader is needed until we get to
75  * elementary streams.
76  *
77  * All elementary streams (whether decoded or not, whether exposed or not) are
78  * fed through multiqueue. There is only *one* multiqueue in decodebin3.
79  *
80  * => MultiQueue is the cornerstone.
81  * => No buffering before multiqueue
82  *
83  * 2) Elementary streams
84  *
85  * After GstParseBin, there are 3 main components:
86  *  1) Input Streams (provided by GstParseBin)
87  *  2) Multiqueue slots
88  *  3) Output Streams
89  *
90  * Input Streams correspond to the stream coming from GstParseBin and that gets
91  * fed into a multiqueue slot.
92  *
93  * Output Streams correspond to the combination of a (optional) decoder and an
94  * output ghostpad. Output Streams can be moved from one multiqueue slot to
95  * another, can reconfigure itself (different decoders), and can be
96  * added/removed depending on the configuration (all streams outputted, only one
97  * of each type, ...).
98  *
99  * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
100  * each 'active' Input Stream there is a corresponding slot.
101  * Slots might have different streams on input and output (due to internal
102  * buffering).
103  *
104  * Due to internal queuing/buffering/..., all those components (might) behave
105  * asynchronously. Therefore probes will be used on each component source pad to
106  * detect various key-points:
107  *  * EOS :
108  *     the stream is done => Mark that component as done, optionally freeing/removing it
109  *  * STREAM_START :
110  *     a new stream is starting => link it further if needed
111  *
112  * 3) Gradual replacement
113  *
114  * If the caps change at any point in decodebin (input sink pad, demuxer output,
115  * multiqueue output, ..), we gradually replace (if needed) the following elements.
116  *
117  * This is handled by the probes in various locations:
118  *  a) typefind output
119  *  b) multiqueue input (source pad of Input Streams)
120  *  c) multiqueue output (source pad of Multiqueue Slots)
121  *  d) final output (target of source ghostpads)
122  *
 * When a CAPS event arrives at those points, one of three things can happen:
 * a) There are no elements downstream yet, just create/link-to following elements
 * b) There are downstream elements, do an ACCEPT_CAPS query
126  *  b.1) The new CAPS are accepted, keep current configuration
127  *  b.2) The new CAPS are not accepted, remove following elements then do a)
128  *
129  *    Components:
130  *
131  *                                                   MultiQ     Output
132  *                     Input(s)                      Slots      Streams
133  *  /-------------------------------------------\   /-----\  /------------- \
134  *
135  * +-------------------------------------------------------------------------+
136  * |                                                                         |
137  * | +---------------------------------------------+                         |
138  * | |   GstParseBin(s)                            |                         |
139  * | |                +--------------+             |  +-----+                |
140  * | |                |              |---[parser]-[|--| Mul |---[ decoder ]-[|
141  * |]--[ typefind ]---|  demuxer(s)  |------------[|  | ti  |                |
142  * | |                |  (if needed) |---[parser]-[|--| qu  |                |
143  * | |                |              |---[parser]-[|--| eu  |---[ decoder ]-[|
144  * | |                +--------------+             |  +------             ^  |
145  * | +---------------------------------------------+        ^             |  |
146  * |                                               ^        |             |  |
147  * +-----------------------------------------------+--------+-------------+--+
148  *                                                 |        |             |
149  *                                                 |        |             |
150  *                                       Probes  --/--------/-------------/
151  *
152  * ATOMIC SWITCHING
153  *
154  * We want to ensure we re-use decoders when switching streams. This takes place
155  * at the multiqueue output level.
156  *
157  * MAIN CONCEPTS
158  *  1) Activating a stream (i.e. linking a slot to an output) is only done within
159  *    the streaming thread in the multiqueue_src_probe() and only if the
 *    stream is in the REQUESTED selection.
161  *  2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
162  *    within the stream thread, but only in a purposefully called IDLE probe
163  *    that calls reassign_slot().
164  *
165  * Based on those two principles, 3 "selection" of streams (stream-id) are used:
166  * 1) requested_selection
167  *    All streams within that list should be activated
168  * 2) active_selection
169  *    List of streams that are exposed by decodebin
170  * 3) to_activate
171  *    List of streams that will be moved to requested_selection in the
172  *    reassign_slot() method (i.e. once a stream was deactivated, and the output
 *    was retargeted)
174  */
175 
176 
/* Debug category for this file; GST_CAT_DEFAULT makes every GST_*_OBJECT
 * logging macro below log into it. */
GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
#define GST_CAT_DEFAULT decodebin3_debug

/* GType of the decodebin3 element */
#define GST_TYPE_DECODEBIN3	 (gst_decodebin3_get_type ())

/* NOTE(review): not referenced in the visible part of the file; presumably
 * gates additional debug code further down - confirm */
#define EXTRA_DEBUG 1

/* Quark lazily created by _custom_final_eos_quark_get() */
#define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
/* NOTE(review): presumably the data stored under CUSTOM_FINAL_EOS_QUARK;
 * the user is not visible here - confirm */
#define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"
186 static GQuark
_custom_final_eos_quark_get(void)187 _custom_final_eos_quark_get (void)
188 {
189   static gsize g_quark;
190 
191   if (g_once_init_enter (&g_quark)) {
192     gsize quark =
193         (gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
194     g_once_init_leave (&g_quark, quark);
195   }
196   return g_quark;
197 }
198 
/* Element instance and class structures */
typedef struct _GstDecodebin3 GstDecodebin3;
typedef struct _GstDecodebin3Class GstDecodebin3Class;

/* Internal helper structures, declared up-front so that the structure
 * definitions below can reference each other */
typedef struct _DecodebinInputStream DecodebinInputStream;
typedef struct _DecodebinInput DecodebinInput;
typedef struct _DecodebinOutputStream DecodebinOutputStream;
205 
/* Instance structure of the decodebin3 element.
 *
 * Fields are grouped by the mutex that guards them (input_lock,
 * selection_lock, factories_lock); see the markers below. */
struct _GstDecodebin3
{
  GstBin bin;

  /* input_lock protects the following variables */
  GMutex input_lock;
  /* Main input (static sink pad) */
  DecodebinInput *main_input;
  /* Supplementary input (request sink pads) */
  GList *other_inputs;
  /* counter for input */
  guint32 input_counter;
  /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
  /* FIXME : Needs to be reset appropriately (when upstream changes ?) */
  guint32 current_group_id;
  /* End of variables protected by input_lock */

  /* The single multiqueue shared by all streams (created in _init) */
  GstElement *multiqueue;

  /* selection_lock protects access to following variables */
  GMutex selection_lock;
  GList *input_streams;         /* List of DecodebinInputStream for active collection */
  GList *output_streams;        /* List of DecodebinOutputStream used for output */
  GList *slots;                 /* List of MultiQueueSlot */
  guint slot_id;

  /* Active collection */
  GstStreamCollection *collection;
  /* requested selection of stream-id to activate post-multiqueue */
  GList *requested_selection;
  /* list of stream-id currently activated in output */
  GList *active_selection;
  /* List of stream-id that need to be activated (after a stream switch for ex) */
  GList *to_activate;
  /* Pending select streams event */
  guint32 select_streams_seqnum;
  /* pending list of streams to select (from downstream) */
  GList *pending_select_streams;
  /* TRUE if requested_selection was updated, will become FALSE once
   * it has fully transitioned to active */
  gboolean selection_updated;
  /* End of variables protected by selection_lock */

  /* List of pending collections.
   * FIXME : Is this really needed ? */
  GList *pending_collection;

  /* Factories */
  GMutex factories_lock;
  guint32 factories_cookie;
  /* All DECODABLE factories */
  GList *factories;
  /* Only DECODER factories */
  GList *decoder_factories;
  /* DECODABLE but not DECODER factories */
  GList *decodable_factories;

  /* counters for pads */
  guint32 apadcount, vpadcount, tpadcount, opadcount;

  /* Properties */
  /* Value of the "caps" property; accessed under the object lock by the
   * property setter/getter */
  GstCaps *caps;
};
269 
/* Class structure of the decodebin3 element */
struct _GstDecodebin3Class
{
  GstBinClass class;

  /* Class handler of the "select-stream" signal (the signal is registered
   * with the offset of this member in class_init) */
    gint (*select_stream) (GstDecodebin3 * dbin,
      GstStreamCollection * collection, GstStream * stream);
};
277 
/* Input of decodebin, controls input pad and parsebin */
struct _DecodebinInput
{
  /* The decodebin3 this input belongs to */
  GstDecodebin3 *dbin;

  /* NOTE(review): presumably TRUE for the main (static sink pad) input, as
   * passed to create_new_input() - confirm */
  gboolean is_main;

  /* Ghost sink pad; its target is set to parsebin_sink once the parsebin
   * exists (see ensure_input_parsebin()) */
  GstPad *ghost_sink;
  /* The "sink" static pad of parsebin */
  GstPad *parsebin_sink;

  GstStreamCollection *collection;      /* Active collection */

  /* Last group id recorded for this input by set_input_group_id() */
  guint group_id;

  /* Created on demand by ensure_input_parsebin() */
  GstElement *parsebin;

  /* Handler ids of the parsebin "pad-added", "pad-removed" and "drained"
   * signal connections */
  gulong pad_added_sigid;
  gulong pad_removed_sigid;
  gulong drained_sigid;

  /* TRUE if the input got drained
   * FIXME : When do we reset it if re-used ?
   */
  gboolean drained;

  /* HACK : Remove these fields */
  /* List of PendingPad structures */
  GList *pending_pads;
};
307 
/* Multiqueue Slots
 *
 * Per the design notes at the top of this file, a slot corresponds to one
 * sink/src pad pair of the multiqueue. */
typedef struct _MultiQueueSlot
{
  guint id;

  /* The decodebin3 this slot belongs to */
  GstDecodebin3 *dbin;
  /* Type of stream handled by this slot */
  GstStreamType type;

  /* Linked input and output */
  DecodebinInputStream *input;

  /* pending => last stream received on sink pad */
  GstStream *pending_stream;
  /* active => last stream outputted on source pad */
  GstStream *active_stream;

  /* NOTE(review): presumably the multiqueue request sink pad and its
   * matching source pad - confirm against the slot creation code */
  GstPad *sink_pad, *src_pad;

  /* id of the MQ src_pad event probe */
  gulong probe_id;

  gboolean is_drained;

  /* Output stream currently attached to this slot, if any */
  DecodebinOutputStream *output;
} MultiQueueSlot;
334 
/* Streams that are exposed downstream (i.e. output)
 *
 * Per the design notes at the top of this file, an output stream is the
 * combination of an optional decoder and an output ghost pad. */
struct _DecodebinOutputStream
{
  /* The decodebin3 this output belongs to */
  GstDecodebin3 *dbin;
  /* The type of stream handled by this output stream */
  GstStreamType type;

  /* The slot to which this output stream is currently connected to */
  MultiQueueSlot *slot;

  GstElement *decoder;          /* Optional */
  /* Pads of the decoder (only meaningful when a decoder is present) */
  GstPad *decoder_sink, *decoder_src;
  /* NOTE(review): presumably TRUE once the slot is linked to the
   * decoder/ghost pad - confirm */
  gboolean linked;

  /* ghostpad */
  GstPad *src_pad;
  /* Flag if ghost pad is exposed */
  gboolean src_exposed;

  /* keyframe dropping probe */
  gulong drop_probe_id;
};
357 
/* Pending pads from parsebin
 *
 * Instances are kept in DecodebinInput.pending_pads. */
typedef struct _PendingPad
{
  GstDecodebin3 *dbin;
  DecodebinInput *input;
  /* The parsebin pad being tracked */
  GstPad *pad;

  /* Probe ids installed on @pad */
  gulong buffer_probe;
  gulong event_probe;
  /* NOTE(review): presumably set when EOS was seen on @pad - confirm */
  gboolean saw_eos;
} PendingPad;
369 
/* properties
 *
 * Ids used in set_property()/get_property(); PROP_0 is the unused zero id. */
enum
{
  PROP_0,
  PROP_CAPS
};
376 
/* signals
 *
 * Indexes into gst_decodebin3_signals[], which is filled in class_init with
 * the ids returned by g_signal_new(). */
enum
{
  SIGNAL_SELECT_STREAM,
  SIGNAL_ABOUT_TO_FINISH,
  LAST_SIGNAL
};
static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
385 
/* Locking helpers.
 *
 * SELECTION_LOCK/UNLOCK take and release dbin->selection_lock, and
 * INPUT_LOCK/UNLOCK take and release dbin->input_lock. Each one logs the
 * calling thread at LOG level around the mutex operation, which helps when
 * tracking down lock ordering problems.
 *
 * @dbin is parenthesized wherever it is dereferenced so that any pointer
 * expression can be passed safely. It is evaluated more than once, so do not
 * pass an expression with side effects. */
#define SELECTION_LOCK(dbin) G_STMT_START {                             \
    GST_LOG_OBJECT (dbin,                                               \
                    "selection locking from thread %p",                 \
                    g_thread_self ());                                  \
    g_mutex_lock (&(dbin)->selection_lock);                             \
    GST_LOG_OBJECT (dbin,                                               \
                    "selection locked from thread %p",                  \
                    g_thread_self ());                                  \
  } G_STMT_END

#define SELECTION_UNLOCK(dbin) G_STMT_START {                           \
    GST_LOG_OBJECT (dbin,                                               \
                    "selection unlocking from thread %p",               \
                    g_thread_self ());                                  \
    g_mutex_unlock (&(dbin)->selection_lock);                           \
  } G_STMT_END

#define INPUT_LOCK(dbin) G_STMT_START {                                 \
    GST_LOG_OBJECT (dbin,                                               \
                    "input locking from thread %p",                     \
                    g_thread_self ());                                  \
    g_mutex_lock (&(dbin)->input_lock);                                 \
    GST_LOG_OBJECT (dbin,                                               \
                    "input locked from thread %p",                      \
                    g_thread_self ());                                  \
  } G_STMT_END

#define INPUT_UNLOCK(dbin) G_STMT_START {                               \
    GST_LOG_OBJECT (dbin,                                               \
                    "input unlocking from thread %p",                   \
                    g_thread_self ());                                  \
    g_mutex_unlock (&(dbin)->input_lock);                               \
  } G_STMT_END
419 
/* Type registration. G_DEFINE_TYPE generates gst_decodebin3_parent_class;
 * the #define lets the rest of the file refer to it as parent_class. */
GType gst_decodebin3_get_type (void);
#define gst_decodebin3_parent_class parent_class
G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);

/* Initial value of the "caps" property (see gst_decodebin3_init()) */
static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);

/* Always-present main sink pad */
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* Additional sink pads, available on request */
static GstStaticPadTemplate request_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink_%u",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);

/* Sometimes source pads: video_%u, audio_%u, text_%u and the generic
 * src_%u */
static GstStaticPadTemplate video_src_template =
GST_STATIC_PAD_TEMPLATE ("video_%u",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate audio_src_template =
GST_STATIC_PAD_TEMPLATE ("audio_%u",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate text_src_template =
GST_STATIC_PAD_TEMPLATE ("text_%u",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);
459 
460 
/* GObject virtual methods (installed in gst_decodebin3_class_init()) */
static void gst_decodebin3_dispose (GObject * object);
static void gst_decodebin3_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_decodebin3_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

/* Handler connected to the "autoplug-continue" signal of each parsebin */
static gboolean parsebin_autoplug_continue_cb (GstElement *
    parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
469 
470 static gint
gst_decodebin3_select_stream(GstDecodebin3 * dbin,GstStreamCollection * collection,GstStream * stream)471 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
472     GstStreamCollection * collection, GstStream * stream)
473 {
474   GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
475 
476   return -1;
477 }
478 
/* GstElement / GstBin virtual methods (installed in
 * gst_decodebin3_class_init()) */
static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
    GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
    GstStateChange transition);
static gboolean gst_decodebin3_send_event (GstElement * element,
    GstEvent * event);

/* Factory list handling */
static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
#if 0
static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
    GstElementFactoryListType ftype);
#endif

/* DecodebinInput handling */
static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input);
static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);

/* DecodebinOutputStream handling */
static void reconfigure_output_stream (DecodebinOutputStream * output,
    MultiQueueSlot * slot);
static void free_output_stream (GstDecodebin3 * dbin,
    DecodebinOutputStream * output);
static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
    GstStreamType type);

/* MultiQueueSlot handling */
static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
    GstPadProbeInfo * info, MultiQueueSlot * slot);
static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
    DecodebinInputStream * input);
static void link_input_to_slot (DecodebinInputStream * input,
    MultiQueueSlot * slot);
static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
    MultiQueueSlot * slot);

/* Stream collection / selection handling */
static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
static void update_requested_selection (GstDecodebin3 * dbin);

/* FIXME: Really make all the parser stuff a self-contained helper object */
/* The parse helper is textually included, so it shares this file's static
 * declarations above */
#include "gstdecodebin3-parse.c"
521 
522 static gboolean
_gst_int_accumulator(GSignalInvocationHint * ihint,GValue * return_accu,const GValue * handler_return,gpointer dummy)523 _gst_int_accumulator (GSignalInvocationHint * ihint,
524     GValue * return_accu, const GValue * handler_return, gpointer dummy)
525 {
526   gint res = g_value_get_int (handler_return);
527 
528   if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
529     g_value_set_int (return_accu, res);
530 
531   if (res == -1)
532     return TRUE;
533 
534   return FALSE;
535 }
536 
537 static void
gst_decodebin3_class_init(GstDecodebin3Class * klass)538 gst_decodebin3_class_init (GstDecodebin3Class * klass)
539 {
540   GObjectClass *gobject_klass = (GObjectClass *) klass;
541   GstElementClass *element_class = (GstElementClass *) klass;
542   GstBinClass *bin_klass = (GstBinClass *) klass;
543 
544   gobject_klass->dispose = gst_decodebin3_dispose;
545   gobject_klass->set_property = gst_decodebin3_set_property;
546   gobject_klass->get_property = gst_decodebin3_get_property;
547 
548   /* FIXME : ADD PROPERTIES ! */
549   g_object_class_install_property (gobject_klass, PROP_CAPS,
550       g_param_spec_boxed ("caps", "Caps",
551           "The caps on which to stop decoding. (NULL = default)",
552           GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
553 
554   /* FIXME : ADD SIGNALS ! */
555   /**
556    * GstDecodebin3::select-stream
557    * @decodebin: a #GstDecodebin3
558    * @collection: a #GstStreamCollection
559    * @stream: a #GstStream
560    *
561    * This signal is emitted whenever @decodebin needs to decide whether
562    * to expose a @stream of a given @collection.
563    *
564    * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
565    * A value of -1 (default) lets @decodebin decide what to do with the stream.
566    * */
567   gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
568       g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
569       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
570       _gst_int_accumulator, NULL, g_cclosure_marshal_generic,
571       G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
572 
573   /**
574    * GstDecodebin3::about-to-finish:
575    *
576    * This signal is emitted when the data for the selected URI is
577    * entirely buffered and it is safe to specify anothe URI.
578    */
579   gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
580       g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
581       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
582       0, G_TYPE_NONE);
583 
584 
585   element_class->request_new_pad =
586       GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
587   element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
588   element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
589 
590   gst_element_class_add_pad_template (element_class,
591       gst_static_pad_template_get (&sink_template));
592   gst_element_class_add_pad_template (element_class,
593       gst_static_pad_template_get (&request_sink_template));
594   gst_element_class_add_pad_template (element_class,
595       gst_static_pad_template_get (&video_src_template));
596   gst_element_class_add_pad_template (element_class,
597       gst_static_pad_template_get (&audio_src_template));
598   gst_element_class_add_pad_template (element_class,
599       gst_static_pad_template_get (&text_src_template));
600   gst_element_class_add_pad_template (element_class,
601       gst_static_pad_template_get (&src_template));
602 
603   gst_element_class_set_static_metadata (element_class,
604       "Decoder Bin 3", "Generic/Bin/Decoder",
605       "Autoplug and decode to raw media",
606       "Edward Hervey <edward@centricular.com>");
607 
608   bin_klass->handle_message = gst_decodebin3_handle_message;
609 
610   klass->select_stream = gst_decodebin3_select_stream;
611 }
612 
613 static void
gst_decodebin3_init(GstDecodebin3 * dbin)614 gst_decodebin3_init (GstDecodebin3 * dbin)
615 {
616   /* Create main input */
617   dbin->main_input = create_new_input (dbin, TRUE);
618 
619   dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
620   g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
621       "max-size-buffers", 0, "use-interleave", TRUE, NULL);
622   gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
623 
624   dbin->current_group_id = GST_GROUP_ID_INVALID;
625 
626   g_mutex_init (&dbin->factories_lock);
627   g_mutex_init (&dbin->selection_lock);
628   g_mutex_init (&dbin->input_lock);
629 
630   dbin->caps = gst_static_caps_get (&default_raw_caps);
631 
632   GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
633 }
634 
635 static void
gst_decodebin3_dispose(GObject * object)636 gst_decodebin3_dispose (GObject * object)
637 {
638   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
639   GList *walk, *next;
640 
641   if (dbin->factories)
642     gst_plugin_feature_list_free (dbin->factories);
643   if (dbin->decoder_factories)
644     g_list_free (dbin->decoder_factories);
645   if (dbin->decodable_factories)
646     g_list_free (dbin->decodable_factories);
647   g_list_free_full (dbin->requested_selection, g_free);
648   g_list_free (dbin->active_selection);
649   g_list_free (dbin->to_activate);
650   g_list_free (dbin->pending_select_streams);
651   g_clear_object (&dbin->collection);
652 
653   free_input (dbin, dbin->main_input);
654 
655   for (walk = dbin->other_inputs; walk; walk = next) {
656     DecodebinInput *input = walk->data;
657 
658     next = g_list_next (walk);
659 
660     free_input (dbin, input);
661     dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
662   }
663 
664   G_OBJECT_CLASS (parent_class)->dispose (object);
665 }
666 
667 static void
gst_decodebin3_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)668 gst_decodebin3_set_property (GObject * object, guint prop_id,
669     const GValue * value, GParamSpec * pspec)
670 {
671   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
672 
673   /* FIXME : IMPLEMENT */
674   switch (prop_id) {
675     case PROP_CAPS:
676       GST_OBJECT_LOCK (dbin);
677       if (dbin->caps)
678         gst_caps_unref (dbin->caps);
679       dbin->caps = g_value_dup_boxed (value);
680       GST_OBJECT_UNLOCK (dbin);
681       break;
682     default:
683       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
684       break;
685   }
686 }
687 
688 static void
gst_decodebin3_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)689 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
690     GParamSpec * pspec)
691 {
692   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
693 
694   /* FIXME : IMPLEMENT */
695   switch (prop_id) {
696     case PROP_CAPS:
697       GST_OBJECT_LOCK (dbin);
698       g_value_set_boxed (value, dbin->caps);
699       GST_OBJECT_UNLOCK (dbin);
700       break;
701     default:
702       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
703       break;
704   }
705 }
706 
707 static gboolean
parsebin_autoplug_continue_cb(GstElement * parsebin,GstPad * pad,GstCaps * caps,GstDecodebin3 * dbin)708 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
709     GstCaps * caps, GstDecodebin3 * dbin)
710 {
711   GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
712 
713   /* If it matches our target caps, expose it */
714   if (gst_caps_can_intersect (caps, dbin->caps))
715     return FALSE;
716 
717   return TRUE;
718 }
719 
720 /* This method should be called whenever a STREAM_START event
721  * comes out of a given parsebin.
722  * The caller shall replace the group_id if the function returns TRUE */
723 static gboolean
set_input_group_id(DecodebinInput * input,guint32 * group_id)724 set_input_group_id (DecodebinInput * input, guint32 * group_id)
725 {
726   GstDecodebin3 *dbin = input->dbin;
727 
728   if (input->group_id != *group_id) {
729     if (input->group_id != GST_GROUP_ID_INVALID)
730       GST_WARNING_OBJECT (dbin,
731           "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
732           ") on input %p ", input->group_id, *group_id, input);
733     input->group_id = *group_id;
734   }
735 
736   if (*group_id != dbin->current_group_id) {
737     if (dbin->current_group_id == GST_GROUP_ID_INVALID) {
738       GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT,
739           *group_id);
740       dbin->current_group_id = *group_id;
741     }
742     *group_id = dbin->current_group_id;
743     return TRUE;
744   }
745 
746   return FALSE;
747 }
748 
749 static void
parsebin_drained_cb(GstElement * parsebin,DecodebinInput * input)750 parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
751 {
752   GstDecodebin3 *dbin = input->dbin;
753   gboolean all_drained;
754   GList *tmp;
755 
756   GST_WARNING_OBJECT (dbin, "input %p drained", input);
757   input->drained = TRUE;
758 
759   all_drained = dbin->main_input->drained;
760   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
761     DecodebinInput *data = (DecodebinInput *) tmp->data;
762 
763     all_drained &= data->drained;
764   }
765 
766   if (all_drained) {
767     GST_WARNING_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
768     g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
769         NULL);
770   }
771 }
772 
773 /* Call with INPUT_LOCK taken */
774 static gboolean
ensure_input_parsebin(GstDecodebin3 * dbin,DecodebinInput * input)775 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
776 {
777   gboolean set_state = FALSE;
778 
779   if (input->parsebin == NULL) {
780     input->parsebin = gst_element_factory_make ("parsebin", NULL);
781     if (input->parsebin == NULL)
782       goto no_parsebin;
783     input->parsebin = gst_object_ref (input->parsebin);
784     input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
785     input->pad_added_sigid =
786         g_signal_connect (input->parsebin, "pad-added",
787         (GCallback) parsebin_pad_added_cb, input);
788     input->pad_removed_sigid =
789         g_signal_connect (input->parsebin, "pad-removed",
790         (GCallback) parsebin_pad_removed_cb, input);
791     input->drained_sigid =
792         g_signal_connect (input->parsebin, "drained",
793         (GCallback) parsebin_drained_cb, input);
794     g_signal_connect (input->parsebin, "autoplug-continue",
795         (GCallback) parsebin_autoplug_continue_cb, dbin);
796   }
797 
798   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
799     gst_bin_add (GST_BIN (dbin), input->parsebin);
800     set_state = TRUE;
801   }
802 
803   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
804       input->parsebin_sink);
805   if (set_state)
806     gst_element_sync_state_with_parent (input->parsebin);
807 
808   return TRUE;
809 
810   /* ERRORS */
811 no_parsebin:
812   {
813     gst_element_post_message ((GstElement *) dbin,
814         gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
815     return FALSE;
816   }
817 }
818 
819 static GstPadLinkReturn
gst_decodebin3_input_pad_link(GstPad * pad,GstObject * parent,GstPad * peer)820 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
821 {
822   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
823   GstPadLinkReturn res = GST_PAD_LINK_OK;
824   DecodebinInput *input;
825 
826   GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT
827       ". Creating parsebin if needed", pad);
828 
829   if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
830     goto fail;
831 
832   INPUT_LOCK (dbin);
833   if (!ensure_input_parsebin (dbin, input))
834     res = GST_PAD_LINK_REFUSED;
835   INPUT_UNLOCK (dbin);
836 
837   return res;
838 fail:
839   GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
840   return GST_PAD_LINK_REFUSED;
841 }
842 
843 /* Drop duration query during _input_pad_unlink */
844 static GstPadProbeReturn
query_duration_drop_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinInput * input)845 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
846     DecodebinInput * input)
847 {
848   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
849 
850   if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
851     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
852     if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
853       GST_LOG_OBJECT (pad, "stop forwarding query duration");
854       ret = GST_PAD_PROBE_HANDLED;
855     }
856   }
857 
858   return ret;
859 }
860 
861 static void
gst_decodebin3_input_pad_unlink(GstPad * pad,GstObject * parent)862 gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
863 {
864   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
865   DecodebinInput *input;
866 
867   GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT
868       ". Removing parsebin.", pad);
869 
870   if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
871     goto fail;
872 
873   INPUT_LOCK (dbin);
874   if (input->parsebin == NULL) {
875     INPUT_UNLOCK (dbin);
876     return;
877   }
878 
879   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) {
880     GstStreamCollection *collection = NULL;
881     gulong probe_id = gst_pad_add_probe (input->parsebin_sink,
882         GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
883         (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
884 
885     /* Clear stream-collection corresponding to current INPUT and post new
886      * stream-collection message, if needed */
887     if (input->collection) {
888       gst_object_unref (input->collection);
889       input->collection = NULL;
890     }
891 
892     SELECTION_LOCK (dbin);
893     collection = get_merged_collection (dbin);
894     if (collection && collection != dbin->collection) {
895       GstMessage *msg;
896       GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
897 
898       if (dbin->collection)
899         gst_object_unref (dbin->collection);
900       dbin->collection = collection;
901 
902       msg =
903           gst_message_new_stream_collection ((GstObject *) dbin,
904           dbin->collection);
905 
906       SELECTION_UNLOCK (dbin);
907       gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
908       update_requested_selection (dbin);
909     } else
910       SELECTION_UNLOCK (dbin);
911 
912     gst_bin_remove (GST_BIN (dbin), input->parsebin);
913     gst_element_set_state (input->parsebin, GST_STATE_NULL);
914     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
915     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
916     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
917     gst_pad_remove_probe (input->parsebin_sink, probe_id);
918     gst_object_unref (input->parsebin);
919     gst_object_unref (input->parsebin_sink);
920 
921     input->parsebin = NULL;
922     input->parsebin_sink = NULL;
923 
924     if (!input->is_main) {
925       dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
926       free_input_async (dbin, input);
927     }
928   }
929   INPUT_UNLOCK (dbin);
930   return;
931 
932 fail:
933   GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
934   return;
935 }
936 
937 static void
free_input(GstDecodebin3 * dbin,DecodebinInput * input)938 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
939 {
940   GST_DEBUG ("Freeing input %p", input);
941   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
942   gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
943   if (input->parsebin) {
944     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
945     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
946     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
947     gst_element_set_state (input->parsebin, GST_STATE_NULL);
948     gst_object_unref (input->parsebin);
949     gst_object_unref (input->parsebin_sink);
950   }
951   if (input->collection)
952     gst_object_unref (input->collection);
953   g_free (input);
954 }
955 
956 static void
free_input_async(GstDecodebin3 * dbin,DecodebinInput * input)957 free_input_async (GstDecodebin3 * dbin, DecodebinInput * input)
958 {
959   GST_LOG_OBJECT (dbin, "pushing input %p on thread pool to free", input);
960   gst_element_call_async (GST_ELEMENT_CAST (dbin),
961       (GstElementCallAsyncFunc) free_input, input, NULL);
962 }
963 
964 /* Call with INPUT_LOCK taken */
965 static DecodebinInput *
create_new_input(GstDecodebin3 * dbin,gboolean main)966 create_new_input (GstDecodebin3 * dbin, gboolean main)
967 {
968   DecodebinInput *input;
969 
970   input = g_new0 (DecodebinInput, 1);
971   input->dbin = dbin;
972   input->is_main = main;
973   input->group_id = GST_GROUP_ID_INVALID;
974   if (main)
975     input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
976   else {
977     gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
978     input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
979     g_free (pad_name);
980   }
981   g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
982   gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
983   gst_pad_set_unlink_function (input->ghost_sink,
984       gst_decodebin3_input_pad_unlink);
985 
986   gst_pad_set_active (input->ghost_sink, TRUE);
987   gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
988 
989   return input;
990 
991 }
992 
993 static GstPad *
gst_decodebin3_request_new_pad(GstElement * element,GstPadTemplate * temp,const gchar * name,const GstCaps * caps)994 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
995     const gchar * name, const GstCaps * caps)
996 {
997   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
998   DecodebinInput *input;
999   GstPad *res = NULL;
1000 
1001   /* We are ignoring names for the time being, not sure it makes any sense
1002    * within the context of decodebin3 ... */
1003   input = create_new_input (dbin, FALSE);
1004   if (input) {
1005     INPUT_LOCK (dbin);
1006     dbin->other_inputs = g_list_append (dbin->other_inputs, input);
1007     res = input->ghost_sink;
1008     INPUT_UNLOCK (dbin);
1009   }
1010 
1011   return res;
1012 }
1013 
1014 /* Must be called with factories lock! */
1015 static void
gst_decode_bin_update_factories_list(GstDecodebin3 * dbin)1016 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
1017 {
1018   guint cookie;
1019 
1020   cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
1021   if (!dbin->factories || dbin->factories_cookie != cookie) {
1022     GList *tmp;
1023     if (dbin->factories)
1024       gst_plugin_feature_list_free (dbin->factories);
1025     if (dbin->decoder_factories)
1026       g_list_free (dbin->decoder_factories);
1027     if (dbin->decodable_factories)
1028       g_list_free (dbin->decodable_factories);
1029     dbin->factories =
1030         gst_element_factory_list_get_elements
1031         (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
1032     dbin->factories =
1033         g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
1034     dbin->factories_cookie = cookie;
1035 
1036     /* Filter decoder and other decodables */
1037     dbin->decoder_factories = NULL;
1038     dbin->decodable_factories = NULL;
1039     for (tmp = dbin->factories; tmp; tmp = tmp->next) {
1040       GstElementFactory *fact = (GstElementFactory *) tmp->data;
1041       if (gst_element_factory_list_is_type (fact,
1042               GST_ELEMENT_FACTORY_TYPE_DECODER))
1043         dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
1044       else
1045         dbin->decodable_factories =
1046             g_list_append (dbin->decodable_factories, fact);
1047     }
1048   }
1049 }
1050 
1051 /* Must be called with appropriate lock if list is a protected variable */
1052 static const gchar *
stream_in_list(GList * list,const gchar * sid)1053 stream_in_list (GList * list, const gchar * sid)
1054 {
1055   GList *tmp;
1056 
1057 #if EXTRA_DEBUG
1058   for (tmp = list; tmp; tmp = tmp->next) {
1059     gchar *osid = (gchar *) tmp->data;
1060     GST_DEBUG ("Checking %s against %s", sid, osid);
1061   }
1062 #endif
1063 
1064   for (tmp = list; tmp; tmp = tmp->next) {
1065     const gchar *osid = (gchar *) tmp->data;
1066     if (!g_strcmp0 (sid, osid))
1067       return osid;
1068   }
1069 
1070   return NULL;
1071 }
1072 
1073 static void
update_requested_selection(GstDecodebin3 * dbin)1074 update_requested_selection (GstDecodebin3 * dbin)
1075 {
1076   guint i, nb;
1077   GList *tmp = NULL;
1078   GstStreamType used_types = 0;
1079   GstStreamCollection *collection;
1080 
1081   /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1082    *  the switch handler will take care of the pending selection */
1083   SELECTION_LOCK (dbin);
1084   if (dbin->pending_select_streams) {
1085     GST_DEBUG_OBJECT (dbin,
1086         "No need to create pending selection, SELECT_STREAMS underway");
1087     goto beach;
1088   }
1089 
1090   collection = dbin->collection;
1091   if (G_UNLIKELY (collection == NULL)) {
1092     GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
1093     goto beach;
1094   }
1095   nb = gst_stream_collection_get_size (collection);
1096 
1097   /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1098   GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1099 
1100   /* 3. If not, check if we already have some of the streams in the
1101    * existing active/requested selection */
1102   for (i = 0; i < nb; i++) {
1103     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1104     const gchar *sid = gst_stream_get_stream_id (stream);
1105     gint request = -1;
1106     /* Fire select-stream signal to see if outside components want to
1107      * hint at which streams should be selected */
1108     g_signal_emit (G_OBJECT (dbin),
1109         gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1110         &request);
1111     GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1112     if (request == 1 || (request == -1
1113             && (stream_in_list (dbin->requested_selection, sid)
1114                 || stream_in_list (dbin->active_selection, sid)))) {
1115       GstStreamType curtype = gst_stream_get_stream_type (stream);
1116       if (request == 1)
1117         GST_DEBUG_OBJECT (dbin,
1118             "Using stream requested by 'select-stream' signal : %s", sid);
1119       else
1120         GST_DEBUG_OBJECT (dbin,
1121             "Re-using stream already present in requested or active selection : %s",
1122             sid);
1123       tmp = g_list_append (tmp, (gchar *) sid);
1124       used_types |= curtype;
1125     }
1126   }
1127 
1128   /* 4. If not, match one stream of each type */
1129   for (i = 0; i < nb; i++) {
1130     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1131     GstStreamType curtype = gst_stream_get_stream_type (stream);
1132     if (!(used_types & curtype)) {
1133       const gchar *sid = gst_stream_get_stream_id (stream);
1134       GST_DEBUG_OBJECT (dbin, "Selecting stream '%s' of type %s",
1135           sid, gst_stream_type_get_name (curtype));
1136       tmp = g_list_append (tmp, (gchar *) sid);
1137       used_types |= curtype;
1138     }
1139   }
1140 
1141 beach:
1142   /* Finally set the requested selection */
1143   if (tmp) {
1144     if (dbin->requested_selection) {
1145       GST_FIXME_OBJECT (dbin,
1146           "Replacing non-NULL requested_selection, what should we do ??");
1147       g_list_free_full (dbin->requested_selection, g_free);
1148     }
1149     dbin->requested_selection =
1150         g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1151     dbin->selection_updated = TRUE;
1152     g_list_free (tmp);
1153   }
1154   SELECTION_UNLOCK (dbin);
1155 }
1156 
1157 /* sort_streams:
1158  * GCompareFunc to use with lists of GstStream.
1159  * Sorts GstStreams by stream type and SELECT flag and stream-id
1160  * First video, then audio, then others.
1161  *
1162  * Return: negative if a<b, 0 if a==b, positive if a>b
1163  */
1164 static gint
sort_streams(GstStream * sa,GstStream * sb)1165 sort_streams (GstStream * sa, GstStream * sb)
1166 {
1167   GstStreamType typea, typeb;
1168   GstStreamFlags flaga, flagb;
1169   const gchar *ida, *idb;
1170   gint ret = 0;
1171 
1172   typea = gst_stream_get_stream_type (sa);
1173   typeb = gst_stream_get_stream_type (sb);
1174 
1175   GST_LOG ("sa(%s), sb(%s)", gst_stream_get_stream_id (sa),
1176       gst_stream_get_stream_id (sb));
1177 
1178   /* Sort by stream type. First video, then audio, then others(text, container, unknown) */
1179   if (typea != typeb) {
1180     if (typea & GST_STREAM_TYPE_VIDEO)
1181       ret = -1;
1182     else if (typea & GST_STREAM_TYPE_AUDIO)
1183       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)) ? -1 : 1;
1184     else if (typea & GST_STREAM_TYPE_TEXT)
1185       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)
1186           && !(typeb & GST_STREAM_TYPE_AUDIO)) ? -1 : 1;
1187     else if (typea & GST_STREAM_TYPE_CONTAINER)
1188       ret = (typeb & GST_STREAM_TYPE_UNKNOWN) ? -1 : 1;
1189     else
1190       ret = 1;
1191 
1192     if (ret != 0) {
1193       GST_LOG ("Sort by stream-type: %d", ret);
1194       return ret;
1195     }
1196   }
1197 
1198   /* Sort by SELECT flag, if stream type is same. */
1199   flaga = gst_stream_get_stream_flags (sa);
1200   flagb = gst_stream_get_stream_flags (sb);
1201 
1202   ret =
1203       (flaga & GST_STREAM_FLAG_SELECT) ? ((flagb & GST_STREAM_FLAG_SELECT) ? 0 :
1204       -1) : ((flagb & GST_STREAM_FLAG_SELECT) ? 1 : 0);
1205 
1206   if (ret != 0) {
1207     GST_LOG ("Sort by SELECT flag: %d", ret);
1208     return ret;
1209   }
1210 
1211   /* Sort by stream-id, if otherwise the same. */
1212   ida = gst_stream_get_stream_id (sa);
1213   idb = gst_stream_get_stream_id (sb);
1214   ret = g_strcmp0 (ida, idb);
1215 
1216   GST_LOG ("Sort by stream-id: %d", ret);
1217 
1218   return ret;
1219 }
1220 
1221 /* Call with INPUT_LOCK taken */
1222 static GstStreamCollection *
get_merged_collection(GstDecodebin3 * dbin)1223 get_merged_collection (GstDecodebin3 * dbin)
1224 {
1225   gboolean needs_merge = FALSE;
1226   GstStreamCollection *res = NULL;
1227   GList *tmp;
1228   GList *unsorted_streams = NULL;
1229   guint i, nb_stream;
1230 
1231   /* First check if we need to do a merge or just return the only collection */
1232   res = dbin->main_input->collection;
1233 
1234   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1235     DecodebinInput *input = (DecodebinInput *) tmp->data;
1236     if (input->collection) {
1237       if (res) {
1238         needs_merge = TRUE;
1239         break;
1240       }
1241       res = input->collection;
1242     }
1243   }
1244 
1245   if (!needs_merge) {
1246     GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1247     return res ? gst_object_ref (res) : NULL;
1248   }
1249 
1250   /* We really need to create a new collection */
1251   /* FIXME : Some numbering scheme maybe ?? */
1252   res = gst_stream_collection_new ("decodebin3");
1253   if (dbin->main_input->collection) {
1254     nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1255     GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1256     for (i = 0; i < nb_stream; i++) {
1257       GstStream *stream =
1258           gst_stream_collection_get_stream (dbin->main_input->collection, i);
1259       unsorted_streams = g_list_append (unsorted_streams, stream);
1260     }
1261   }
1262 
1263   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1264     DecodebinInput *input = (DecodebinInput *) tmp->data;
1265     GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1266         input->collection);
1267     if (input->collection) {
1268       nb_stream = gst_stream_collection_get_size (input->collection);
1269       GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1270       for (i = 0; i < nb_stream; i++) {
1271         GstStream *stream =
1272             gst_stream_collection_get_stream (input->collection, i);
1273         unsorted_streams = g_list_append (unsorted_streams, stream);
1274       }
1275     }
1276   }
1277 
1278   /* re-order streams : video, then audio, then others */
1279   unsorted_streams =
1280       g_list_sort (unsorted_streams, (GCompareFunc) sort_streams);
1281   for (tmp = unsorted_streams; tmp; tmp = tmp->next) {
1282     GstStream *stream = (GstStream *) tmp->data;
1283     GST_DEBUG_OBJECT (dbin, "Adding #stream(%s) to collection",
1284         gst_stream_get_stream_id (stream));
1285     gst_stream_collection_add_stream (res, gst_object_ref (stream));
1286   }
1287 
1288   if (unsorted_streams)
1289     g_list_free (unsorted_streams);
1290 
1291   return res;
1292 }
1293 
1294 /* Call with INPUT_LOCK taken */
1295 static DecodebinInput *
find_message_parsebin(GstDecodebin3 * dbin,GstElement * child)1296 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1297 {
1298   DecodebinInput *input = NULL;
1299   GstElement *parent = gst_object_ref (child);
1300   GList *tmp;
1301 
1302   do {
1303     GstElement *next_parent;
1304 
1305     GST_DEBUG_OBJECT (dbin, "parent %s",
1306         parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1307 
1308     if (parent == dbin->main_input->parsebin) {
1309       input = dbin->main_input;
1310       break;
1311     }
1312     for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1313       DecodebinInput *cur = (DecodebinInput *) tmp->data;
1314       if (parent == cur->parsebin) {
1315         input = cur;
1316         break;
1317       }
1318     }
1319     next_parent = (GstElement *) gst_element_get_parent (parent);
1320     gst_object_unref (parent);
1321     parent = next_parent;
1322 
1323   } while (parent && parent != (GstElement *) dbin);
1324 
1325   if (parent)
1326     gst_object_unref (parent);
1327 
1328   return input;
1329 }
1330 
1331 static const gchar *
stream_in_collection(GstDecodebin3 * dbin,gchar * sid)1332 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1333 {
1334   guint i, len;
1335 
1336   if (dbin->collection == NULL)
1337     return NULL;
1338   len = gst_stream_collection_get_size (dbin->collection);
1339   for (i = 0; i < len; i++) {
1340     GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1341     const gchar *osid = gst_stream_get_stream_id (stream);
1342     if (!g_strcmp0 (sid, osid))
1343       return osid;
1344   }
1345 
1346   return NULL;
1347 }
1348 
1349 /* Call with INPUT_LOCK taken */
1350 static void
handle_stream_collection(GstDecodebin3 * dbin,GstStreamCollection * collection,GstElement * child)1351 handle_stream_collection (GstDecodebin3 * dbin,
1352     GstStreamCollection * collection, GstElement * child)
1353 {
1354 #ifndef GST_DISABLE_GST_DEBUG
1355   const gchar *upstream_id;
1356   guint i;
1357 #endif
1358   DecodebinInput *input = find_message_parsebin (dbin, child);
1359 
1360   if (!input) {
1361     GST_DEBUG_OBJECT (dbin,
1362         "Couldn't find corresponding input, most likely shutting down");
1363     return;
1364   }
1365 
1366   /* Replace collection in input */
1367   if (input->collection)
1368     gst_object_unref (input->collection);
1369   input->collection = gst_object_ref (collection);
1370   GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1371       input);
1372 
1373   /* Merge collection if needed */
1374   collection = get_merged_collection (dbin);
1375 
1376 #ifndef GST_DISABLE_GST_DEBUG
1377   /* Just some debugging */
1378   upstream_id = gst_stream_collection_get_upstream_id (collection);
1379   GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1380   GST_DEBUG ("From input %p", input);
1381   GST_DEBUG ("  %d streams", gst_stream_collection_get_size (collection));
1382   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1383     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1384     GstTagList *taglist;
1385     GstCaps *caps;
1386 
1387     GST_DEBUG ("   Stream '%s'", gst_stream_get_stream_id (stream));
1388     GST_DEBUG ("     type  : %s",
1389         gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1390     GST_DEBUG ("     flags : 0x%x", gst_stream_get_stream_flags (stream));
1391     taglist = gst_stream_get_tags (stream);
1392     GST_DEBUG ("     tags  : %" GST_PTR_FORMAT, taglist);
1393     caps = gst_stream_get_caps (stream);
1394     GST_DEBUG ("     caps  : %" GST_PTR_FORMAT, caps);
1395     if (taglist)
1396       gst_tag_list_unref (taglist);
1397     if (caps)
1398       gst_caps_unref (caps);
1399   }
1400 #endif
1401 
1402   /* Store collection for later usage */
1403   SELECTION_LOCK (dbin);
1404   if (dbin->collection == NULL) {
1405     dbin->collection = collection;
1406   } else {
1407     /* We need to check who emitted this collection (the owner).
1408      * If we already had a collection from that user, this one is an update,
1409      * that is to say that we need to figure out how we are going to re-use
1410      * the streams/slot */
1411     GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1412     /* FIXME : When do we switch from pending collection to active collection ?
1413      * When all streams from active collection are drained in multiqueue output ? */
1414     gst_object_unref (dbin->collection);
1415     dbin->collection = collection;
1416     /* dbin->pending_collection = */
1417     /*     g_list_append (dbin->pending_collection, collection); */
1418   }
1419   SELECTION_UNLOCK (dbin);
1420 }
1421 
1422 static void
gst_decodebin3_handle_message(GstBin * bin,GstMessage * message)1423 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1424 {
1425   GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1426   gboolean posting_collection = FALSE;
1427 
1428   GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1429 
1430   switch (GST_MESSAGE_TYPE (message)) {
1431     case GST_MESSAGE_STREAM_COLLECTION:
1432     {
1433       GstStreamCollection *collection = NULL;
1434       gst_message_parse_stream_collection (message, &collection);
1435       if (collection) {
1436         INPUT_LOCK (dbin);
1437         handle_stream_collection (dbin, collection,
1438             (GstElement *) GST_MESSAGE_SRC (message));
1439         posting_collection = TRUE;
1440         INPUT_UNLOCK (dbin);
1441       }
1442 
1443       SELECTION_LOCK (dbin);
1444       if (dbin->collection && collection != dbin->collection) {
1445         /* Replace collection message, we most likely aggregated it */
1446         GstMessage *new_msg;
1447         new_msg =
1448             gst_message_new_stream_collection ((GstObject *) dbin,
1449             dbin->collection);
1450         gst_message_unref (message);
1451         message = new_msg;
1452       }
1453       SELECTION_UNLOCK (dbin);
1454 
1455       if (collection)
1456         gst_object_unref (collection);
1457       break;
1458     }
1459     default:
1460       break;
1461   }
1462 
1463   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1464 
1465   if (posting_collection) {
1466     /* Figure out a selection for that collection */
1467     update_requested_selection (dbin);
1468   }
1469 }
1470 
1471 static DecodebinOutputStream *
find_free_compatible_output(GstDecodebin3 * dbin,GstStream * stream)1472 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1473 {
1474   GList *tmp;
1475   GstStreamType stype = gst_stream_get_stream_type (stream);
1476 
1477   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1478     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1479     if (output->type == stype && output->slot && output->slot->active_stream) {
1480       GstStream *tstream = output->slot->active_stream;
1481       if (!stream_in_list (dbin->requested_selection,
1482               (gchar *) gst_stream_get_stream_id (tstream))) {
1483         return output;
1484       }
1485     }
1486   }
1487 
1488   return NULL;
1489 }
1490 
1491 /* Give a certain slot, figure out if it should be linked to an
1492  * output stream
1493  * CALL WITH SELECTION LOCK TAKEN !*/
1494 static DecodebinOutputStream *
get_output_for_slot(MultiQueueSlot * slot)1495 get_output_for_slot (MultiQueueSlot * slot)
1496 {
1497   GstDecodebin3 *dbin = slot->dbin;
1498   DecodebinOutputStream *output = NULL;
1499   const gchar *stream_id;
1500   GstCaps *caps;
1501   gchar *id_in_list = NULL;
1502 
1503   /* If we already have a configured output, just use it */
1504   if (slot->output != NULL)
1505     return slot->output;
1506 
1507   /*
1508    * FIXME
1509    *
1510    * This method needs to be split into multiple parts
1511    *
1512    * 1) Figure out whether stream should be exposed or not
1513    *   This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1514    *   in the default stream attribution
1515    *
1516    * 2) Figure out whether an output stream should be created, whether
1517    *   we can re-use the output stream already linked to the slot, or
1518    *   whether we need to get re-assigned another (currently used) output
1519    *   stream.
1520    */
1521 
1522   stream_id = gst_stream_get_stream_id (slot->active_stream);
1523   caps = gst_stream_get_caps (slot->active_stream);
1524   GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
1525   gst_caps_unref (caps);
1526 
1527   /* 0. Emit autoplug-continue signal for pending caps ? */
1528   GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
1529 
1530   /* 1. if in EXPOSE_ALL_MODE, just accept */
1531   GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
1532 
1533 #if 0
1534   /* FIXME : The idea around this was to avoid activating a stream for
1535    *     which we have no decoder. Unfortunately it is way too
1536    *     expensive. Need to figure out a better solution */
1537   /* 2. Is there a potential decoder (if one is required) */
1538   if (!gst_caps_can_intersect (caps, dbin->caps)
1539       && !have_factory (dbin, (GstCaps *) caps,
1540           GST_ELEMENT_FACTORY_TYPE_DECODER)) {
1541     GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
1542         caps);
1543     SELECTION_UNLOCK (dbin);
1544     gst_element_post_message (GST_ELEMENT_CAST (dbin),
1545         gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1546     SELECTION_LOCK (dbin);
1547     return NULL;
1548   }
1549 #endif
1550 
1551   /* 3. In default mode check if we should expose */
1552   id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
1553   if (id_in_list) {
1554     /* Check if we can steal an existing output stream we could re-use.
1555      * that is:
1556      * * an output stream whose slot->stream is not in requested
1557      * * and is of the same type as this stream
1558      */
1559     output = find_free_compatible_output (dbin, slot->active_stream);
1560     if (output) {
1561       /* Move this output from its current slot to this slot */
1562       dbin->to_activate =
1563           g_list_append (dbin->to_activate, (gchar *) stream_id);
1564       dbin->requested_selection =
1565           g_list_remove (dbin->requested_selection, id_in_list);
1566       g_free (id_in_list);
1567       SELECTION_UNLOCK (dbin);
1568       gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1569           (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
1570       SELECTION_LOCK (dbin);
1571       return NULL;
1572     }
1573 
1574     output = create_output_stream (dbin, slot->type);
1575     output->slot = slot;
1576     GST_DEBUG ("Linking slot %p to new output %p", slot, output);
1577     slot->output = output;
1578     dbin->active_selection =
1579         g_list_append (dbin->active_selection, (gchar *) stream_id);
1580   } else
1581     GST_DEBUG ("Not creating any output for slot %p", slot);
1582 
1583   return output;
1584 }
1585 
1586 /* Returns SELECTED_STREAMS message if active_selection is equal to
1587  * requested_selection, else NULL.
1588  * Must be called with LOCK taken */
1589 static GstMessage *
is_selection_done(GstDecodebin3 * dbin)1590 is_selection_done (GstDecodebin3 * dbin)
1591 {
1592   GList *tmp;
1593   GstMessage *msg;
1594 
1595   if (!dbin->selection_updated)
1596     return NULL;
1597 
1598   GST_LOG_OBJECT (dbin, "Checking");
1599 
1600   if (dbin->to_activate != NULL) {
1601     GST_DEBUG ("Still have streams to activate");
1602     return NULL;
1603   }
1604   for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
1605     GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
1606     if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
1607       GST_DEBUG ("Not in active selection, returning");
1608       return NULL;
1609     }
1610   }
1611 
1612   GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
1613 
1614   /* We are completely active */
1615   msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
1616   GST_MESSAGE_SEQNUM (msg) = dbin->select_streams_seqnum;
1617   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1618     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1619     if (output->slot) {
1620       GST_DEBUG_OBJECT (dbin, "Adding stream %s",
1621           gst_stream_get_stream_id (output->slot->active_stream));
1622 
1623       gst_message_streams_selected_add (msg, output->slot->active_stream);
1624     } else
1625       GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
1626   }
1627   dbin->selection_updated = FALSE;
1628   return msg;
1629 }
1630 
1631 /* Must be called with SELECTION_LOCK taken */
1632 static void
check_all_slot_for_eos(GstDecodebin3 * dbin)1633 check_all_slot_for_eos (GstDecodebin3 * dbin)
1634 {
1635   gboolean all_drained = TRUE;
1636   GList *iter;
1637 
1638   GST_DEBUG_OBJECT (dbin, "check slot for eos");
1639 
1640   for (iter = dbin->slots; iter; iter = iter->next) {
1641     MultiQueueSlot *slot = iter->data;
1642 
1643     if (!slot->output)
1644       continue;
1645 
1646     if (slot->is_drained) {
1647       GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
1648       continue;
1649     }
1650 
1651     all_drained = FALSE;
1652     break;
1653   }
1654 
1655   if (all_drained) {
1656     INPUT_LOCK (dbin);
1657     if (!pending_pads_are_eos (dbin->main_input))
1658       all_drained = FALSE;
1659 
1660     if (all_drained) {
1661       for (iter = dbin->other_inputs; iter; iter = iter->next) {
1662         if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
1663           all_drained = FALSE;
1664           break;
1665         }
1666       }
1667     }
1668     INPUT_UNLOCK (dbin);
1669   }
1670 
1671   if (all_drained) {
1672     GST_DEBUG_OBJECT (dbin,
1673         "All active slots are drained, and no pending input, push EOS");
1674 
1675     for (iter = dbin->input_streams; iter; iter = iter->next) {
1676       DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
1677       GstPad *peer = gst_pad_get_peer (input->srcpad);
1678 
1679       /* Send EOS to all slots */
1680       if (peer) {
1681         GstEvent *stream_start, *eos;
1682 
1683         stream_start =
1684             gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
1685 
1686         /* First forward a custom STREAM_START event to reset the EOS status (if any) */
1687         if (stream_start) {
1688           GstStructure *s;
1689           GstEvent *custom_stream_start = gst_event_copy (stream_start);
1690           gst_event_unref (stream_start);
1691           s = (GstStructure *) gst_event_get_structure (custom_stream_start);
1692           gst_structure_set (s, "decodebin3-flushing-stream-start",
1693               G_TYPE_BOOLEAN, TRUE, NULL);
1694           gst_pad_send_event (peer, custom_stream_start);
1695         }
1696 
1697         eos = gst_event_new_eos ();
1698         gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos),
1699             CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA,
1700             NULL);
1701         gst_pad_send_event (peer, eos);
1702         gst_object_unref (peer);
1703       } else
1704         GST_DEBUG_OBJECT (dbin, "no output");
1705     }
1706   }
1707 }
1708 
1709 static GstPadProbeReturn
multiqueue_src_probe(GstPad * pad,GstPadProbeInfo * info,MultiQueueSlot * slot)1710 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
1711     MultiQueueSlot * slot)
1712 {
1713   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
1714   GstDecodebin3 *dbin = slot->dbin;
1715 
1716   if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
1717     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
1718 
1719     GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
1720     switch (GST_EVENT_TYPE (ev)) {
1721       case GST_EVENT_STREAM_START:
1722       {
1723         GstStream *stream = NULL;
1724         const GstStructure *s = gst_event_get_structure (ev);
1725 
1726         /* Drop STREAM_START events used to cleanup multiqueue */
1727         if (s
1728             && gst_structure_has_field (s,
1729                 "decodebin3-flushing-stream-start")) {
1730           ret = GST_PAD_PROBE_HANDLED;
1731           gst_event_unref (ev);
1732           break;
1733         }
1734 
1735         gst_event_parse_stream (ev, &stream);
1736         if (stream == NULL) {
1737           GST_ERROR_OBJECT (pad,
1738               "Got a STREAM_START event without a GstStream");
1739           break;
1740         }
1741         slot->is_drained = FALSE;
1742         GST_DEBUG_OBJECT (pad, "Stream Start '%s'",
1743             gst_stream_get_stream_id (stream));
1744         if (slot->active_stream == NULL) {
1745           slot->active_stream = stream;
1746         } else if (slot->active_stream != stream) {
1747           GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
1748               gst_stream_get_stream_id (slot->active_stream),
1749               gst_stream_get_stream_id (stream));
1750           gst_object_unref (slot->active_stream);
1751           slot->active_stream = stream;
1752         } else
1753           gst_object_unref (stream);
1754 #if 0                           /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
1755         {
1756           gboolean is_active, is_requested;
1757           /* Quick check to see if we're in the current selection */
1758           /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
1759           SELECTION_LOCK (dbin);
1760           GST_DEBUG_OBJECT (dbin, "Checking active selection");
1761           is_active = stream_in_list (dbin->active_selection, stream_id);
1762           GST_DEBUG_OBJECT (dbin, "Checking requested selection");
1763           is_requested = stream_in_list (dbin->requested_selection, stream_id);
1764           SELECTION_UNLOCK (dbin);
1765           if (is_active)
1766             GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
1767                 slot->output);
1768           if (is_requested)
1769             GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
1770                 slot->output);
1771           else if (slot->output) {
1772             GST_DEBUG_OBJECT (pad,
1773                 "Slot needs to be deactivated ? It's no longer in requested selection");
1774           } else if (!is_active)
1775             GST_DEBUG_OBJECT (pad,
1776                 "Slot in neither active nor requested selection");
1777         }
1778 #endif
1779       }
1780         break;
1781       case GST_EVENT_CAPS:
1782       {
1783         /* Configure the output slot if needed */
1784         DecodebinOutputStream *output;
1785         GstMessage *msg = NULL;
1786         SELECTION_LOCK (dbin);
1787         output = get_output_for_slot (slot);
1788         if (output) {
1789           reconfigure_output_stream (output, slot);
1790           msg = is_selection_done (dbin);
1791         }
1792         SELECTION_UNLOCK (dbin);
1793         if (msg)
1794           gst_element_post_message ((GstElement *) slot->dbin, msg);
1795       }
1796         break;
1797       case GST_EVENT_EOS:
1798       {
1799         gboolean was_drained = slot->is_drained;
1800         slot->is_drained = TRUE;
1801 
1802         /* Custom EOS handling first */
1803         if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
1804                 CUSTOM_EOS_QUARK)) {
1805           /* remove custom-eos */
1806           gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
1807               CUSTOM_EOS_QUARK, NULL, NULL);
1808           GST_LOG_OBJECT (pad, "Received custom EOS");
1809           ret = GST_PAD_PROBE_HANDLED;
1810           SELECTION_LOCK (dbin);
1811           if (slot->input == NULL) {
1812             GST_DEBUG_OBJECT (pad,
1813                 "Got custom-eos from null input stream, remove output stream");
1814             /* Remove the output */
1815             if (slot->output) {
1816               DecodebinOutputStream *output = slot->output;
1817               dbin->output_streams =
1818                   g_list_remove (dbin->output_streams, output);
1819               free_output_stream (dbin, output);
1820             }
1821             slot->probe_id = 0;
1822             dbin->slots = g_list_remove (dbin->slots, slot);
1823             free_multiqueue_slot_async (dbin, slot);
1824             ret = GST_PAD_PROBE_REMOVE;
1825           } else if (!was_drained) {
1826             check_all_slot_for_eos (dbin);
1827           }
1828           if (ret == GST_PAD_PROBE_HANDLED)
1829             gst_event_unref (ev);
1830           SELECTION_UNLOCK (dbin);
1831           break;
1832         }
1833 
1834         GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
1835             slot->input);
1836         if (slot->input == NULL) {
1837           GstPad *peer;
1838           GST_DEBUG_OBJECT (pad,
1839               "last EOS for input, forwarding and removing slot");
1840           peer = gst_pad_get_peer (pad);
1841           if (peer) {
1842             gst_pad_send_event (peer, ev);
1843             gst_object_unref (peer);
1844           } else {
1845             gst_event_unref (ev);
1846           }
1847           SELECTION_LOCK (dbin);
1848           /* FIXME : Shouldn't we try to re-assign the output instead of just
1849            * removing it ? */
1850           /* Remove the output */
1851           if (slot->output) {
1852             DecodebinOutputStream *output = slot->output;
1853             dbin->output_streams = g_list_remove (dbin->output_streams, output);
1854             free_output_stream (dbin, output);
1855           }
1856           slot->probe_id = 0;
1857           dbin->slots = g_list_remove (dbin->slots, slot);
1858           SELECTION_UNLOCK (dbin);
1859 
1860           free_multiqueue_slot_async (dbin, slot);
1861           ret = GST_PAD_PROBE_REMOVE;
1862         } else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
1863                 CUSTOM_FINAL_EOS_QUARK)) {
1864           GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream");
1865         } else {
1866           GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)");
1867           /* drop current event as eos will be sent in check_all_slot_for_eos
1868            * when all output streams are also eos */
1869           ret = GST_PAD_PROBE_DROP;
1870           SELECTION_LOCK (dbin);
1871           check_all_slot_for_eos (dbin);
1872           SELECTION_UNLOCK (dbin);
1873         }
1874       }
1875         break;
1876       default:
1877         break;
1878     }
1879   } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
1880     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
1881     switch (GST_QUERY_TYPE (query)) {
1882       case GST_QUERY_CAPS:
1883       {
1884         GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
1885         gst_query_set_caps_result (query, GST_CAPS_ANY);
1886         ret = GST_PAD_PROBE_HANDLED;
1887       }
1888         break;
1889 
1890       case GST_QUERY_ACCEPT_CAPS:
1891       {
1892         GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
1893         /* If the current decoder doesn't accept caps, we'll reconfigure
1894          * on the actual caps event. So accept any caps. */
1895         gst_query_set_accept_caps_result (query, TRUE);
1896         ret = GST_PAD_PROBE_HANDLED;
1897       }
1898       default:
1899         break;
1900     }
1901   }
1902 
1903   return ret;
1904 }
1905 
1906 /* Create a new multiqueue slot for the given type
1907  *
1908  * It is up to the caller to know whether that slot is needed or not
1909  * (and release it when no longer needed) */
1910 static MultiQueueSlot *
create_new_slot(GstDecodebin3 * dbin,GstStreamType type)1911 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
1912 {
1913   MultiQueueSlot *slot;
1914   GstIterator *it = NULL;
1915   GValue item = { 0, };
1916 
1917   GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
1918       gst_stream_type_get_name (type));
1919   slot = g_new0 (MultiQueueSlot, 1);
1920   slot->dbin = dbin;
1921 
1922   slot->id = dbin->slot_id++;
1923 
1924   slot->type = type;
1925   slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u");
1926   if (slot->sink_pad == NULL)
1927     goto fail;
1928 
1929   it = gst_pad_iterate_internal_links (slot->sink_pad);
1930   if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
1931       || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
1932     GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
1933         GST_DEBUG_PAD_NAME (slot->src_pad));
1934     goto fail;
1935   }
1936   gst_iterator_free (it);
1937   g_value_reset (&item);
1938 
1939   g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
1940 
1941   /* Add event probe */
1942   slot->probe_id =
1943       gst_pad_add_probe (slot->src_pad,
1944       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
1945       (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
1946 
1947   GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
1948       GST_DEBUG_PAD_NAME (slot->src_pad));
1949 
1950   dbin->slots = g_list_append (dbin->slots, slot);
1951 
1952   return slot;
1953 
1954   /* ERRORS */
1955 fail:
1956   {
1957     if (slot->sink_pad)
1958       gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
1959     g_free (slot);
1960     return NULL;
1961   }
1962 }
1963 
1964 /* Must be called with SELECTION_LOCK */
1965 static MultiQueueSlot *
get_slot_for_input(GstDecodebin3 * dbin,DecodebinInputStream * input)1966 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
1967 {
1968   GList *tmp;
1969   MultiQueueSlot *empty_slot = NULL;
1970   GstStreamType input_type = 0;
1971   gchar *stream_id = NULL;
1972 
1973   GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
1974       input, input->active_stream,
1975       input->
1976       active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
1977 
1978   if (input->active_stream) {
1979     input_type = gst_stream_get_stream_type (input->active_stream);
1980     stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
1981   }
1982 
1983   /* Go over existing slots and check if there is already one for it */
1984   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1985     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1986     /* Already used input, return that one */
1987     if (slot->input == input) {
1988       GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
1989       return slot;
1990     }
1991   }
1992 
1993   /* Go amongst all unused slots of the right type and try to find a candidate */
1994   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1995     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1996     if (slot->input == NULL && input_type == slot->type) {
1997       /* Remember this empty slot for later */
1998       empty_slot = slot;
1999       /* Check if available slot is of the same stream_id */
2000       GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
2001           slot->id, slot->active_stream);
2002       if (stream_id && slot->active_stream) {
2003         gchar *ostream_id =
2004             (gchar *) gst_stream_get_stream_id (slot->active_stream);
2005         GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
2006             ostream_id, stream_id);
2007         if (!g_strcmp0 (stream_id, ostream_id))
2008           break;
2009       }
2010     }
2011   }
2012 
2013   if (empty_slot) {
2014     GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
2015     empty_slot->input = input;
2016     return empty_slot;
2017   }
2018 
2019   if (input_type)
2020     return create_new_slot (dbin, input_type);
2021 
2022   return NULL;
2023 }
2024 
2025 static void
link_input_to_slot(DecodebinInputStream * input,MultiQueueSlot * slot)2026 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
2027 {
2028   if (slot->input != NULL && slot->input != input) {
2029     GST_ERROR_OBJECT (slot->dbin,
2030         "Trying to link input to an already used slot");
2031     return;
2032   }
2033   gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
2034   slot->pending_stream = input->active_stream;
2035   slot->input = input;
2036 }
2037 
/* Compiled out (#if 0): have_factory() reports whether at least one factory
 * from the decoder list (@ftype == GST_ELEMENT_FACTORY_TYPE_DECODER) or from
 * the decodable list (any other @ftype) can sink @caps. */
#if 0
static gboolean
have_factory (GstDecodebin3 * dbin, GstCaps * caps,
    GstElementFactoryListType ftype)
{
  gboolean ret = FALSE;
  GList *res;

  /* The factory lists are refreshed and filtered under factories_lock */
  g_mutex_lock (&dbin->factories_lock);
  gst_decode_bin_update_factories_list (dbin);
  if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
    res =
        gst_element_factory_list_filter (dbin->decoder_factories,
        caps, GST_PAD_SINK, TRUE);
  else
    res =
        gst_element_factory_list_filter (dbin->decodable_factories,
        caps, GST_PAD_SINK, TRUE);
  g_mutex_unlock (&dbin->factories_lock);

  /* Any match will do, only the existence of a candidate matters */
  if (res) {
    ret = TRUE;
    gst_plugin_feature_list_free (res);
  }

  return ret;
}
#endif
2066 
2067 static GstElement *
create_element(GstDecodebin3 * dbin,GstStream * stream,GstElementFactoryListType ftype)2068 create_element (GstDecodebin3 * dbin, GstStream * stream,
2069     GstElementFactoryListType ftype)
2070 {
2071   GList *res;
2072   GstElement *element = NULL;
2073   GstCaps *caps;
2074 
2075   g_mutex_lock (&dbin->factories_lock);
2076   gst_decode_bin_update_factories_list (dbin);
2077   caps = gst_stream_get_caps (stream);
2078   if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
2079     res =
2080         gst_element_factory_list_filter (dbin->decoder_factories,
2081         caps, GST_PAD_SINK, TRUE);
2082   else
2083     res =
2084         gst_element_factory_list_filter (dbin->decodable_factories,
2085         caps, GST_PAD_SINK, TRUE);
2086   g_mutex_unlock (&dbin->factories_lock);
2087 
2088   if (res) {
2089     element =
2090         gst_element_factory_create ((GstElementFactory *) res->data, NULL);
2091     GST_DEBUG ("Created element '%s'", GST_ELEMENT_NAME (element));
2092     gst_plugin_feature_list_free (res);
2093   } else {
2094     GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT, caps);
2095   }
2096 
2097   gst_caps_unref (caps);
2098   return element;
2099 }
2100 
2101 /* FIXME : VERY NAIVE. ASSUMING FIRST ONE WILL WORK */
2102 static GstElement *
create_decoder(GstDecodebin3 * dbin,GstStream * stream)2103 create_decoder (GstDecodebin3 * dbin, GstStream * stream)
2104 {
2105   return create_element (dbin, stream, GST_ELEMENT_FACTORY_TYPE_DECODER);
2106 }
2107 
2108 static GstPadProbeReturn
keyframe_waiter_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinOutputStream * output)2109 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
2110     DecodebinOutputStream * output)
2111 {
2112   GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2113   /* If we have a keyframe, remove the probe and let all data through */
2114   /* FIXME : HANDLE HEADER BUFFER ?? */
2115   if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
2116       GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
2117     GST_DEBUG_OBJECT (pad,
2118         "Buffer is keyframe or header, letting through and removing probe");
2119     output->drop_probe_id = 0;
2120     return GST_PAD_PROBE_REMOVE;
2121   }
2122   GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
2123   return GST_PAD_PROBE_DROP;
2124 }
2125 
/* (Re)configure @output so that it gets its data from @slot.
 *
 * A decoder is needed when the caps of the slot's active stream can not
 * intersect with dbin->caps. An existing decoder is kept if it is still needed
 * and accepts the new caps; otherwise it is removed from the bin and, if a
 * decoder is needed, a new one is created, added to the bin and linked to the
 * source pad of the slot. When no decoder is needed, the source pad of the
 * slot is used directly as target of the output ghost pad. The ghost pad is
 * added to decodebin3 the first time this function gets that far.
 *
 * On failure the decoder (if any) and the decoder pad references of @output
 * are released again (see the cleanup label).
 *
 * NOTE(review): SELECTION_LOCK is temporarily released around the posting of
 * the missing-decoder message, so this is expected to be entered with that
 * lock held; the callers visible in this file do take it - confirm for any
 * other caller. */
static void
reconfigure_output_stream (DecodebinOutputStream * output,
    MultiQueueSlot * slot)
{
  GstDecodebin3 *dbin = output->dbin;
  GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
  gboolean needs_decoder;

  /* Caps which can intersect with dbin->caps are exposed without decoder */
  needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;

  GST_DEBUG_OBJECT (dbin,
      "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
      needs_decoder);

  /* FIXME : Maybe make the output un-hook itself automatically ? */
  if (output->slot != NULL && output->slot != slot) {
    GST_WARNING_OBJECT (dbin,
        "Output still linked to another slot (%p)", output->slot);
    gst_caps_unref (new_caps);
    return;
  }

  /* Check if existing config is reusable as-is by checking if
   * the existing decoder accepts the new caps, if not delete
   * it and create a new one */
  if (output->decoder) {
    gboolean can_reuse_decoder;

    if (needs_decoder) {
      can_reuse_decoder =
          gst_pad_query_accept_caps (output->decoder_sink, new_caps);
    } else
      /* The new caps need no decoder at all, the current one has to go */
      can_reuse_decoder = FALSE;

    if (can_reuse_decoder) {
      /* For video, install the keyframe-waiter probe (unless one is already
       * installed) on the slot source pad */
      if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
        GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
        output->drop_probe_id =
            gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
            (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
      }
      GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
      if (output->linked == FALSE) {
        gst_pad_link_full (slot->src_pad, output->decoder_sink,
            GST_PAD_LINK_CHECK_NOTHING);
        output->linked = TRUE;
      }
      gst_caps_unref (new_caps);
      return;
    }

    GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);

    /* Detach the old decoder from the slot, including the keyframe-waiter
     * probe if it is still installed */
    if (output->linked)
      gst_pad_unlink (slot->src_pad, output->decoder_sink);
    output->linked = FALSE;
    if (output->drop_probe_id) {
      gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
      output->drop_probe_id = 0;
    }

    /* Untarget the ghost pad before the decoder goes away */
    if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
      GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
      gst_caps_unref (new_caps);
      goto cleanup;
    }

    /* Lock the state of the decoder, shut it down and drop it from the bin */
    gst_element_set_locked_state (output->decoder, TRUE);
    gst_element_set_state (output->decoder, GST_STATE_NULL);

    gst_bin_remove ((GstBin *) dbin, output->decoder);
    output->decoder = NULL;
  }

  gst_caps_unref (new_caps);

  /* Drop the pad references of the previous configuration (if any) */
  gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
  gst_object_replace ((GstObject **) & output->decoder_src, NULL);

  /* If a decoder is required, create one */
  if (needs_decoder) {
    /* If we don't have a decoder yet, instantiate one */
    output->decoder = create_decoder (dbin, slot->active_stream);
    if (output->decoder == NULL) {
      GstCaps *caps;

      /* The missing-decoder message is posted without the selection lock */
      SELECTION_UNLOCK (dbin);
      /* FIXME : Should we be smarter if there's a missing decoder ?
       * Should we deactivate that stream ? */
      caps = gst_stream_get_caps (slot->active_stream);
      gst_element_post_message (GST_ELEMENT_CAST (dbin),
          gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
      gst_caps_unref (caps);
      SELECTION_LOCK (dbin);
      goto cleanup;
    }
    if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
      GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
      goto cleanup;
    }
    output->decoder_sink = gst_element_get_static_pad (output->decoder, "sink");
    output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
    /* Video outputs get a keyframe-waiter probe on the slot source pad */
    if (output->type & GST_STREAM_TYPE_VIDEO) {
      GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
      output->drop_probe_id =
          gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
          (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
    }
    if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
            GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
      GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
          GST_DEBUG_PAD_NAME (output->decoder_sink));
      goto cleanup;
    }
  } else {
    /* No decoder: the slot source pad itself becomes the ghost pad target */
    output->decoder_src = gst_object_ref (slot->src_pad);
    output->decoder_sink = NULL;
  }
  output->linked = TRUE;
  if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
          output->decoder_src)) {
    GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
    goto cleanup;
  }
  /* The ghost pad is added to the element only once */
  if (output->src_exposed == FALSE) {
    output->src_exposed = TRUE;
    gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
  }

  if (output->decoder)
    gst_element_sync_state_with_parent (output->decoder);

  output->slot = slot;
  return;

cleanup:
  {
    /* Release whatever decoder and decoder pad references are still set */
    GST_DEBUG_OBJECT (dbin, "Cleanup");
    if (output->decoder_sink) {
      gst_object_unref (output->decoder_sink);
      output->decoder_sink = NULL;
    }
    if (output->decoder_src) {
      gst_object_unref (output->decoder_src);
      output->decoder_src = NULL;
    }
    if (output->decoder) {
      gst_element_set_state (output->decoder, GST_STATE_NULL);
      gst_bin_remove ((GstBin *) dbin, output->decoder);
      output->decoder = NULL;
    }
  }
}
2279 
2280 static GstPadProbeReturn
idle_reconfigure(GstPad * pad,GstPadProbeInfo * info,MultiQueueSlot * slot)2281 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2282 {
2283   GstMessage *msg = NULL;
2284   DecodebinOutputStream *output;
2285 
2286   SELECTION_LOCK (slot->dbin);
2287   output = get_output_for_slot (slot);
2288 
2289   GST_DEBUG_OBJECT (pad, "output : %p", output);
2290 
2291   if (output) {
2292     reconfigure_output_stream (output, slot);
2293     msg = is_selection_done (slot->dbin);
2294   }
2295   SELECTION_UNLOCK (slot->dbin);
2296   if (msg)
2297     gst_element_post_message ((GstElement *) slot->dbin, msg);
2298 
2299   return GST_PAD_PROBE_REMOVE;
2300 }
2301 
2302 static MultiQueueSlot *
find_slot_for_stream_id(GstDecodebin3 * dbin,const gchar * sid)2303 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2304 {
2305   GList *tmp;
2306 
2307   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2308     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2309     const gchar *stream_id;
2310     if (slot->active_stream) {
2311       stream_id = gst_stream_get_stream_id (slot->active_stream);
2312       if (!g_strcmp0 (sid, stream_id))
2313         return slot;
2314     }
2315     if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2316       stream_id = gst_stream_get_stream_id (slot->pending_stream);
2317       if (!g_strcmp0 (sid, stream_id))
2318         return slot;
2319     }
2320   }
2321 
2322   return NULL;
2323 }
2324 
2325 /* This function handles the reassignment of a slot. Call this from
2326  * the streaming thread of a slot. */
2327 static gboolean
reassign_slot(GstDecodebin3 * dbin,MultiQueueSlot * slot)2328 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2329 {
2330   DecodebinOutputStream *output;
2331   MultiQueueSlot *target_slot = NULL;
2332   GList *tmp;
2333   const gchar *sid, *tsid;
2334 
2335   SELECTION_LOCK (dbin);
2336   output = slot->output;
2337 
2338   if (G_UNLIKELY (slot->active_stream == NULL)) {
2339     GST_DEBUG_OBJECT (slot->src_pad,
2340         "Called on inactive slot (active_stream == NULL)");
2341     SELECTION_UNLOCK (dbin);
2342     return FALSE;
2343   }
2344 
2345   if (G_UNLIKELY (output == NULL)) {
2346     GST_DEBUG_OBJECT (slot->src_pad,
2347         "Slot doesn't have any output to be removed");
2348     SELECTION_UNLOCK (dbin);
2349     return FALSE;
2350   }
2351 
2352   sid = gst_stream_get_stream_id (slot->active_stream);
2353   GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2354 
2355   /* Recheck whether this stream is still in the list of streams to deactivate */
2356   if (stream_in_list (dbin->requested_selection, sid)) {
2357     /* Stream is in the list of requested streams, don't remove */
2358     SELECTION_UNLOCK (dbin);
2359     GST_DEBUG_OBJECT (slot->src_pad,
2360         "Stream '%s' doesn't need to be deactivated", sid);
2361     return FALSE;
2362   }
2363 
2364   /* Unlink slot from output */
2365   /* FIXME : Handle flushing ? */
2366   /* FIXME : Handle outputs without decoders */
2367   GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2368       output->decoder_sink);
2369   if (output->decoder_sink)
2370     gst_pad_unlink (slot->src_pad, output->decoder_sink);
2371   output->linked = FALSE;
2372   slot->output = NULL;
2373   output->slot = NULL;
2374   /* Remove sid from active selection */
2375   for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2376     if (!g_strcmp0 (sid, tmp->data)) {
2377       dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2378       break;
2379     }
2380 
2381   /* Can we re-assign this output to a requested stream ? */
2382   GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2383   for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2384     MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2385     GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2386         tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2387     if (tslot && tslot->type == output->type && tslot->output == NULL) {
2388       GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2389       target_slot = tslot;
2390       tsid = tmp->data;
2391       /* Pass target stream id to requested selection */
2392       dbin->requested_selection =
2393           g_list_append (dbin->requested_selection, g_strdup (tmp->data));
2394       dbin->to_activate = g_list_remove (dbin->to_activate, tmp->data);
2395       break;
2396     }
2397   }
2398 
2399   if (target_slot) {
2400     GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2401         target_slot, tsid);
2402     target_slot->output = output;
2403     output->slot = target_slot;
2404     dbin->active_selection =
2405         g_list_append (dbin->active_selection, (gchar *) tsid);
2406     SELECTION_UNLOCK (dbin);
2407 
2408     /* Wakeup the target slot so that it retries to send events/buffers
2409      * thereby triggering the output reconfiguration codepath */
2410     gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2411         (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2412     /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2413   } else {
2414     GstMessage *msg;
2415 
2416     dbin->output_streams = g_list_remove (dbin->output_streams, output);
2417     free_output_stream (dbin, output);
2418     msg = is_selection_done (slot->dbin);
2419     SELECTION_UNLOCK (dbin);
2420 
2421     if (msg)
2422       gst_element_post_message ((GstElement *) slot->dbin, msg);
2423   }
2424 
2425   return TRUE;
2426 }
2427 
2428 /* Idle probe called when a slot should be unassigned from its output stream.
2429  * This is needed to ensure nothing is flowing when unlinking the slot.
2430  *
2431  * Also, this method will search for a pending stream which could re-use
2432  * the output stream. */
2433 static GstPadProbeReturn
slot_unassign_probe(GstPad * pad,GstPadProbeInfo * info,MultiQueueSlot * slot)2434 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2435     MultiQueueSlot * slot)
2436 {
2437   GstDecodebin3 *dbin = slot->dbin;
2438 
2439   reassign_slot (dbin, slot);
2440 
2441   return GST_PAD_PROBE_REMOVE;
2442 }
2443 
2444 static gboolean
handle_stream_switch(GstDecodebin3 * dbin,GList * select_streams,guint32 seqnum)2445 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2446     guint32 seqnum)
2447 {
2448   gboolean ret = TRUE;
2449   GList *tmp;
2450   /* List of slots to (de)activate. */
2451   GList *to_deactivate = NULL;
2452   GList *to_activate = NULL;
2453   /* List of unknown stream id, most likely means the event
2454    * should be sent upstream so that elements can expose the requested stream */
2455   GList *unknown = NULL;
2456   GList *to_reassign = NULL;
2457   GList *future_request_streams = NULL;
2458   GList *pending_streams = NULL;
2459   GList *slots_to_reassign = NULL;
2460 
2461   SELECTION_LOCK (dbin);
2462   if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2463     GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2464     SELECTION_UNLOCK (dbin);
2465     return TRUE;
2466   }
2467   /* Remove pending select_streams */
2468   g_list_free (dbin->pending_select_streams);
2469   dbin->pending_select_streams = NULL;
2470 
2471   /* COMPARE the requested streams to the active and requested streams
2472    * on multiqueue. */
2473 
2474   /* First check the slots to activate and which ones are unknown */
2475   for (tmp = select_streams; tmp; tmp = tmp->next) {
2476     const gchar *sid = (const gchar *) tmp->data;
2477     MultiQueueSlot *slot;
2478     GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
2479     slot = find_slot_for_stream_id (dbin, sid);
2480     /* Find the corresponding slot */
2481     if (slot == NULL) {
2482       if (stream_in_collection (dbin, (gchar *) sid)) {
2483         pending_streams = g_list_append (pending_streams, (gchar *) sid);
2484       } else {
2485         GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
2486         unknown = g_list_append (unknown, (gchar *) sid);
2487       }
2488     } else if (slot->output == NULL) {
2489       GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
2490           slot, sid);
2491       to_activate = g_list_append (to_activate, slot);
2492     } else {
2493       GST_DEBUG_OBJECT (dbin,
2494           "Stream '%s' from slot %p is already active on output %p", sid, slot,
2495           slot->output);
2496       future_request_streams =
2497           g_list_append (future_request_streams, (gchar *) sid);
2498     }
2499   }
2500 
2501   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2502     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2503     /* For slots that have an output, check if it's part of the streams to
2504      * be active */
2505     if (slot->output) {
2506       gboolean slot_to_deactivate = TRUE;
2507 
2508       if (slot->active_stream) {
2509         if (stream_in_list (select_streams,
2510                 gst_stream_get_stream_id (slot->active_stream)))
2511           slot_to_deactivate = FALSE;
2512       }
2513       if (slot_to_deactivate && slot->pending_stream
2514           && slot->pending_stream != slot->active_stream) {
2515         if (stream_in_list (select_streams,
2516                 gst_stream_get_stream_id (slot->pending_stream)))
2517           slot_to_deactivate = FALSE;
2518       }
2519       if (slot_to_deactivate) {
2520         GST_DEBUG_OBJECT (dbin,
2521             "Slot %p (%s) should be deactivated, no longer used", slot,
2522             slot->
2523             active_stream ? gst_stream_get_stream_id (slot->active_stream) :
2524             "NULL");
2525         to_deactivate = g_list_append (to_deactivate, slot);
2526       }
2527     }
2528   }
2529 
2530   if (to_deactivate != NULL) {
2531     GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
2532     /* We need to compare what needs to be activated and deactivated in order
2533      * to determine whether there are outputs that can be transferred */
2534     /* Take the stream-id of the slots that are to be activated, for which there
2535      * is a slot of the same type that needs to be deactivated */
2536     tmp = to_deactivate;
2537     while (tmp) {
2538       MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
2539       gboolean removeit = FALSE;
2540       GList *tmp2, *next;
2541       GST_DEBUG_OBJECT (dbin,
2542           "Checking if slot to deactivate (%p) has a candidate slot to activate",
2543           slot_to_deactivate);
2544       for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
2545         MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
2546         GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
2547         if (slot_to_activate->type == slot_to_deactivate->type) {
2548           GST_DEBUG_OBJECT (dbin, "Re-using");
2549           to_reassign = g_list_append (to_reassign, (gchar *)
2550               gst_stream_get_stream_id (slot_to_activate->active_stream));
2551           slots_to_reassign =
2552               g_list_append (slots_to_reassign, slot_to_deactivate);
2553           to_activate = g_list_remove (to_activate, slot_to_activate);
2554           removeit = TRUE;
2555           break;
2556         }
2557       }
2558       next = tmp->next;
2559       if (removeit)
2560         to_deactivate = g_list_delete_link (to_deactivate, tmp);
2561       tmp = next;
2562     }
2563   }
2564 
2565   for (tmp = to_deactivate; tmp; tmp = tmp->next) {
2566     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2567     GST_DEBUG_OBJECT (dbin,
2568         "Really need to deactivate slot %p, but no available alternative",
2569         slot);
2570 
2571     slots_to_reassign = g_list_append (slots_to_reassign, slot);
2572   }
2573 
2574   /* The only slots left to activate are the ones that won't be reassigned and
2575    * therefore really need to have a new output created */
2576   for (tmp = to_activate; tmp; tmp = tmp->next) {
2577     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2578     if (slot->active_stream)
2579       future_request_streams =
2580           g_list_append (future_request_streams,
2581           (gchar *) gst_stream_get_stream_id (slot->active_stream));
2582     else if (slot->pending_stream)
2583       future_request_streams =
2584           g_list_append (future_request_streams,
2585           (gchar *) gst_stream_get_stream_id (slot->pending_stream));
2586     else
2587       GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
2588   }
2589 
2590   if (to_activate == NULL && pending_streams != NULL) {
2591     GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
2592     if (dbin->requested_selection)
2593       g_list_free_full (dbin->requested_selection, g_free);
2594     dbin->requested_selection =
2595         g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
2596     g_list_free (to_deactivate);
2597     g_list_free (pending_streams);
2598     to_deactivate = NULL;
2599     pending_streams = NULL;
2600   } else {
2601     if (dbin->requested_selection)
2602       g_list_free_full (dbin->requested_selection, g_free);
2603     dbin->requested_selection =
2604         g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
2605     dbin->requested_selection =
2606         g_list_concat (dbin->requested_selection,
2607         g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
2608     if (dbin->to_activate)
2609       g_list_free (dbin->to_activate);
2610     dbin->to_activate = g_list_copy (to_reassign);
2611   }
2612 
2613   dbin->selection_updated = TRUE;
2614   SELECTION_UNLOCK (dbin);
2615 
2616   if (unknown) {
2617     GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
2618     g_list_free (unknown);
2619   }
2620 
2621   if (to_activate && !slots_to_reassign) {
2622     for (tmp = to_activate; tmp; tmp = tmp->next) {
2623       MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2624       gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2625           (GstPadProbeCallback) idle_reconfigure, slot, NULL);
2626     }
2627   }
2628 
2629   /* For all streams to deactivate, add an idle probe where we will do
2630    * the unassignment and switch over */
2631   for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
2632     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2633     gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2634         (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
2635   }
2636 
2637   if (to_deactivate)
2638     g_list_free (to_deactivate);
2639   if (to_activate)
2640     g_list_free (to_activate);
2641   if (to_reassign)
2642     g_list_free (to_reassign);
2643   if (future_request_streams)
2644     g_list_free (future_request_streams);
2645   if (pending_streams)
2646     g_list_free (pending_streams);
2647   if (slots_to_reassign)
2648     g_list_free (slots_to_reassign);
2649 
2650   return ret;
2651 }
2652 
2653 static GstPadProbeReturn
ghost_pad_event_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinOutputStream * output)2654 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
2655     DecodebinOutputStream * output)
2656 {
2657   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2658   GstDecodebin3 *dbin = output->dbin;
2659   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
2660 
2661   GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
2662 
2663   switch (GST_EVENT_TYPE (event)) {
2664     case GST_EVENT_SELECT_STREAMS:
2665     {
2666       GstPad *peer;
2667       GList *streams = NULL;
2668       guint32 seqnum = gst_event_get_seqnum (event);
2669 
2670       SELECTION_LOCK (dbin);
2671       if (seqnum == dbin->select_streams_seqnum) {
2672         SELECTION_UNLOCK (dbin);
2673         GST_DEBUG_OBJECT (pad,
2674             "Already handled/handling that SELECT_STREAMS event");
2675         gst_event_unref (event);
2676         ret = GST_PAD_PROBE_HANDLED;
2677         break;
2678       }
2679       dbin->select_streams_seqnum = seqnum;
2680       if (dbin->pending_select_streams != NULL) {
2681         GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2682         g_list_free (dbin->pending_select_streams);
2683         dbin->pending_select_streams = NULL;
2684       }
2685       gst_event_parse_select_streams (event, &streams);
2686       dbin->pending_select_streams = g_list_copy (streams);
2687       SELECTION_UNLOCK (dbin);
2688 
2689       /* Send event upstream */
2690       if ((peer = gst_pad_get_peer (pad))) {
2691         gst_pad_send_event (peer, event);
2692         gst_object_unref (peer);
2693       } else {
2694         gst_event_unref (event);
2695       }
2696       /* Finally handle the switch */
2697       if (streams) {
2698         handle_stream_switch (dbin, streams, seqnum);
2699         g_list_free_full (streams, g_free);
2700       }
2701       ret = GST_PAD_PROBE_HANDLED;
2702     }
2703       break;
2704     default:
2705       break;
2706   }
2707 
2708   return ret;
2709 }
2710 
2711 static gboolean
gst_decodebin3_send_event(GstElement * element,GstEvent * event)2712 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
2713 {
2714   GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
2715   if (GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
2716     GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2717     GList *streams = NULL;
2718     guint32 seqnum = gst_event_get_seqnum (event);
2719 
2720     SELECTION_LOCK (dbin);
2721     if (seqnum == dbin->select_streams_seqnum) {
2722       SELECTION_UNLOCK (dbin);
2723       GST_DEBUG_OBJECT (dbin,
2724           "Already handled/handling that SELECT_STREAMS event");
2725       return TRUE;
2726     }
2727     dbin->select_streams_seqnum = seqnum;
2728     if (dbin->pending_select_streams != NULL) {
2729       GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2730       g_list_free (dbin->pending_select_streams);
2731       dbin->pending_select_streams = NULL;
2732     }
2733     gst_event_parse_select_streams (event, &streams);
2734     dbin->pending_select_streams = g_list_copy (streams);
2735     SELECTION_UNLOCK (dbin);
2736 
2737     /* FIXME : We don't have an upstream ?? */
2738 #if 0
2739     /* Send event upstream */
2740     if ((peer = gst_pad_get_peer (pad))) {
2741       gst_pad_send_event (peer, event);
2742       gst_object_unref (peer);
2743     }
2744 #endif
2745     /* Finally handle the switch */
2746     if (streams) {
2747       handle_stream_switch (dbin, streams, seqnum);
2748       g_list_free_full (streams, g_free);
2749     }
2750 
2751     gst_event_unref (event);
2752     return TRUE;
2753   }
2754   return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
2755 }
2756 
2757 
2758 static void
free_multiqueue_slot(GstDecodebin3 * dbin,MultiQueueSlot * slot)2759 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2760 {
2761   if (slot->probe_id)
2762     gst_pad_remove_probe (slot->src_pad, slot->probe_id);
2763   if (slot->input) {
2764     if (slot->input->srcpad)
2765       gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
2766   }
2767 
2768   gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2769   gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
2770   gst_object_replace ((GstObject **) & slot->src_pad, NULL);
2771   gst_object_replace ((GstObject **) & slot->active_stream, NULL);
2772   g_free (slot);
2773 }
2774 
2775 static void
free_multiqueue_slot_async(GstDecodebin3 * dbin,MultiQueueSlot * slot)2776 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2777 {
2778   GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
2779   gst_element_call_async (GST_ELEMENT_CAST (dbin),
2780       (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
2781 }
2782 
2783 /* Create a DecodebinOutputStream for a given type
2784  * Note: It will be empty initially, it needs to be configured
2785  * afterwards */
2786 static DecodebinOutputStream *
create_output_stream(GstDecodebin3 * dbin,GstStreamType type)2787 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
2788 {
2789   DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
2790   gchar *pad_name;
2791   const gchar *prefix;
2792   GstStaticPadTemplate *templ;
2793   GstPadTemplate *ptmpl;
2794   guint32 *counter;
2795   GstPad *internal_pad;
2796 
2797   GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
2798       res, gst_stream_type_get_name (type));
2799 
2800   res->type = type;
2801   res->dbin = dbin;
2802 
2803   if (type & GST_STREAM_TYPE_VIDEO) {
2804     templ = &video_src_template;
2805     counter = &dbin->vpadcount;
2806     prefix = "video";
2807   } else if (type & GST_STREAM_TYPE_AUDIO) {
2808     templ = &audio_src_template;
2809     counter = &dbin->apadcount;
2810     prefix = "audio";
2811   } else if (type & GST_STREAM_TYPE_TEXT) {
2812     templ = &text_src_template;
2813     counter = &dbin->tpadcount;
2814     prefix = "text";
2815   } else {
2816     templ = &src_template;
2817     counter = &dbin->opadcount;
2818     prefix = "src";
2819   }
2820 
2821   pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
2822   *counter += 1;
2823   ptmpl = gst_static_pad_template_get (templ);
2824   res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
2825   gst_object_unref (ptmpl);
2826   g_free (pad_name);
2827   gst_pad_set_active (res->src_pad, TRUE);
2828   /* Put an event probe on the internal proxy pad to detect upstream
2829    * events */
2830   internal_pad =
2831       (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
2832   gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
2833       (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
2834   gst_object_unref (internal_pad);
2835 
2836   dbin->output_streams = g_list_append (dbin->output_streams, res);
2837 
2838   return res;
2839 }
2840 
2841 static void
free_output_stream(GstDecodebin3 * dbin,DecodebinOutputStream * output)2842 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
2843 {
2844   if (output->slot) {
2845     if (output->decoder_sink && output->decoder)
2846       gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
2847 
2848     output->slot->output = NULL;
2849     output->slot = NULL;
2850   }
2851   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2852   gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
2853   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2854   if (output->src_exposed) {
2855     gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
2856   }
2857   if (output->decoder) {
2858     gst_element_set_locked_state (output->decoder, TRUE);
2859     gst_element_set_state (output->decoder, GST_STATE_NULL);
2860     gst_bin_remove ((GstBin *) dbin, output->decoder);
2861   }
2862   g_free (output);
2863 }
2864 
2865 static GstStateChangeReturn
gst_decodebin3_change_state(GstElement * element,GstStateChange transition)2866 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
2867 {
2868   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2869   GstStateChangeReturn ret;
2870 
2871   /* Upwards */
2872   switch (transition) {
2873     default:
2874       break;
2875   }
2876   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2877   if (ret == GST_STATE_CHANGE_FAILURE)
2878     goto beach;
2879 
2880   switch (transition) {
2881     case GST_STATE_CHANGE_PAUSED_TO_READY:
2882     {
2883       GList *tmp;
2884 
2885       /* Free output streams */
2886       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2887         DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2888         free_output_stream (dbin, output);
2889       }
2890       g_list_free (dbin->output_streams);
2891       dbin->output_streams = NULL;
2892       /* Free multiqueue slots */
2893       for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2894         MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2895         free_multiqueue_slot (dbin, slot);
2896       }
2897       g_list_free (dbin->slots);
2898       dbin->slots = NULL;
2899       dbin->current_group_id = GST_GROUP_ID_INVALID;
2900       /* Free inputs */
2901     }
2902       break;
2903     default:
2904       break;
2905   }
2906 beach:
2907   return ret;
2908 }
2909 
2910 gboolean
gst_decodebin3_plugin_init(GstPlugin * plugin)2911 gst_decodebin3_plugin_init (GstPlugin * plugin)
2912 {
2913   GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");
2914 
2915   return gst_element_register (plugin, "decodebin3", GST_RANK_NONE,
2916       GST_TYPE_DECODEBIN3);
2917 }
2918