1 /* GStreamer
2 *
3 * Copyright (C) <2015> Centricular Ltd
4 * @author: Edward Hervey <edward@centricular.com>
5 * @author: Jan Schmidt <jan@centricular.com>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <glib.h>
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
30 #include <gst/gst.h>
31 #include <gst/pbutils/pbutils.h>
32
33 #include "gstplayback.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
36
37 /**
38 * SECTION:element-decodebin3
39 * @title: decodebin3
40 *
41 * #GstBin that auto-magically constructs a decoding pipeline using available
42 * decoders and demuxers via auto-plugging. The output is raw audio, video
43 * or subtitle streams.
44 *
45 * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
46 *
47 * * supports publication and selection of stream information via
48 * GstStreamCollection messages and #GST_EVENT_SELECT_STREAM events.
49 *
50 * * dynamically switches stream connections internally, and
51 * reuses decoder elements when stream selections change, so that in
52 * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53 * and only creates new elements when streams change and an existing decoder
54 * is not capable of handling the new format.
55 *
56 * * supports multiple input pads for the parallel decoding of auxilliary streams
57 * not muxed with the primary stream.
58 *
59 * * does not handle network stream buffering. decodebin3 expects that network stream
60 * buffering is handled upstream, before data is passed to it.
61 *
62 * <emphasis>decodebin3 is still experimental API and a technology preview.
63 * Its behaviour and exposed API is subject to change.</emphasis>
64 *
65 */
66
67 /**
68 * Global design
69 *
70 * 1) From sink pad to elementary streams (GstParseBin)
71 *
72 * The input sink pads are fed to GstParseBin. GstParseBin will feed them
73 * through typefind. When the caps are detected (or changed) we recursively
74 * figure out which demuxer, parser or depayloader is needed until we get to
75 * elementary streams.
76 *
77 * All elementary streams (whether decoded or not, whether exposed or not) are
78 * fed through multiqueue. There is only *one* multiqueue in decodebin3.
79 *
80 * => MultiQueue is the cornerstone.
81 * => No buffering before multiqueue
82 *
83 * 2) Elementary streams
84 *
85 * After GstParseBin, there are 3 main components:
86 * 1) Input Streams (provided by GstParseBin)
87 * 2) Multiqueue slots
88 * 3) Output Streams
89 *
90 * Input Streams correspond to the stream coming from GstParseBin and that gets
91 * fed into a multiqueue slot.
92 *
93 * Output Streams correspond to the combination of a (optional) decoder and an
94 * output ghostpad. Output Streams can be moved from one multiqueue slot to
95 * another, can reconfigure itself (different decoders), and can be
96 * added/removed depending on the configuration (all streams outputted, only one
97 * of each type, ...).
98 *
99 * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
100 * each 'active' Input Stream there is a corresponding slot.
101 * Slots might have different streams on input and output (due to internal
102 * buffering).
103 *
104 * Due to internal queuing/buffering/..., all those components (might) behave
105 * asynchronously. Therefore probes will be used on each component source pad to
106 * detect various key-points:
107 * * EOS :
108 * the stream is done => Mark that component as done, optionally freeing/removing it
109 * * STREAM_START :
110 * a new stream is starting => link it further if needed
111 *
112 * 3) Gradual replacement
113 *
114 * If the caps change at any point in decodebin (input sink pad, demuxer output,
115 * multiqueue output, ..), we gradually replace (if needed) the following elements.
116 *
117 * This is handled by the probes in various locations:
118 * a) typefind output
119 * b) multiqueue input (source pad of Input Streams)
120 * c) multiqueue output (source pad of Multiqueue Slots)
121 * d) final output (target of source ghostpads)
122 *
123 * When CAPS event arrive at those points, one of three things can happen:
124 * a) There is no elements downstream yet, just create/link-to following elements
125 * b) There are downstream elements, do a ACCEPT_CAPS query
126 * b.1) The new CAPS are accepted, keep current configuration
127 * b.2) The new CAPS are not accepted, remove following elements then do a)
128 *
129 * Components:
130 *
131 * MultiQ Output
132 * Input(s) Slots Streams
133 * /-------------------------------------------\ /-----\ /------------- \
134 *
135 * +-------------------------------------------------------------------------+
136 * | |
137 * | +---------------------------------------------+ |
138 * | | GstParseBin(s) | |
139 * | | +--------------+ | +-----+ |
140 * | | | |---[parser]-[|--| Mul |---[ decoder ]-[|
141 * |]--[ typefind ]---| demuxer(s) |------------[| | ti | |
142 * | | | (if needed) |---[parser]-[|--| qu | |
143 * | | | |---[parser]-[|--| eu |---[ decoder ]-[|
144 * | | +--------------+ | +------ ^ |
145 * | +---------------------------------------------+ ^ | |
146 * | ^ | | |
147 * +-----------------------------------------------+--------+-------------+--+
148 * | | |
149 * | | |
150 * Probes --/--------/-------------/
151 *
152 * ATOMIC SWITCHING
153 *
154 * We want to ensure we re-use decoders when switching streams. This takes place
155 * at the multiqueue output level.
156 *
157 * MAIN CONCEPTS
158 * 1) Activating a stream (i.e. linking a slot to an output) is only done within
159 * the streaming thread in the multiqueue_src_probe() and only if the
160 stream is in the REQUESTED selection.
161 * 2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
162 * within the stream thread, but only in a purposefully called IDLE probe
163 * that calls reassign_slot().
164 *
165 * Based on those two principles, 3 "selection" of streams (stream-id) are used:
166 * 1) requested_selection
167 * All streams within that list should be activated
168 * 2) active_selection
169 * List of streams that are exposed by decodebin
170 * 3) to_activate
171 * List of streams that will be moved to requested_selection in the
172 * reassign_slot() method (i.e. once a stream was deactivated, and the output
173 * was retargetted)
174 */
175
176
177 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
178 #define GST_CAT_DEFAULT decodebin3_debug
179
180 #define GST_TYPE_DECODEBIN3 (gst_decodebin3_get_type ())
181
182 #define EXTRA_DEBUG 1
183
184 #define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
185 #define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"
186 static GQuark
_custom_final_eos_quark_get(void)187 _custom_final_eos_quark_get (void)
188 {
189 static gsize g_quark;
190
191 if (g_once_init_enter (&g_quark)) {
192 gsize quark =
193 (gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
194 g_once_init_leave (&g_quark, quark);
195 }
196 return g_quark;
197 }
198
199 typedef struct _GstDecodebin3 GstDecodebin3;
200 typedef struct _GstDecodebin3Class GstDecodebin3Class;
201
202 typedef struct _DecodebinInputStream DecodebinInputStream;
203 typedef struct _DecodebinInput DecodebinInput;
204 typedef struct _DecodebinOutputStream DecodebinOutputStream;
205
206 struct _GstDecodebin3
207 {
208 GstBin bin;
209
210 /* input_lock protects the following variables */
211 GMutex input_lock;
212 /* Main input (static sink pad) */
213 DecodebinInput *main_input;
214 /* Supplementary input (request sink pads) */
215 GList *other_inputs;
216 /* counter for input */
217 guint32 input_counter;
218 /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
219 /* FIXME : Needs to be resetted appropriately (when upstream changes ?) */
220 guint32 current_group_id;
221 /* End of variables protected by input_lock */
222
223 GstElement *multiqueue;
224
225 /* selection_lock protects access to following variables */
226 GMutex selection_lock;
227 GList *input_streams; /* List of DecodebinInputStream for active collection */
228 GList *output_streams; /* List of DecodebinOutputStream used for output */
229 GList *slots; /* List of MultiQueueSlot */
230 guint slot_id;
231
232 /* Active collection */
233 GstStreamCollection *collection;
234 /* requested selection of stream-id to activate post-multiqueue */
235 GList *requested_selection;
236 /* list of stream-id currently activated in output */
237 GList *active_selection;
238 /* List of stream-id that need to be activated (after a stream switch for ex) */
239 GList *to_activate;
240 /* Pending select streams event */
241 guint32 select_streams_seqnum;
242 /* pending list of streams to select (from downstream) */
243 GList *pending_select_streams;
244 /* TRUE if requested_selection was updated, will become FALSE once
245 * it has fully transitioned to active */
246 gboolean selection_updated;
247 /* End of variables protected by selection_lock */
248
249 /* List of pending collections.
250 * FIXME : Is this really needed ? */
251 GList *pending_collection;
252
253 /* Factories */
254 GMutex factories_lock;
255 guint32 factories_cookie;
256 /* All DECODABLE factories */
257 GList *factories;
258 /* Only DECODER factories */
259 GList *decoder_factories;
260 /* DECODABLE but not DECODER factories */
261 GList *decodable_factories;
262
263 /* counters for pads */
264 guint32 apadcount, vpadcount, tpadcount, opadcount;
265
266 /* Properties */
267 GstCaps *caps;
268 };
269
270 struct _GstDecodebin3Class
271 {
272 GstBinClass class;
273
274 gint (*select_stream) (GstDecodebin3 * dbin,
275 GstStreamCollection * collection, GstStream * stream);
276 };
277
278 /* Input of decodebin, controls input pad and parsebin */
279 struct _DecodebinInput
280 {
281 GstDecodebin3 *dbin;
282
283 gboolean is_main;
284
285 GstPad *ghost_sink;
286 GstPad *parsebin_sink;
287
288 GstStreamCollection *collection; /* Active collection */
289
290 guint group_id;
291
292 GstElement *parsebin;
293
294 gulong pad_added_sigid;
295 gulong pad_removed_sigid;
296 gulong drained_sigid;
297
298 /* TRUE if the input got drained
299 * FIXME : When do we reset it if re-used ?
300 */
301 gboolean drained;
302
303 /* HACK : Remove these fields */
304 /* List of PendingPad structures */
305 GList *pending_pads;
306 };
307
308 /* Multiqueue Slots */
309 typedef struct _MultiQueueSlot
310 {
311 guint id;
312
313 GstDecodebin3 *dbin;
314 /* Type of stream handled by this slot */
315 GstStreamType type;
316
317 /* Linked input and output */
318 DecodebinInputStream *input;
319
320 /* pending => last stream received on sink pad */
321 GstStream *pending_stream;
322 /* active => last stream outputted on source pad */
323 GstStream *active_stream;
324
325 GstPad *sink_pad, *src_pad;
326
327 /* id of the MQ src_pad event probe */
328 gulong probe_id;
329
330 gboolean is_drained;
331
332 DecodebinOutputStream *output;
333 } MultiQueueSlot;
334
335 /* Streams that are exposed downstream (i.e. output) */
336 struct _DecodebinOutputStream
337 {
338 GstDecodebin3 *dbin;
339 /* The type of stream handled by this output stream */
340 GstStreamType type;
341
342 /* The slot to which this output stream is currently connected to */
343 MultiQueueSlot *slot;
344
345 GstElement *decoder; /* Optional */
346 GstPad *decoder_sink, *decoder_src;
347 gboolean linked;
348
349 /* ghostpad */
350 GstPad *src_pad;
351 /* Flag if ghost pad is exposed */
352 gboolean src_exposed;
353
354 /* keyframe dropping probe */
355 gulong drop_probe_id;
356 };
357
358 /* Pending pads from parsebin */
359 typedef struct _PendingPad
360 {
361 GstDecodebin3 *dbin;
362 DecodebinInput *input;
363 GstPad *pad;
364
365 gulong buffer_probe;
366 gulong event_probe;
367 gboolean saw_eos;
368 } PendingPad;
369
370 /* properties */
371 enum
372 {
373 PROP_0,
374 PROP_CAPS
375 };
376
377 /* signals */
378 enum
379 {
380 SIGNAL_SELECT_STREAM,
381 SIGNAL_ABOUT_TO_FINISH,
382 LAST_SIGNAL
383 };
384 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
385
386 #define SELECTION_LOCK(dbin) G_STMT_START { \
387 GST_LOG_OBJECT (dbin, \
388 "selection locking from thread %p", \
389 g_thread_self ()); \
390 g_mutex_lock (&dbin->selection_lock); \
391 GST_LOG_OBJECT (dbin, \
392 "selection locked from thread %p", \
393 g_thread_self ()); \
394 } G_STMT_END
395
396 #define SELECTION_UNLOCK(dbin) G_STMT_START { \
397 GST_LOG_OBJECT (dbin, \
398 "selection unlocking from thread %p", \
399 g_thread_self ()); \
400 g_mutex_unlock (&dbin->selection_lock); \
401 } G_STMT_END
402
403 #define INPUT_LOCK(dbin) G_STMT_START { \
404 GST_LOG_OBJECT (dbin, \
405 "input locking from thread %p", \
406 g_thread_self ()); \
407 g_mutex_lock (&dbin->input_lock); \
408 GST_LOG_OBJECT (dbin, \
409 "input locked from thread %p", \
410 g_thread_self ()); \
411 } G_STMT_END
412
413 #define INPUT_UNLOCK(dbin) G_STMT_START { \
414 GST_LOG_OBJECT (dbin, \
415 "input unlocking from thread %p", \
416 g_thread_self ()); \
417 g_mutex_unlock (&dbin->input_lock); \
418 } G_STMT_END
419
420 GType gst_decodebin3_get_type (void);
421 #define gst_decodebin3_parent_class parent_class
422 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
423
424 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
425
426 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
427 GST_PAD_SINK,
428 GST_PAD_ALWAYS,
429 GST_STATIC_CAPS_ANY);
430
431 static GstStaticPadTemplate request_sink_template =
432 GST_STATIC_PAD_TEMPLATE ("sink_%u",
433 GST_PAD_SINK,
434 GST_PAD_REQUEST,
435 GST_STATIC_CAPS_ANY);
436
437 static GstStaticPadTemplate video_src_template =
438 GST_STATIC_PAD_TEMPLATE ("video_%u",
439 GST_PAD_SRC,
440 GST_PAD_SOMETIMES,
441 GST_STATIC_CAPS_ANY);
442
443 static GstStaticPadTemplate audio_src_template =
444 GST_STATIC_PAD_TEMPLATE ("audio_%u",
445 GST_PAD_SRC,
446 GST_PAD_SOMETIMES,
447 GST_STATIC_CAPS_ANY);
448
449 static GstStaticPadTemplate text_src_template =
450 GST_STATIC_PAD_TEMPLATE ("text_%u",
451 GST_PAD_SRC,
452 GST_PAD_SOMETIMES,
453 GST_STATIC_CAPS_ANY);
454
455 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
456 GST_PAD_SRC,
457 GST_PAD_SOMETIMES,
458 GST_STATIC_CAPS_ANY);
459
460
461 static void gst_decodebin3_dispose (GObject * object);
462 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
463 const GValue * value, GParamSpec * pspec);
464 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
465 GValue * value, GParamSpec * pspec);
466
467 static gboolean parsebin_autoplug_continue_cb (GstElement *
468 parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
469
470 static gint
gst_decodebin3_select_stream(GstDecodebin3 * dbin,GstStreamCollection * collection,GstStream * stream)471 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
472 GstStreamCollection * collection, GstStream * stream)
473 {
474 GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
475
476 return -1;
477 }
478
479 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
480 GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
481 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
482 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
483 GstStateChange transition);
484 static gboolean gst_decodebin3_send_event (GstElement * element,
485 GstEvent * event);
486
487 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
488 #if 0
489 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
490 GstElementFactoryListType ftype);
491 #endif
492
493 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
494 static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input);
495 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
496 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
497
498 static void reconfigure_output_stream (DecodebinOutputStream * output,
499 MultiQueueSlot * slot);
500 static void free_output_stream (GstDecodebin3 * dbin,
501 DecodebinOutputStream * output);
502 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
503 GstStreamType type);
504
505 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
506 GstPadProbeInfo * info, MultiQueueSlot * slot);
507 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
508 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
509 DecodebinInputStream * input);
510 static void link_input_to_slot (DecodebinInputStream * input,
511 MultiQueueSlot * slot);
512 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
513 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
514 MultiQueueSlot * slot);
515
516 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
517 static void update_requested_selection (GstDecodebin3 * dbin);
518
519 /* FIXME: Really make all the parser stuff a self-contained helper object */
520 #include "gstdecodebin3-parse.c"
521
522 static gboolean
_gst_int_accumulator(GSignalInvocationHint * ihint,GValue * return_accu,const GValue * handler_return,gpointer dummy)523 _gst_int_accumulator (GSignalInvocationHint * ihint,
524 GValue * return_accu, const GValue * handler_return, gpointer dummy)
525 {
526 gint res = g_value_get_int (handler_return);
527
528 if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
529 g_value_set_int (return_accu, res);
530
531 if (res == -1)
532 return TRUE;
533
534 return FALSE;
535 }
536
537 static void
gst_decodebin3_class_init(GstDecodebin3Class * klass)538 gst_decodebin3_class_init (GstDecodebin3Class * klass)
539 {
540 GObjectClass *gobject_klass = (GObjectClass *) klass;
541 GstElementClass *element_class = (GstElementClass *) klass;
542 GstBinClass *bin_klass = (GstBinClass *) klass;
543
544 gobject_klass->dispose = gst_decodebin3_dispose;
545 gobject_klass->set_property = gst_decodebin3_set_property;
546 gobject_klass->get_property = gst_decodebin3_get_property;
547
548 /* FIXME : ADD PROPERTIES ! */
549 g_object_class_install_property (gobject_klass, PROP_CAPS,
550 g_param_spec_boxed ("caps", "Caps",
551 "The caps on which to stop decoding. (NULL = default)",
552 GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
553
554 /* FIXME : ADD SIGNALS ! */
555 /**
556 * GstDecodebin3::select-stream
557 * @decodebin: a #GstDecodebin3
558 * @collection: a #GstStreamCollection
559 * @stream: a #GstStream
560 *
561 * This signal is emitted whenever @decodebin needs to decide whether
562 * to expose a @stream of a given @collection.
563 *
564 * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
565 * A value of -1 (default) lets @decodebin decide what to do with the stream.
566 * */
567 gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
568 g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
569 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
570 _gst_int_accumulator, NULL, g_cclosure_marshal_generic,
571 G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
572
573 /**
574 * GstDecodebin3::about-to-finish:
575 *
576 * This signal is emitted when the data for the selected URI is
577 * entirely buffered and it is safe to specify anothe URI.
578 */
579 gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
580 g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
581 G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
582 0, G_TYPE_NONE);
583
584
585 element_class->request_new_pad =
586 GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
587 element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
588 element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
589
590 gst_element_class_add_pad_template (element_class,
591 gst_static_pad_template_get (&sink_template));
592 gst_element_class_add_pad_template (element_class,
593 gst_static_pad_template_get (&request_sink_template));
594 gst_element_class_add_pad_template (element_class,
595 gst_static_pad_template_get (&video_src_template));
596 gst_element_class_add_pad_template (element_class,
597 gst_static_pad_template_get (&audio_src_template));
598 gst_element_class_add_pad_template (element_class,
599 gst_static_pad_template_get (&text_src_template));
600 gst_element_class_add_pad_template (element_class,
601 gst_static_pad_template_get (&src_template));
602
603 gst_element_class_set_static_metadata (element_class,
604 "Decoder Bin 3", "Generic/Bin/Decoder",
605 "Autoplug and decode to raw media",
606 "Edward Hervey <edward@centricular.com>");
607
608 bin_klass->handle_message = gst_decodebin3_handle_message;
609
610 klass->select_stream = gst_decodebin3_select_stream;
611 }
612
613 static void
gst_decodebin3_init(GstDecodebin3 * dbin)614 gst_decodebin3_init (GstDecodebin3 * dbin)
615 {
616 /* Create main input */
617 dbin->main_input = create_new_input (dbin, TRUE);
618
619 dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
620 g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
621 "max-size-buffers", 0, "use-interleave", TRUE, NULL);
622 gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
623
624 dbin->current_group_id = GST_GROUP_ID_INVALID;
625
626 g_mutex_init (&dbin->factories_lock);
627 g_mutex_init (&dbin->selection_lock);
628 g_mutex_init (&dbin->input_lock);
629
630 dbin->caps = gst_static_caps_get (&default_raw_caps);
631
632 GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
633 }
634
635 static void
gst_decodebin3_dispose(GObject * object)636 gst_decodebin3_dispose (GObject * object)
637 {
638 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
639 GList *walk, *next;
640
641 if (dbin->factories)
642 gst_plugin_feature_list_free (dbin->factories);
643 if (dbin->decoder_factories)
644 g_list_free (dbin->decoder_factories);
645 if (dbin->decodable_factories)
646 g_list_free (dbin->decodable_factories);
647 g_list_free_full (dbin->requested_selection, g_free);
648 g_list_free (dbin->active_selection);
649 g_list_free (dbin->to_activate);
650 g_list_free (dbin->pending_select_streams);
651 g_clear_object (&dbin->collection);
652
653 free_input (dbin, dbin->main_input);
654
655 for (walk = dbin->other_inputs; walk; walk = next) {
656 DecodebinInput *input = walk->data;
657
658 next = g_list_next (walk);
659
660 free_input (dbin, input);
661 dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
662 }
663
664 G_OBJECT_CLASS (parent_class)->dispose (object);
665 }
666
667 static void
gst_decodebin3_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)668 gst_decodebin3_set_property (GObject * object, guint prop_id,
669 const GValue * value, GParamSpec * pspec)
670 {
671 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
672
673 /* FIXME : IMPLEMENT */
674 switch (prop_id) {
675 case PROP_CAPS:
676 GST_OBJECT_LOCK (dbin);
677 if (dbin->caps)
678 gst_caps_unref (dbin->caps);
679 dbin->caps = g_value_dup_boxed (value);
680 GST_OBJECT_UNLOCK (dbin);
681 break;
682 default:
683 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
684 break;
685 }
686 }
687
688 static void
gst_decodebin3_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)689 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
690 GParamSpec * pspec)
691 {
692 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
693
694 /* FIXME : IMPLEMENT */
695 switch (prop_id) {
696 case PROP_CAPS:
697 GST_OBJECT_LOCK (dbin);
698 g_value_set_boxed (value, dbin->caps);
699 GST_OBJECT_UNLOCK (dbin);
700 break;
701 default:
702 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
703 break;
704 }
705 }
706
707 static gboolean
parsebin_autoplug_continue_cb(GstElement * parsebin,GstPad * pad,GstCaps * caps,GstDecodebin3 * dbin)708 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
709 GstCaps * caps, GstDecodebin3 * dbin)
710 {
711 GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
712
713 /* If it matches our target caps, expose it */
714 if (gst_caps_can_intersect (caps, dbin->caps))
715 return FALSE;
716
717 return TRUE;
718 }
719
720 /* This method should be called whenever a STREAM_START event
721 * comes out of a given parsebin.
722 * The caller shall replace the group_id if the function returns TRUE */
723 static gboolean
set_input_group_id(DecodebinInput * input,guint32 * group_id)724 set_input_group_id (DecodebinInput * input, guint32 * group_id)
725 {
726 GstDecodebin3 *dbin = input->dbin;
727
728 if (input->group_id != *group_id) {
729 if (input->group_id != GST_GROUP_ID_INVALID)
730 GST_WARNING_OBJECT (dbin,
731 "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
732 ") on input %p ", input->group_id, *group_id, input);
733 input->group_id = *group_id;
734 }
735
736 if (*group_id != dbin->current_group_id) {
737 if (dbin->current_group_id == GST_GROUP_ID_INVALID) {
738 GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT,
739 *group_id);
740 dbin->current_group_id = *group_id;
741 }
742 *group_id = dbin->current_group_id;
743 return TRUE;
744 }
745
746 return FALSE;
747 }
748
749 static void
parsebin_drained_cb(GstElement * parsebin,DecodebinInput * input)750 parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
751 {
752 GstDecodebin3 *dbin = input->dbin;
753 gboolean all_drained;
754 GList *tmp;
755
756 GST_WARNING_OBJECT (dbin, "input %p drained", input);
757 input->drained = TRUE;
758
759 all_drained = dbin->main_input->drained;
760 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
761 DecodebinInput *data = (DecodebinInput *) tmp->data;
762
763 all_drained &= data->drained;
764 }
765
766 if (all_drained) {
767 GST_WARNING_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
768 g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
769 NULL);
770 }
771 }
772
773 /* Call with INPUT_LOCK taken */
774 static gboolean
ensure_input_parsebin(GstDecodebin3 * dbin,DecodebinInput * input)775 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
776 {
777 gboolean set_state = FALSE;
778
779 if (input->parsebin == NULL) {
780 input->parsebin = gst_element_factory_make ("parsebin", NULL);
781 if (input->parsebin == NULL)
782 goto no_parsebin;
783 input->parsebin = gst_object_ref (input->parsebin);
784 input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
785 input->pad_added_sigid =
786 g_signal_connect (input->parsebin, "pad-added",
787 (GCallback) parsebin_pad_added_cb, input);
788 input->pad_removed_sigid =
789 g_signal_connect (input->parsebin, "pad-removed",
790 (GCallback) parsebin_pad_removed_cb, input);
791 input->drained_sigid =
792 g_signal_connect (input->parsebin, "drained",
793 (GCallback) parsebin_drained_cb, input);
794 g_signal_connect (input->parsebin, "autoplug-continue",
795 (GCallback) parsebin_autoplug_continue_cb, dbin);
796 }
797
798 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
799 gst_bin_add (GST_BIN (dbin), input->parsebin);
800 set_state = TRUE;
801 }
802
803 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
804 input->parsebin_sink);
805 if (set_state)
806 gst_element_sync_state_with_parent (input->parsebin);
807
808 return TRUE;
809
810 /* ERRORS */
811 no_parsebin:
812 {
813 gst_element_post_message ((GstElement *) dbin,
814 gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
815 return FALSE;
816 }
817 }
818
819 static GstPadLinkReturn
gst_decodebin3_input_pad_link(GstPad * pad,GstObject * parent,GstPad * peer)820 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
821 {
822 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
823 GstPadLinkReturn res = GST_PAD_LINK_OK;
824 DecodebinInput *input;
825
826 GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT
827 ". Creating parsebin if needed", pad);
828
829 if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
830 goto fail;
831
832 INPUT_LOCK (dbin);
833 if (!ensure_input_parsebin (dbin, input))
834 res = GST_PAD_LINK_REFUSED;
835 INPUT_UNLOCK (dbin);
836
837 return res;
838 fail:
839 GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
840 return GST_PAD_LINK_REFUSED;
841 }
842
843 /* Drop duration query during _input_pad_unlink */
844 static GstPadProbeReturn
query_duration_drop_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinInput * input)845 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
846 DecodebinInput * input)
847 {
848 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
849
850 if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
851 GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
852 if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
853 GST_LOG_OBJECT (pad, "stop forwarding query duration");
854 ret = GST_PAD_PROBE_HANDLED;
855 }
856 }
857
858 return ret;
859 }
860
861 static void
gst_decodebin3_input_pad_unlink(GstPad * pad,GstObject * parent)862 gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
863 {
864 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
865 DecodebinInput *input;
866
867 GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT
868 ". Removing parsebin.", pad);
869
870 if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
871 goto fail;
872
873 INPUT_LOCK (dbin);
874 if (input->parsebin == NULL) {
875 INPUT_UNLOCK (dbin);
876 return;
877 }
878
879 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) {
880 GstStreamCollection *collection = NULL;
881 gulong probe_id = gst_pad_add_probe (input->parsebin_sink,
882 GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
883 (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
884
885 /* Clear stream-collection corresponding to current INPUT and post new
886 * stream-collection message, if needed */
887 if (input->collection) {
888 gst_object_unref (input->collection);
889 input->collection = NULL;
890 }
891
892 SELECTION_LOCK (dbin);
893 collection = get_merged_collection (dbin);
894 if (collection && collection != dbin->collection) {
895 GstMessage *msg;
896 GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
897
898 if (dbin->collection)
899 gst_object_unref (dbin->collection);
900 dbin->collection = collection;
901
902 msg =
903 gst_message_new_stream_collection ((GstObject *) dbin,
904 dbin->collection);
905
906 SELECTION_UNLOCK (dbin);
907 gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
908 update_requested_selection (dbin);
909 } else
910 SELECTION_UNLOCK (dbin);
911
912 gst_bin_remove (GST_BIN (dbin), input->parsebin);
913 gst_element_set_state (input->parsebin, GST_STATE_NULL);
914 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
915 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
916 g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
917 gst_pad_remove_probe (input->parsebin_sink, probe_id);
918 gst_object_unref (input->parsebin);
919 gst_object_unref (input->parsebin_sink);
920
921 input->parsebin = NULL;
922 input->parsebin_sink = NULL;
923
924 if (!input->is_main) {
925 dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
926 free_input_async (dbin, input);
927 }
928 }
929 INPUT_UNLOCK (dbin);
930 return;
931
932 fail:
933 GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
934 return;
935 }
936
937 static void
free_input(GstDecodebin3 * dbin,DecodebinInput * input)938 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
939 {
940 GST_DEBUG ("Freeing input %p", input);
941 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
942 gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
943 if (input->parsebin) {
944 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
945 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
946 g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
947 gst_element_set_state (input->parsebin, GST_STATE_NULL);
948 gst_object_unref (input->parsebin);
949 gst_object_unref (input->parsebin_sink);
950 }
951 if (input->collection)
952 gst_object_unref (input->collection);
953 g_free (input);
954 }
955
956 static void
free_input_async(GstDecodebin3 * dbin,DecodebinInput * input)957 free_input_async (GstDecodebin3 * dbin, DecodebinInput * input)
958 {
959 GST_LOG_OBJECT (dbin, "pushing input %p on thread pool to free", input);
960 gst_element_call_async (GST_ELEMENT_CAST (dbin),
961 (GstElementCallAsyncFunc) free_input, input, NULL);
962 }
963
964 /* Call with INPUT_LOCK taken */
965 static DecodebinInput *
create_new_input(GstDecodebin3 * dbin,gboolean main)966 create_new_input (GstDecodebin3 * dbin, gboolean main)
967 {
968 DecodebinInput *input;
969
970 input = g_new0 (DecodebinInput, 1);
971 input->dbin = dbin;
972 input->is_main = main;
973 input->group_id = GST_GROUP_ID_INVALID;
974 if (main)
975 input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
976 else {
977 gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
978 input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
979 g_free (pad_name);
980 }
981 g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
982 gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
983 gst_pad_set_unlink_function (input->ghost_sink,
984 gst_decodebin3_input_pad_unlink);
985
986 gst_pad_set_active (input->ghost_sink, TRUE);
987 gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
988
989 return input;
990
991 }
992
993 static GstPad *
gst_decodebin3_request_new_pad(GstElement * element,GstPadTemplate * temp,const gchar * name,const GstCaps * caps)994 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
995 const gchar * name, const GstCaps * caps)
996 {
997 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
998 DecodebinInput *input;
999 GstPad *res = NULL;
1000
1001 /* We are ignoring names for the time being, not sure it makes any sense
1002 * within the context of decodebin3 ... */
1003 input = create_new_input (dbin, FALSE);
1004 if (input) {
1005 INPUT_LOCK (dbin);
1006 dbin->other_inputs = g_list_append (dbin->other_inputs, input);
1007 res = input->ghost_sink;
1008 INPUT_UNLOCK (dbin);
1009 }
1010
1011 return res;
1012 }
1013
1014 /* Must be called with factories lock! */
1015 static void
gst_decode_bin_update_factories_list(GstDecodebin3 * dbin)1016 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
1017 {
1018 guint cookie;
1019
1020 cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
1021 if (!dbin->factories || dbin->factories_cookie != cookie) {
1022 GList *tmp;
1023 if (dbin->factories)
1024 gst_plugin_feature_list_free (dbin->factories);
1025 if (dbin->decoder_factories)
1026 g_list_free (dbin->decoder_factories);
1027 if (dbin->decodable_factories)
1028 g_list_free (dbin->decodable_factories);
1029 dbin->factories =
1030 gst_element_factory_list_get_elements
1031 (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
1032 dbin->factories =
1033 g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
1034 dbin->factories_cookie = cookie;
1035
1036 /* Filter decoder and other decodables */
1037 dbin->decoder_factories = NULL;
1038 dbin->decodable_factories = NULL;
1039 for (tmp = dbin->factories; tmp; tmp = tmp->next) {
1040 GstElementFactory *fact = (GstElementFactory *) tmp->data;
1041 if (gst_element_factory_list_is_type (fact,
1042 GST_ELEMENT_FACTORY_TYPE_DECODER))
1043 dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
1044 else
1045 dbin->decodable_factories =
1046 g_list_append (dbin->decodable_factories, fact);
1047 }
1048 }
1049 }
1050
1051 /* Must be called with appropriate lock if list is a protected variable */
1052 static const gchar *
stream_in_list(GList * list,const gchar * sid)1053 stream_in_list (GList * list, const gchar * sid)
1054 {
1055 GList *tmp;
1056
1057 #if EXTRA_DEBUG
1058 for (tmp = list; tmp; tmp = tmp->next) {
1059 gchar *osid = (gchar *) tmp->data;
1060 GST_DEBUG ("Checking %s against %s", sid, osid);
1061 }
1062 #endif
1063
1064 for (tmp = list; tmp; tmp = tmp->next) {
1065 const gchar *osid = (gchar *) tmp->data;
1066 if (!g_strcmp0 (sid, osid))
1067 return osid;
1068 }
1069
1070 return NULL;
1071 }
1072
1073 static void
update_requested_selection(GstDecodebin3 * dbin)1074 update_requested_selection (GstDecodebin3 * dbin)
1075 {
1076 guint i, nb;
1077 GList *tmp = NULL;
1078 GstStreamType used_types = 0;
1079 GstStreamCollection *collection;
1080
1081 /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1082 * the switch handler will take care of the pending selection */
1083 SELECTION_LOCK (dbin);
1084 if (dbin->pending_select_streams) {
1085 GST_DEBUG_OBJECT (dbin,
1086 "No need to create pending selection, SELECT_STREAMS underway");
1087 goto beach;
1088 }
1089
1090 collection = dbin->collection;
1091 if (G_UNLIKELY (collection == NULL)) {
1092 GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
1093 goto beach;
1094 }
1095 nb = gst_stream_collection_get_size (collection);
1096
1097 /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1098 GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1099
1100 /* 3. If not, check if we already have some of the streams in the
1101 * existing active/requested selection */
1102 for (i = 0; i < nb; i++) {
1103 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1104 const gchar *sid = gst_stream_get_stream_id (stream);
1105 gint request = -1;
1106 /* Fire select-stream signal to see if outside components want to
1107 * hint at which streams should be selected */
1108 g_signal_emit (G_OBJECT (dbin),
1109 gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1110 &request);
1111 GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1112 if (request == 1 || (request == -1
1113 && (stream_in_list (dbin->requested_selection, sid)
1114 || stream_in_list (dbin->active_selection, sid)))) {
1115 GstStreamType curtype = gst_stream_get_stream_type (stream);
1116 if (request == 1)
1117 GST_DEBUG_OBJECT (dbin,
1118 "Using stream requested by 'select-stream' signal : %s", sid);
1119 else
1120 GST_DEBUG_OBJECT (dbin,
1121 "Re-using stream already present in requested or active selection : %s",
1122 sid);
1123 tmp = g_list_append (tmp, (gchar *) sid);
1124 used_types |= curtype;
1125 }
1126 }
1127
1128 /* 4. If not, match one stream of each type */
1129 for (i = 0; i < nb; i++) {
1130 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1131 GstStreamType curtype = gst_stream_get_stream_type (stream);
1132 if (!(used_types & curtype)) {
1133 const gchar *sid = gst_stream_get_stream_id (stream);
1134 GST_DEBUG_OBJECT (dbin, "Selecting stream '%s' of type %s",
1135 sid, gst_stream_type_get_name (curtype));
1136 tmp = g_list_append (tmp, (gchar *) sid);
1137 used_types |= curtype;
1138 }
1139 }
1140
1141 beach:
1142 /* Finally set the requested selection */
1143 if (tmp) {
1144 if (dbin->requested_selection) {
1145 GST_FIXME_OBJECT (dbin,
1146 "Replacing non-NULL requested_selection, what should we do ??");
1147 g_list_free_full (dbin->requested_selection, g_free);
1148 }
1149 dbin->requested_selection =
1150 g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1151 dbin->selection_updated = TRUE;
1152 g_list_free (tmp);
1153 }
1154 SELECTION_UNLOCK (dbin);
1155 }
1156
1157 /* sort_streams:
1158 * GCompareFunc to use with lists of GstStream.
1159 * Sorts GstStreams by stream type and SELECT flag and stream-id
1160 * First video, then audio, then others.
1161 *
1162 * Return: negative if a<b, 0 if a==b, positive if a>b
1163 */
1164 static gint
sort_streams(GstStream * sa,GstStream * sb)1165 sort_streams (GstStream * sa, GstStream * sb)
1166 {
1167 GstStreamType typea, typeb;
1168 GstStreamFlags flaga, flagb;
1169 const gchar *ida, *idb;
1170 gint ret = 0;
1171
1172 typea = gst_stream_get_stream_type (sa);
1173 typeb = gst_stream_get_stream_type (sb);
1174
1175 GST_LOG ("sa(%s), sb(%s)", gst_stream_get_stream_id (sa),
1176 gst_stream_get_stream_id (sb));
1177
1178 /* Sort by stream type. First video, then audio, then others(text, container, unknown) */
1179 if (typea != typeb) {
1180 if (typea & GST_STREAM_TYPE_VIDEO)
1181 ret = -1;
1182 else if (typea & GST_STREAM_TYPE_AUDIO)
1183 ret = (!(typeb & GST_STREAM_TYPE_VIDEO)) ? -1 : 1;
1184 else if (typea & GST_STREAM_TYPE_TEXT)
1185 ret = (!(typeb & GST_STREAM_TYPE_VIDEO)
1186 && !(typeb & GST_STREAM_TYPE_AUDIO)) ? -1 : 1;
1187 else if (typea & GST_STREAM_TYPE_CONTAINER)
1188 ret = (typeb & GST_STREAM_TYPE_UNKNOWN) ? -1 : 1;
1189 else
1190 ret = 1;
1191
1192 if (ret != 0) {
1193 GST_LOG ("Sort by stream-type: %d", ret);
1194 return ret;
1195 }
1196 }
1197
1198 /* Sort by SELECT flag, if stream type is same. */
1199 flaga = gst_stream_get_stream_flags (sa);
1200 flagb = gst_stream_get_stream_flags (sb);
1201
1202 ret =
1203 (flaga & GST_STREAM_FLAG_SELECT) ? ((flagb & GST_STREAM_FLAG_SELECT) ? 0 :
1204 -1) : ((flagb & GST_STREAM_FLAG_SELECT) ? 1 : 0);
1205
1206 if (ret != 0) {
1207 GST_LOG ("Sort by SELECT flag: %d", ret);
1208 return ret;
1209 }
1210
1211 /* Sort by stream-id, if otherwise the same. */
1212 ida = gst_stream_get_stream_id (sa);
1213 idb = gst_stream_get_stream_id (sb);
1214 ret = g_strcmp0 (ida, idb);
1215
1216 GST_LOG ("Sort by stream-id: %d", ret);
1217
1218 return ret;
1219 }
1220
1221 /* Call with INPUT_LOCK taken */
1222 static GstStreamCollection *
get_merged_collection(GstDecodebin3 * dbin)1223 get_merged_collection (GstDecodebin3 * dbin)
1224 {
1225 gboolean needs_merge = FALSE;
1226 GstStreamCollection *res = NULL;
1227 GList *tmp;
1228 GList *unsorted_streams = NULL;
1229 guint i, nb_stream;
1230
1231 /* First check if we need to do a merge or just return the only collection */
1232 res = dbin->main_input->collection;
1233
1234 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1235 DecodebinInput *input = (DecodebinInput *) tmp->data;
1236 if (input->collection) {
1237 if (res) {
1238 needs_merge = TRUE;
1239 break;
1240 }
1241 res = input->collection;
1242 }
1243 }
1244
1245 if (!needs_merge) {
1246 GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1247 return res ? gst_object_ref (res) : NULL;
1248 }
1249
1250 /* We really need to create a new collection */
1251 /* FIXME : Some numbering scheme maybe ?? */
1252 res = gst_stream_collection_new ("decodebin3");
1253 if (dbin->main_input->collection) {
1254 nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1255 GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1256 for (i = 0; i < nb_stream; i++) {
1257 GstStream *stream =
1258 gst_stream_collection_get_stream (dbin->main_input->collection, i);
1259 unsorted_streams = g_list_append (unsorted_streams, stream);
1260 }
1261 }
1262
1263 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1264 DecodebinInput *input = (DecodebinInput *) tmp->data;
1265 GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1266 input->collection);
1267 if (input->collection) {
1268 nb_stream = gst_stream_collection_get_size (input->collection);
1269 GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1270 for (i = 0; i < nb_stream; i++) {
1271 GstStream *stream =
1272 gst_stream_collection_get_stream (input->collection, i);
1273 unsorted_streams = g_list_append (unsorted_streams, stream);
1274 }
1275 }
1276 }
1277
1278 /* re-order streams : video, then audio, then others */
1279 unsorted_streams =
1280 g_list_sort (unsorted_streams, (GCompareFunc) sort_streams);
1281 for (tmp = unsorted_streams; tmp; tmp = tmp->next) {
1282 GstStream *stream = (GstStream *) tmp->data;
1283 GST_DEBUG_OBJECT (dbin, "Adding #stream(%s) to collection",
1284 gst_stream_get_stream_id (stream));
1285 gst_stream_collection_add_stream (res, gst_object_ref (stream));
1286 }
1287
1288 if (unsorted_streams)
1289 g_list_free (unsorted_streams);
1290
1291 return res;
1292 }
1293
1294 /* Call with INPUT_LOCK taken */
1295 static DecodebinInput *
find_message_parsebin(GstDecodebin3 * dbin,GstElement * child)1296 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1297 {
1298 DecodebinInput *input = NULL;
1299 GstElement *parent = gst_object_ref (child);
1300 GList *tmp;
1301
1302 do {
1303 GstElement *next_parent;
1304
1305 GST_DEBUG_OBJECT (dbin, "parent %s",
1306 parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1307
1308 if (parent == dbin->main_input->parsebin) {
1309 input = dbin->main_input;
1310 break;
1311 }
1312 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1313 DecodebinInput *cur = (DecodebinInput *) tmp->data;
1314 if (parent == cur->parsebin) {
1315 input = cur;
1316 break;
1317 }
1318 }
1319 next_parent = (GstElement *) gst_element_get_parent (parent);
1320 gst_object_unref (parent);
1321 parent = next_parent;
1322
1323 } while (parent && parent != (GstElement *) dbin);
1324
1325 if (parent)
1326 gst_object_unref (parent);
1327
1328 return input;
1329 }
1330
1331 static const gchar *
stream_in_collection(GstDecodebin3 * dbin,gchar * sid)1332 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1333 {
1334 guint i, len;
1335
1336 if (dbin->collection == NULL)
1337 return NULL;
1338 len = gst_stream_collection_get_size (dbin->collection);
1339 for (i = 0; i < len; i++) {
1340 GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1341 const gchar *osid = gst_stream_get_stream_id (stream);
1342 if (!g_strcmp0 (sid, osid))
1343 return osid;
1344 }
1345
1346 return NULL;
1347 }
1348
1349 /* Call with INPUT_LOCK taken */
1350 static void
handle_stream_collection(GstDecodebin3 * dbin,GstStreamCollection * collection,GstElement * child)1351 handle_stream_collection (GstDecodebin3 * dbin,
1352 GstStreamCollection * collection, GstElement * child)
1353 {
1354 #ifndef GST_DISABLE_GST_DEBUG
1355 const gchar *upstream_id;
1356 guint i;
1357 #endif
1358 DecodebinInput *input = find_message_parsebin (dbin, child);
1359
1360 if (!input) {
1361 GST_DEBUG_OBJECT (dbin,
1362 "Couldn't find corresponding input, most likely shutting down");
1363 return;
1364 }
1365
1366 /* Replace collection in input */
1367 if (input->collection)
1368 gst_object_unref (input->collection);
1369 input->collection = gst_object_ref (collection);
1370 GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1371 input);
1372
1373 /* Merge collection if needed */
1374 collection = get_merged_collection (dbin);
1375
1376 #ifndef GST_DISABLE_GST_DEBUG
1377 /* Just some debugging */
1378 upstream_id = gst_stream_collection_get_upstream_id (collection);
1379 GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1380 GST_DEBUG ("From input %p", input);
1381 GST_DEBUG (" %d streams", gst_stream_collection_get_size (collection));
1382 for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1383 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1384 GstTagList *taglist;
1385 GstCaps *caps;
1386
1387 GST_DEBUG (" Stream '%s'", gst_stream_get_stream_id (stream));
1388 GST_DEBUG (" type : %s",
1389 gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1390 GST_DEBUG (" flags : 0x%x", gst_stream_get_stream_flags (stream));
1391 taglist = gst_stream_get_tags (stream);
1392 GST_DEBUG (" tags : %" GST_PTR_FORMAT, taglist);
1393 caps = gst_stream_get_caps (stream);
1394 GST_DEBUG (" caps : %" GST_PTR_FORMAT, caps);
1395 if (taglist)
1396 gst_tag_list_unref (taglist);
1397 if (caps)
1398 gst_caps_unref (caps);
1399 }
1400 #endif
1401
1402 /* Store collection for later usage */
1403 SELECTION_LOCK (dbin);
1404 if (dbin->collection == NULL) {
1405 dbin->collection = collection;
1406 } else {
1407 /* We need to check who emitted this collection (the owner).
1408 * If we already had a collection from that user, this one is an update,
1409 * that is to say that we need to figure out how we are going to re-use
1410 * the streams/slot */
1411 GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1412 /* FIXME : When do we switch from pending collection to active collection ?
1413 * When all streams from active collection are drained in multiqueue output ? */
1414 gst_object_unref (dbin->collection);
1415 dbin->collection = collection;
1416 /* dbin->pending_collection = */
1417 /* g_list_append (dbin->pending_collection, collection); */
1418 }
1419 SELECTION_UNLOCK (dbin);
1420 }
1421
1422 static void
gst_decodebin3_handle_message(GstBin * bin,GstMessage * message)1423 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1424 {
1425 GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1426 gboolean posting_collection = FALSE;
1427
1428 GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1429
1430 switch (GST_MESSAGE_TYPE (message)) {
1431 case GST_MESSAGE_STREAM_COLLECTION:
1432 {
1433 GstStreamCollection *collection = NULL;
1434 gst_message_parse_stream_collection (message, &collection);
1435 if (collection) {
1436 INPUT_LOCK (dbin);
1437 handle_stream_collection (dbin, collection,
1438 (GstElement *) GST_MESSAGE_SRC (message));
1439 posting_collection = TRUE;
1440 INPUT_UNLOCK (dbin);
1441 }
1442
1443 SELECTION_LOCK (dbin);
1444 if (dbin->collection && collection != dbin->collection) {
1445 /* Replace collection message, we most likely aggregated it */
1446 GstMessage *new_msg;
1447 new_msg =
1448 gst_message_new_stream_collection ((GstObject *) dbin,
1449 dbin->collection);
1450 gst_message_unref (message);
1451 message = new_msg;
1452 }
1453 SELECTION_UNLOCK (dbin);
1454
1455 if (collection)
1456 gst_object_unref (collection);
1457 break;
1458 }
1459 default:
1460 break;
1461 }
1462
1463 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1464
1465 if (posting_collection) {
1466 /* Figure out a selection for that collection */
1467 update_requested_selection (dbin);
1468 }
1469 }
1470
1471 static DecodebinOutputStream *
find_free_compatible_output(GstDecodebin3 * dbin,GstStream * stream)1472 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1473 {
1474 GList *tmp;
1475 GstStreamType stype = gst_stream_get_stream_type (stream);
1476
1477 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1478 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1479 if (output->type == stype && output->slot && output->slot->active_stream) {
1480 GstStream *tstream = output->slot->active_stream;
1481 if (!stream_in_list (dbin->requested_selection,
1482 (gchar *) gst_stream_get_stream_id (tstream))) {
1483 return output;
1484 }
1485 }
1486 }
1487
1488 return NULL;
1489 }
1490
1491 /* Give a certain slot, figure out if it should be linked to an
1492 * output stream
1493 * CALL WITH SELECTION LOCK TAKEN !*/
1494 static DecodebinOutputStream *
get_output_for_slot(MultiQueueSlot * slot)1495 get_output_for_slot (MultiQueueSlot * slot)
1496 {
1497 GstDecodebin3 *dbin = slot->dbin;
1498 DecodebinOutputStream *output = NULL;
1499 const gchar *stream_id;
1500 GstCaps *caps;
1501 gchar *id_in_list = NULL;
1502
1503 /* If we already have a configured output, just use it */
1504 if (slot->output != NULL)
1505 return slot->output;
1506
1507 /*
1508 * FIXME
1509 *
1510 * This method needs to be split into multiple parts
1511 *
1512 * 1) Figure out whether stream should be exposed or not
1513 * This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1514 * in the default stream attribution
1515 *
1516 * 2) Figure out whether an output stream should be created, whether
1517 * we can re-use the output stream already linked to the slot, or
1518 * whether we need to get re-assigned another (currently used) output
1519 * stream.
1520 */
1521
1522 stream_id = gst_stream_get_stream_id (slot->active_stream);
1523 caps = gst_stream_get_caps (slot->active_stream);
1524 GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
1525 gst_caps_unref (caps);
1526
1527 /* 0. Emit autoplug-continue signal for pending caps ? */
1528 GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
1529
1530 /* 1. if in EXPOSE_ALL_MODE, just accept */
1531 GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
1532
1533 #if 0
1534 /* FIXME : The idea around this was to avoid activating a stream for
1535 * which we have no decoder. Unfortunately it is way too
1536 * expensive. Need to figure out a better solution */
1537 /* 2. Is there a potential decoder (if one is required) */
1538 if (!gst_caps_can_intersect (caps, dbin->caps)
1539 && !have_factory (dbin, (GstCaps *) caps,
1540 GST_ELEMENT_FACTORY_TYPE_DECODER)) {
1541 GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
1542 caps);
1543 SELECTION_UNLOCK (dbin);
1544 gst_element_post_message (GST_ELEMENT_CAST (dbin),
1545 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1546 SELECTION_LOCK (dbin);
1547 return NULL;
1548 }
1549 #endif
1550
1551 /* 3. In default mode check if we should expose */
1552 id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
1553 if (id_in_list) {
1554 /* Check if we can steal an existing output stream we could re-use.
1555 * that is:
1556 * * an output stream whose slot->stream is not in requested
1557 * * and is of the same type as this stream
1558 */
1559 output = find_free_compatible_output (dbin, slot->active_stream);
1560 if (output) {
1561 /* Move this output from its current slot to this slot */
1562 dbin->to_activate =
1563 g_list_append (dbin->to_activate, (gchar *) stream_id);
1564 dbin->requested_selection =
1565 g_list_remove (dbin->requested_selection, id_in_list);
1566 g_free (id_in_list);
1567 SELECTION_UNLOCK (dbin);
1568 gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1569 (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
1570 SELECTION_LOCK (dbin);
1571 return NULL;
1572 }
1573
1574 output = create_output_stream (dbin, slot->type);
1575 output->slot = slot;
1576 GST_DEBUG ("Linking slot %p to new output %p", slot, output);
1577 slot->output = output;
1578 dbin->active_selection =
1579 g_list_append (dbin->active_selection, (gchar *) stream_id);
1580 } else
1581 GST_DEBUG ("Not creating any output for slot %p", slot);
1582
1583 return output;
1584 }
1585
1586 /* Returns SELECTED_STREAMS message if active_selection is equal to
1587 * requested_selection, else NULL.
1588 * Must be called with LOCK taken */
1589 static GstMessage *
is_selection_done(GstDecodebin3 * dbin)1590 is_selection_done (GstDecodebin3 * dbin)
1591 {
1592 GList *tmp;
1593 GstMessage *msg;
1594
1595 if (!dbin->selection_updated)
1596 return NULL;
1597
1598 GST_LOG_OBJECT (dbin, "Checking");
1599
1600 if (dbin->to_activate != NULL) {
1601 GST_DEBUG ("Still have streams to activate");
1602 return NULL;
1603 }
1604 for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
1605 GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
1606 if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
1607 GST_DEBUG ("Not in active selection, returning");
1608 return NULL;
1609 }
1610 }
1611
1612 GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
1613
1614 /* We are completely active */
1615 msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
1616 GST_MESSAGE_SEQNUM (msg) = dbin->select_streams_seqnum;
1617 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1618 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1619 if (output->slot) {
1620 GST_DEBUG_OBJECT (dbin, "Adding stream %s",
1621 gst_stream_get_stream_id (output->slot->active_stream));
1622
1623 gst_message_streams_selected_add (msg, output->slot->active_stream);
1624 } else
1625 GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
1626 }
1627 dbin->selection_updated = FALSE;
1628 return msg;
1629 }
1630
1631 /* Must be called with SELECTION_LOCK taken */
1632 static void
check_all_slot_for_eos(GstDecodebin3 * dbin)1633 check_all_slot_for_eos (GstDecodebin3 * dbin)
1634 {
1635 gboolean all_drained = TRUE;
1636 GList *iter;
1637
1638 GST_DEBUG_OBJECT (dbin, "check slot for eos");
1639
1640 for (iter = dbin->slots; iter; iter = iter->next) {
1641 MultiQueueSlot *slot = iter->data;
1642
1643 if (!slot->output)
1644 continue;
1645
1646 if (slot->is_drained) {
1647 GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
1648 continue;
1649 }
1650
1651 all_drained = FALSE;
1652 break;
1653 }
1654
1655 if (all_drained) {
1656 INPUT_LOCK (dbin);
1657 if (!pending_pads_are_eos (dbin->main_input))
1658 all_drained = FALSE;
1659
1660 if (all_drained) {
1661 for (iter = dbin->other_inputs; iter; iter = iter->next) {
1662 if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
1663 all_drained = FALSE;
1664 break;
1665 }
1666 }
1667 }
1668 INPUT_UNLOCK (dbin);
1669 }
1670
1671 if (all_drained) {
1672 GST_DEBUG_OBJECT (dbin,
1673 "All active slots are drained, and no pending input, push EOS");
1674
1675 for (iter = dbin->input_streams; iter; iter = iter->next) {
1676 DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
1677 GstPad *peer = gst_pad_get_peer (input->srcpad);
1678
1679 /* Send EOS to all slots */
1680 if (peer) {
1681 GstEvent *stream_start, *eos;
1682
1683 stream_start =
1684 gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
1685
1686 /* First forward a custom STREAM_START event to reset the EOS status (if any) */
1687 if (stream_start) {
1688 GstStructure *s;
1689 GstEvent *custom_stream_start = gst_event_copy (stream_start);
1690 gst_event_unref (stream_start);
1691 s = (GstStructure *) gst_event_get_structure (custom_stream_start);
1692 gst_structure_set (s, "decodebin3-flushing-stream-start",
1693 G_TYPE_BOOLEAN, TRUE, NULL);
1694 gst_pad_send_event (peer, custom_stream_start);
1695 }
1696
1697 eos = gst_event_new_eos ();
1698 gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos),
1699 CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA,
1700 NULL);
1701 gst_pad_send_event (peer, eos);
1702 gst_object_unref (peer);
1703 } else
1704 GST_DEBUG_OBJECT (dbin, "no output");
1705 }
1706 }
1707 }
1708
1709 static GstPadProbeReturn
multiqueue_src_probe(GstPad * pad,GstPadProbeInfo * info,MultiQueueSlot * slot)1710 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
1711 MultiQueueSlot * slot)
1712 {
1713 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
1714 GstDecodebin3 *dbin = slot->dbin;
1715
1716 if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
1717 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
1718
1719 GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
1720 switch (GST_EVENT_TYPE (ev)) {
1721 case GST_EVENT_STREAM_START:
1722 {
1723 GstStream *stream = NULL;
1724 const GstStructure *s = gst_event_get_structure (ev);
1725
1726 /* Drop STREAM_START events used to cleanup multiqueue */
1727 if (s
1728 && gst_structure_has_field (s,
1729 "decodebin3-flushing-stream-start")) {
1730 ret = GST_PAD_PROBE_HANDLED;
1731 gst_event_unref (ev);
1732 break;
1733 }
1734
1735 gst_event_parse_stream (ev, &stream);
1736 if (stream == NULL) {
1737 GST_ERROR_OBJECT (pad,
1738 "Got a STREAM_START event without a GstStream");
1739 break;
1740 }
1741 slot->is_drained = FALSE;
1742 GST_DEBUG_OBJECT (pad, "Stream Start '%s'",
1743 gst_stream_get_stream_id (stream));
1744 if (slot->active_stream == NULL) {
1745 slot->active_stream = stream;
1746 } else if (slot->active_stream != stream) {
1747 GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
1748 gst_stream_get_stream_id (slot->active_stream),
1749 gst_stream_get_stream_id (stream));
1750 gst_object_unref (slot->active_stream);
1751 slot->active_stream = stream;
1752 } else
1753 gst_object_unref (stream);
1754 #if 0 /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
1755 {
1756 gboolean is_active, is_requested;
1757 /* Quick check to see if we're in the current selection */
1758 /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
1759 SELECTION_LOCK (dbin);
1760 GST_DEBUG_OBJECT (dbin, "Checking active selection");
1761 is_active = stream_in_list (dbin->active_selection, stream_id);
1762 GST_DEBUG_OBJECT (dbin, "Checking requested selection");
1763 is_requested = stream_in_list (dbin->requested_selection, stream_id);
1764 SELECTION_UNLOCK (dbin);
1765 if (is_active)
1766 GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
1767 slot->output);
1768 if (is_requested)
1769 GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
1770 slot->output);
1771 else if (slot->output) {
1772 GST_DEBUG_OBJECT (pad,
1773 "Slot needs to be deactivated ? It's no longer in requested selection");
1774 } else if (!is_active)
1775 GST_DEBUG_OBJECT (pad,
1776 "Slot in neither active nor requested selection");
1777 }
1778 #endif
1779 }
1780 break;
1781 case GST_EVENT_CAPS:
1782 {
1783 /* Configure the output slot if needed */
1784 DecodebinOutputStream *output;
1785 GstMessage *msg = NULL;
1786 SELECTION_LOCK (dbin);
1787 output = get_output_for_slot (slot);
1788 if (output) {
1789 reconfigure_output_stream (output, slot);
1790 msg = is_selection_done (dbin);
1791 }
1792 SELECTION_UNLOCK (dbin);
1793 if (msg)
1794 gst_element_post_message ((GstElement *) slot->dbin, msg);
1795 }
1796 break;
1797 case GST_EVENT_EOS:
1798 {
1799 gboolean was_drained = slot->is_drained;
1800 slot->is_drained = TRUE;
1801
1802 /* Custom EOS handling first */
1803 if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
1804 CUSTOM_EOS_QUARK)) {
1805 /* remove custom-eos */
1806 gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
1807 CUSTOM_EOS_QUARK, NULL, NULL);
1808 GST_LOG_OBJECT (pad, "Received custom EOS");
1809 ret = GST_PAD_PROBE_HANDLED;
1810 SELECTION_LOCK (dbin);
1811 if (slot->input == NULL) {
1812 GST_DEBUG_OBJECT (pad,
1813 "Got custom-eos from null input stream, remove output stream");
1814 /* Remove the output */
1815 if (slot->output) {
1816 DecodebinOutputStream *output = slot->output;
1817 dbin->output_streams =
1818 g_list_remove (dbin->output_streams, output);
1819 free_output_stream (dbin, output);
1820 }
1821 slot->probe_id = 0;
1822 dbin->slots = g_list_remove (dbin->slots, slot);
1823 free_multiqueue_slot_async (dbin, slot);
1824 ret = GST_PAD_PROBE_REMOVE;
1825 } else if (!was_drained) {
1826 check_all_slot_for_eos (dbin);
1827 }
1828 if (ret == GST_PAD_PROBE_HANDLED)
1829 gst_event_unref (ev);
1830 SELECTION_UNLOCK (dbin);
1831 break;
1832 }
1833
1834 GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
1835 slot->input);
1836 if (slot->input == NULL) {
1837 GstPad *peer;
1838 GST_DEBUG_OBJECT (pad,
1839 "last EOS for input, forwarding and removing slot");
1840 peer = gst_pad_get_peer (pad);
1841 if (peer) {
1842 gst_pad_send_event (peer, ev);
1843 gst_object_unref (peer);
1844 } else {
1845 gst_event_unref (ev);
1846 }
1847 SELECTION_LOCK (dbin);
1848 /* FIXME : Shouldn't we try to re-assign the output instead of just
1849 * removing it ? */
1850 /* Remove the output */
1851 if (slot->output) {
1852 DecodebinOutputStream *output = slot->output;
1853 dbin->output_streams = g_list_remove (dbin->output_streams, output);
1854 free_output_stream (dbin, output);
1855 }
1856 slot->probe_id = 0;
1857 dbin->slots = g_list_remove (dbin->slots, slot);
1858 SELECTION_UNLOCK (dbin);
1859
1860 free_multiqueue_slot_async (dbin, slot);
1861 ret = GST_PAD_PROBE_REMOVE;
1862 } else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
1863 CUSTOM_FINAL_EOS_QUARK)) {
1864 GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream");
1865 } else {
1866 GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)");
1867 /* drop current event as eos will be sent in check_all_slot_for_eos
1868 * when all output streams are also eos */
1869 ret = GST_PAD_PROBE_DROP;
1870 SELECTION_LOCK (dbin);
1871 check_all_slot_for_eos (dbin);
1872 SELECTION_UNLOCK (dbin);
1873 }
1874 }
1875 break;
1876 default:
1877 break;
1878 }
1879 } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
1880 GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
1881 switch (GST_QUERY_TYPE (query)) {
1882 case GST_QUERY_CAPS:
1883 {
1884 GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
1885 gst_query_set_caps_result (query, GST_CAPS_ANY);
1886 ret = GST_PAD_PROBE_HANDLED;
1887 }
1888 break;
1889
1890 case GST_QUERY_ACCEPT_CAPS:
1891 {
1892 GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
1893 /* If the current decoder doesn't accept caps, we'll reconfigure
1894 * on the actual caps event. So accept any caps. */
1895 gst_query_set_accept_caps_result (query, TRUE);
1896 ret = GST_PAD_PROBE_HANDLED;
1897 }
1898 default:
1899 break;
1900 }
1901 }
1902
1903 return ret;
1904 }
1905
1906 /* Create a new multiqueue slot for the given type
1907 *
1908 * It is up to the caller to know whether that slot is needed or not
1909 * (and release it when no longer needed) */
1910 static MultiQueueSlot *
create_new_slot(GstDecodebin3 * dbin,GstStreamType type)1911 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
1912 {
1913 MultiQueueSlot *slot;
1914 GstIterator *it = NULL;
1915 GValue item = { 0, };
1916
1917 GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
1918 gst_stream_type_get_name (type));
1919 slot = g_new0 (MultiQueueSlot, 1);
1920 slot->dbin = dbin;
1921
1922 slot->id = dbin->slot_id++;
1923
1924 slot->type = type;
1925 slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u");
1926 if (slot->sink_pad == NULL)
1927 goto fail;
1928
1929 it = gst_pad_iterate_internal_links (slot->sink_pad);
1930 if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
1931 || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
1932 GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
1933 GST_DEBUG_PAD_NAME (slot->src_pad));
1934 goto fail;
1935 }
1936 gst_iterator_free (it);
1937 g_value_reset (&item);
1938
1939 g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
1940
1941 /* Add event probe */
1942 slot->probe_id =
1943 gst_pad_add_probe (slot->src_pad,
1944 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
1945 (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
1946
1947 GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
1948 GST_DEBUG_PAD_NAME (slot->src_pad));
1949
1950 dbin->slots = g_list_append (dbin->slots, slot);
1951
1952 return slot;
1953
1954 /* ERRORS */
1955 fail:
1956 {
1957 if (slot->sink_pad)
1958 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
1959 g_free (slot);
1960 return NULL;
1961 }
1962 }
1963
1964 /* Must be called with SELECTION_LOCK */
1965 static MultiQueueSlot *
get_slot_for_input(GstDecodebin3 * dbin,DecodebinInputStream * input)1966 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
1967 {
1968 GList *tmp;
1969 MultiQueueSlot *empty_slot = NULL;
1970 GstStreamType input_type = 0;
1971 gchar *stream_id = NULL;
1972
1973 GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
1974 input, input->active_stream,
1975 input->
1976 active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
1977
1978 if (input->active_stream) {
1979 input_type = gst_stream_get_stream_type (input->active_stream);
1980 stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
1981 }
1982
1983 /* Go over existing slots and check if there is already one for it */
1984 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1985 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1986 /* Already used input, return that one */
1987 if (slot->input == input) {
1988 GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
1989 return slot;
1990 }
1991 }
1992
1993 /* Go amongst all unused slots of the right type and try to find a candidate */
1994 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
1995 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
1996 if (slot->input == NULL && input_type == slot->type) {
1997 /* Remember this empty slot for later */
1998 empty_slot = slot;
1999 /* Check if available slot is of the same stream_id */
2000 GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
2001 slot->id, slot->active_stream);
2002 if (stream_id && slot->active_stream) {
2003 gchar *ostream_id =
2004 (gchar *) gst_stream_get_stream_id (slot->active_stream);
2005 GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
2006 ostream_id, stream_id);
2007 if (!g_strcmp0 (stream_id, ostream_id))
2008 break;
2009 }
2010 }
2011 }
2012
2013 if (empty_slot) {
2014 GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
2015 empty_slot->input = input;
2016 return empty_slot;
2017 }
2018
2019 if (input_type)
2020 return create_new_slot (dbin, input_type);
2021
2022 return NULL;
2023 }
2024
2025 static void
link_input_to_slot(DecodebinInputStream * input,MultiQueueSlot * slot)2026 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
2027 {
2028 if (slot->input != NULL && slot->input != input) {
2029 GST_ERROR_OBJECT (slot->dbin,
2030 "Trying to link input to an already used slot");
2031 return;
2032 }
2033 gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
2034 slot->pending_stream = input->active_stream;
2035 slot->input = input;
2036 }
2037
2038 #if 0
2039 static gboolean
2040 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
2041 GstElementFactoryListType ftype)
2042 {
2043 gboolean ret = FALSE;
2044 GList *res;
2045
2046 g_mutex_lock (&dbin->factories_lock);
2047 gst_decode_bin_update_factories_list (dbin);
2048 if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
2049 res =
2050 gst_element_factory_list_filter (dbin->decoder_factories,
2051 caps, GST_PAD_SINK, TRUE);
2052 else
2053 res =
2054 gst_element_factory_list_filter (dbin->decodable_factories,
2055 caps, GST_PAD_SINK, TRUE);
2056 g_mutex_unlock (&dbin->factories_lock);
2057
2058 if (res) {
2059 ret = TRUE;
2060 gst_plugin_feature_list_free (res);
2061 }
2062
2063 return ret;
2064 }
2065 #endif
2066
2067 static GstElement *
create_element(GstDecodebin3 * dbin,GstStream * stream,GstElementFactoryListType ftype)2068 create_element (GstDecodebin3 * dbin, GstStream * stream,
2069 GstElementFactoryListType ftype)
2070 {
2071 GList *res;
2072 GstElement *element = NULL;
2073 GstCaps *caps;
2074
2075 g_mutex_lock (&dbin->factories_lock);
2076 gst_decode_bin_update_factories_list (dbin);
2077 caps = gst_stream_get_caps (stream);
2078 if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
2079 res =
2080 gst_element_factory_list_filter (dbin->decoder_factories,
2081 caps, GST_PAD_SINK, TRUE);
2082 else
2083 res =
2084 gst_element_factory_list_filter (dbin->decodable_factories,
2085 caps, GST_PAD_SINK, TRUE);
2086 g_mutex_unlock (&dbin->factories_lock);
2087
2088 if (res) {
2089 element =
2090 gst_element_factory_create ((GstElementFactory *) res->data, NULL);
2091 GST_DEBUG ("Created element '%s'", GST_ELEMENT_NAME (element));
2092 gst_plugin_feature_list_free (res);
2093 } else {
2094 GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT, caps);
2095 }
2096
2097 gst_caps_unref (caps);
2098 return element;
2099 }
2100
2101 /* FIXME : VERY NAIVE. ASSUMING FIRST ONE WILL WORK */
2102 static GstElement *
create_decoder(GstDecodebin3 * dbin,GstStream * stream)2103 create_decoder (GstDecodebin3 * dbin, GstStream * stream)
2104 {
2105 return create_element (dbin, stream, GST_ELEMENT_FACTORY_TYPE_DECODER);
2106 }
2107
2108 static GstPadProbeReturn
keyframe_waiter_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinOutputStream * output)2109 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
2110 DecodebinOutputStream * output)
2111 {
2112 GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2113 /* If we have a keyframe, remove the probe and let all data through */
2114 /* FIXME : HANDLE HEADER BUFFER ?? */
2115 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
2116 GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
2117 GST_DEBUG_OBJECT (pad,
2118 "Buffer is keyframe or header, letting through and removing probe");
2119 output->drop_probe_id = 0;
2120 return GST_PAD_PROBE_REMOVE;
2121 }
2122 GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
2123 return GST_PAD_PROBE_DROP;
2124 }
2125
2126 static void
reconfigure_output_stream(DecodebinOutputStream * output,MultiQueueSlot * slot)2127 reconfigure_output_stream (DecodebinOutputStream * output,
2128 MultiQueueSlot * slot)
2129 {
2130 GstDecodebin3 *dbin = output->dbin;
2131 GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
2132 gboolean needs_decoder;
2133
2134 needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
2135
2136 GST_DEBUG_OBJECT (dbin,
2137 "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
2138 needs_decoder);
2139
2140 /* FIXME : Maybe make the output un-hook itself automatically ? */
2141 if (output->slot != NULL && output->slot != slot) {
2142 GST_WARNING_OBJECT (dbin,
2143 "Output still linked to another slot (%p)", output->slot);
2144 gst_caps_unref (new_caps);
2145 return;
2146 }
2147
2148 /* Check if existing config is reusable as-is by checking if
2149 * the existing decoder accepts the new caps, if not delete
2150 * it and create a new one */
2151 if (output->decoder) {
2152 gboolean can_reuse_decoder;
2153
2154 if (needs_decoder) {
2155 can_reuse_decoder =
2156 gst_pad_query_accept_caps (output->decoder_sink, new_caps);
2157 } else
2158 can_reuse_decoder = FALSE;
2159
2160 if (can_reuse_decoder) {
2161 if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
2162 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2163 output->drop_probe_id =
2164 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2165 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2166 }
2167 GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
2168 if (output->linked == FALSE) {
2169 gst_pad_link_full (slot->src_pad, output->decoder_sink,
2170 GST_PAD_LINK_CHECK_NOTHING);
2171 output->linked = TRUE;
2172 }
2173 gst_caps_unref (new_caps);
2174 return;
2175 }
2176
2177 GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
2178
2179 if (output->linked)
2180 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2181 output->linked = FALSE;
2182 if (output->drop_probe_id) {
2183 gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2184 output->drop_probe_id = 0;
2185 }
2186
2187 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2188 GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
2189 gst_caps_unref (new_caps);
2190 goto cleanup;
2191 }
2192
2193 gst_element_set_locked_state (output->decoder, TRUE);
2194 gst_element_set_state (output->decoder, GST_STATE_NULL);
2195
2196 gst_bin_remove ((GstBin *) dbin, output->decoder);
2197 output->decoder = NULL;
2198 }
2199
2200 gst_caps_unref (new_caps);
2201
2202 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2203 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2204
2205 /* If a decoder is required, create one */
2206 if (needs_decoder) {
2207 /* If we don't have a decoder yet, instantiate one */
2208 output->decoder = create_decoder (dbin, slot->active_stream);
2209 if (output->decoder == NULL) {
2210 GstCaps *caps;
2211
2212 SELECTION_UNLOCK (dbin);
2213 /* FIXME : Should we be smarter if there's a missing decoder ?
2214 * Should we deactivate that stream ? */
2215 caps = gst_stream_get_caps (slot->active_stream);
2216 gst_element_post_message (GST_ELEMENT_CAST (dbin),
2217 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2218 gst_caps_unref (caps);
2219 SELECTION_LOCK (dbin);
2220 goto cleanup;
2221 }
2222 if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
2223 GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
2224 goto cleanup;
2225 }
2226 output->decoder_sink = gst_element_get_static_pad (output->decoder, "sink");
2227 output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
2228 if (output->type & GST_STREAM_TYPE_VIDEO) {
2229 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2230 output->drop_probe_id =
2231 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2232 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2233 }
2234 if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
2235 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2236 GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
2237 GST_DEBUG_PAD_NAME (output->decoder_sink));
2238 goto cleanup;
2239 }
2240 } else {
2241 output->decoder_src = gst_object_ref (slot->src_pad);
2242 output->decoder_sink = NULL;
2243 }
2244 output->linked = TRUE;
2245 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
2246 output->decoder_src)) {
2247 GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
2248 goto cleanup;
2249 }
2250 if (output->src_exposed == FALSE) {
2251 output->src_exposed = TRUE;
2252 gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
2253 }
2254
2255 if (output->decoder)
2256 gst_element_sync_state_with_parent (output->decoder);
2257
2258 output->slot = slot;
2259 return;
2260
2261 cleanup:
2262 {
2263 GST_DEBUG_OBJECT (dbin, "Cleanup");
2264 if (output->decoder_sink) {
2265 gst_object_unref (output->decoder_sink);
2266 output->decoder_sink = NULL;
2267 }
2268 if (output->decoder_src) {
2269 gst_object_unref (output->decoder_src);
2270 output->decoder_src = NULL;
2271 }
2272 if (output->decoder) {
2273 gst_element_set_state (output->decoder, GST_STATE_NULL);
2274 gst_bin_remove ((GstBin *) dbin, output->decoder);
2275 output->decoder = NULL;
2276 }
2277 }
2278 }
2279
2280 static GstPadProbeReturn
idle_reconfigure(GstPad * pad,GstPadProbeInfo * info,MultiQueueSlot * slot)2281 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2282 {
2283 GstMessage *msg = NULL;
2284 DecodebinOutputStream *output;
2285
2286 SELECTION_LOCK (slot->dbin);
2287 output = get_output_for_slot (slot);
2288
2289 GST_DEBUG_OBJECT (pad, "output : %p", output);
2290
2291 if (output) {
2292 reconfigure_output_stream (output, slot);
2293 msg = is_selection_done (slot->dbin);
2294 }
2295 SELECTION_UNLOCK (slot->dbin);
2296 if (msg)
2297 gst_element_post_message ((GstElement *) slot->dbin, msg);
2298
2299 return GST_PAD_PROBE_REMOVE;
2300 }
2301
2302 static MultiQueueSlot *
find_slot_for_stream_id(GstDecodebin3 * dbin,const gchar * sid)2303 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2304 {
2305 GList *tmp;
2306
2307 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2308 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2309 const gchar *stream_id;
2310 if (slot->active_stream) {
2311 stream_id = gst_stream_get_stream_id (slot->active_stream);
2312 if (!g_strcmp0 (sid, stream_id))
2313 return slot;
2314 }
2315 if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2316 stream_id = gst_stream_get_stream_id (slot->pending_stream);
2317 if (!g_strcmp0 (sid, stream_id))
2318 return slot;
2319 }
2320 }
2321
2322 return NULL;
2323 }
2324
2325 /* This function handles the reassignment of a slot. Call this from
2326 * the streaming thread of a slot. */
2327 static gboolean
reassign_slot(GstDecodebin3 * dbin,MultiQueueSlot * slot)2328 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2329 {
2330 DecodebinOutputStream *output;
2331 MultiQueueSlot *target_slot = NULL;
2332 GList *tmp;
2333 const gchar *sid, *tsid;
2334
2335 SELECTION_LOCK (dbin);
2336 output = slot->output;
2337
2338 if (G_UNLIKELY (slot->active_stream == NULL)) {
2339 GST_DEBUG_OBJECT (slot->src_pad,
2340 "Called on inactive slot (active_stream == NULL)");
2341 SELECTION_UNLOCK (dbin);
2342 return FALSE;
2343 }
2344
2345 if (G_UNLIKELY (output == NULL)) {
2346 GST_DEBUG_OBJECT (slot->src_pad,
2347 "Slot doesn't have any output to be removed");
2348 SELECTION_UNLOCK (dbin);
2349 return FALSE;
2350 }
2351
2352 sid = gst_stream_get_stream_id (slot->active_stream);
2353 GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2354
2355 /* Recheck whether this stream is still in the list of streams to deactivate */
2356 if (stream_in_list (dbin->requested_selection, sid)) {
2357 /* Stream is in the list of requested streams, don't remove */
2358 SELECTION_UNLOCK (dbin);
2359 GST_DEBUG_OBJECT (slot->src_pad,
2360 "Stream '%s' doesn't need to be deactivated", sid);
2361 return FALSE;
2362 }
2363
2364 /* Unlink slot from output */
2365 /* FIXME : Handle flushing ? */
2366 /* FIXME : Handle outputs without decoders */
2367 GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2368 output->decoder_sink);
2369 if (output->decoder_sink)
2370 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2371 output->linked = FALSE;
2372 slot->output = NULL;
2373 output->slot = NULL;
2374 /* Remove sid from active selection */
2375 for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2376 if (!g_strcmp0 (sid, tmp->data)) {
2377 dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2378 break;
2379 }
2380
2381 /* Can we re-assign this output to a requested stream ? */
2382 GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2383 for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2384 MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2385 GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2386 tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2387 if (tslot && tslot->type == output->type && tslot->output == NULL) {
2388 GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2389 target_slot = tslot;
2390 tsid = tmp->data;
2391 /* Pass target stream id to requested selection */
2392 dbin->requested_selection =
2393 g_list_append (dbin->requested_selection, g_strdup (tmp->data));
2394 dbin->to_activate = g_list_remove (dbin->to_activate, tmp->data);
2395 break;
2396 }
2397 }
2398
2399 if (target_slot) {
2400 GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2401 target_slot, tsid);
2402 target_slot->output = output;
2403 output->slot = target_slot;
2404 dbin->active_selection =
2405 g_list_append (dbin->active_selection, (gchar *) tsid);
2406 SELECTION_UNLOCK (dbin);
2407
2408 /* Wakeup the target slot so that it retries to send events/buffers
2409 * thereby triggering the output reconfiguration codepath */
2410 gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2411 (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2412 /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2413 } else {
2414 GstMessage *msg;
2415
2416 dbin->output_streams = g_list_remove (dbin->output_streams, output);
2417 free_output_stream (dbin, output);
2418 msg = is_selection_done (slot->dbin);
2419 SELECTION_UNLOCK (dbin);
2420
2421 if (msg)
2422 gst_element_post_message ((GstElement *) slot->dbin, msg);
2423 }
2424
2425 return TRUE;
2426 }
2427
2428 /* Idle probe called when a slot should be unassigned from its output stream.
2429 * This is needed to ensure nothing is flowing when unlinking the slot.
2430 *
2431 * Also, this method will search for a pending stream which could re-use
2432 * the output stream. */
2433 static GstPadProbeReturn
slot_unassign_probe(GstPad * pad,GstPadProbeInfo * info,MultiQueueSlot * slot)2434 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2435 MultiQueueSlot * slot)
2436 {
2437 GstDecodebin3 *dbin = slot->dbin;
2438
2439 reassign_slot (dbin, slot);
2440
2441 return GST_PAD_PROBE_REMOVE;
2442 }
2443
2444 static gboolean
handle_stream_switch(GstDecodebin3 * dbin,GList * select_streams,guint32 seqnum)2445 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2446 guint32 seqnum)
2447 {
2448 gboolean ret = TRUE;
2449 GList *tmp;
2450 /* List of slots to (de)activate. */
2451 GList *to_deactivate = NULL;
2452 GList *to_activate = NULL;
2453 /* List of unknown stream id, most likely means the event
2454 * should be sent upstream so that elements can expose the requested stream */
2455 GList *unknown = NULL;
2456 GList *to_reassign = NULL;
2457 GList *future_request_streams = NULL;
2458 GList *pending_streams = NULL;
2459 GList *slots_to_reassign = NULL;
2460
2461 SELECTION_LOCK (dbin);
2462 if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2463 GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2464 SELECTION_UNLOCK (dbin);
2465 return TRUE;
2466 }
2467 /* Remove pending select_streams */
2468 g_list_free (dbin->pending_select_streams);
2469 dbin->pending_select_streams = NULL;
2470
2471 /* COMPARE the requested streams to the active and requested streams
2472 * on multiqueue. */
2473
2474 /* First check the slots to activate and which ones are unknown */
2475 for (tmp = select_streams; tmp; tmp = tmp->next) {
2476 const gchar *sid = (const gchar *) tmp->data;
2477 MultiQueueSlot *slot;
2478 GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
2479 slot = find_slot_for_stream_id (dbin, sid);
2480 /* Find the corresponding slot */
2481 if (slot == NULL) {
2482 if (stream_in_collection (dbin, (gchar *) sid)) {
2483 pending_streams = g_list_append (pending_streams, (gchar *) sid);
2484 } else {
2485 GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
2486 unknown = g_list_append (unknown, (gchar *) sid);
2487 }
2488 } else if (slot->output == NULL) {
2489 GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
2490 slot, sid);
2491 to_activate = g_list_append (to_activate, slot);
2492 } else {
2493 GST_DEBUG_OBJECT (dbin,
2494 "Stream '%s' from slot %p is already active on output %p", sid, slot,
2495 slot->output);
2496 future_request_streams =
2497 g_list_append (future_request_streams, (gchar *) sid);
2498 }
2499 }
2500
2501 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2502 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2503 /* For slots that have an output, check if it's part of the streams to
2504 * be active */
2505 if (slot->output) {
2506 gboolean slot_to_deactivate = TRUE;
2507
2508 if (slot->active_stream) {
2509 if (stream_in_list (select_streams,
2510 gst_stream_get_stream_id (slot->active_stream)))
2511 slot_to_deactivate = FALSE;
2512 }
2513 if (slot_to_deactivate && slot->pending_stream
2514 && slot->pending_stream != slot->active_stream) {
2515 if (stream_in_list (select_streams,
2516 gst_stream_get_stream_id (slot->pending_stream)))
2517 slot_to_deactivate = FALSE;
2518 }
2519 if (slot_to_deactivate) {
2520 GST_DEBUG_OBJECT (dbin,
2521 "Slot %p (%s) should be deactivated, no longer used", slot,
2522 slot->
2523 active_stream ? gst_stream_get_stream_id (slot->active_stream) :
2524 "NULL");
2525 to_deactivate = g_list_append (to_deactivate, slot);
2526 }
2527 }
2528 }
2529
2530 if (to_deactivate != NULL) {
2531 GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
2532 /* We need to compare what needs to be activated and deactivated in order
2533 * to determine whether there are outputs that can be transferred */
2534 /* Take the stream-id of the slots that are to be activated, for which there
2535 * is a slot of the same type that needs to be deactivated */
2536 tmp = to_deactivate;
2537 while (tmp) {
2538 MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
2539 gboolean removeit = FALSE;
2540 GList *tmp2, *next;
2541 GST_DEBUG_OBJECT (dbin,
2542 "Checking if slot to deactivate (%p) has a candidate slot to activate",
2543 slot_to_deactivate);
2544 for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
2545 MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
2546 GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
2547 if (slot_to_activate->type == slot_to_deactivate->type) {
2548 GST_DEBUG_OBJECT (dbin, "Re-using");
2549 to_reassign = g_list_append (to_reassign, (gchar *)
2550 gst_stream_get_stream_id (slot_to_activate->active_stream));
2551 slots_to_reassign =
2552 g_list_append (slots_to_reassign, slot_to_deactivate);
2553 to_activate = g_list_remove (to_activate, slot_to_activate);
2554 removeit = TRUE;
2555 break;
2556 }
2557 }
2558 next = tmp->next;
2559 if (removeit)
2560 to_deactivate = g_list_delete_link (to_deactivate, tmp);
2561 tmp = next;
2562 }
2563 }
2564
2565 for (tmp = to_deactivate; tmp; tmp = tmp->next) {
2566 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2567 GST_DEBUG_OBJECT (dbin,
2568 "Really need to deactivate slot %p, but no available alternative",
2569 slot);
2570
2571 slots_to_reassign = g_list_append (slots_to_reassign, slot);
2572 }
2573
2574 /* The only slots left to activate are the ones that won't be reassigned and
2575 * therefore really need to have a new output created */
2576 for (tmp = to_activate; tmp; tmp = tmp->next) {
2577 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2578 if (slot->active_stream)
2579 future_request_streams =
2580 g_list_append (future_request_streams,
2581 (gchar *) gst_stream_get_stream_id (slot->active_stream));
2582 else if (slot->pending_stream)
2583 future_request_streams =
2584 g_list_append (future_request_streams,
2585 (gchar *) gst_stream_get_stream_id (slot->pending_stream));
2586 else
2587 GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
2588 }
2589
2590 if (to_activate == NULL && pending_streams != NULL) {
2591 GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
2592 if (dbin->requested_selection)
2593 g_list_free_full (dbin->requested_selection, g_free);
2594 dbin->requested_selection =
2595 g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
2596 g_list_free (to_deactivate);
2597 g_list_free (pending_streams);
2598 to_deactivate = NULL;
2599 pending_streams = NULL;
2600 } else {
2601 if (dbin->requested_selection)
2602 g_list_free_full (dbin->requested_selection, g_free);
2603 dbin->requested_selection =
2604 g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
2605 dbin->requested_selection =
2606 g_list_concat (dbin->requested_selection,
2607 g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
2608 if (dbin->to_activate)
2609 g_list_free (dbin->to_activate);
2610 dbin->to_activate = g_list_copy (to_reassign);
2611 }
2612
2613 dbin->selection_updated = TRUE;
2614 SELECTION_UNLOCK (dbin);
2615
2616 if (unknown) {
2617 GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
2618 g_list_free (unknown);
2619 }
2620
2621 if (to_activate && !slots_to_reassign) {
2622 for (tmp = to_activate; tmp; tmp = tmp->next) {
2623 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2624 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2625 (GstPadProbeCallback) idle_reconfigure, slot, NULL);
2626 }
2627 }
2628
2629 /* For all streams to deactivate, add an idle probe where we will do
2630 * the unassignment and switch over */
2631 for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
2632 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2633 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2634 (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
2635 }
2636
2637 if (to_deactivate)
2638 g_list_free (to_deactivate);
2639 if (to_activate)
2640 g_list_free (to_activate);
2641 if (to_reassign)
2642 g_list_free (to_reassign);
2643 if (future_request_streams)
2644 g_list_free (future_request_streams);
2645 if (pending_streams)
2646 g_list_free (pending_streams);
2647 if (slots_to_reassign)
2648 g_list_free (slots_to_reassign);
2649
2650 return ret;
2651 }
2652
2653 static GstPadProbeReturn
ghost_pad_event_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinOutputStream * output)2654 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
2655 DecodebinOutputStream * output)
2656 {
2657 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2658 GstDecodebin3 *dbin = output->dbin;
2659 GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
2660
2661 GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
2662
2663 switch (GST_EVENT_TYPE (event)) {
2664 case GST_EVENT_SELECT_STREAMS:
2665 {
2666 GstPad *peer;
2667 GList *streams = NULL;
2668 guint32 seqnum = gst_event_get_seqnum (event);
2669
2670 SELECTION_LOCK (dbin);
2671 if (seqnum == dbin->select_streams_seqnum) {
2672 SELECTION_UNLOCK (dbin);
2673 GST_DEBUG_OBJECT (pad,
2674 "Already handled/handling that SELECT_STREAMS event");
2675 gst_event_unref (event);
2676 ret = GST_PAD_PROBE_HANDLED;
2677 break;
2678 }
2679 dbin->select_streams_seqnum = seqnum;
2680 if (dbin->pending_select_streams != NULL) {
2681 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2682 g_list_free (dbin->pending_select_streams);
2683 dbin->pending_select_streams = NULL;
2684 }
2685 gst_event_parse_select_streams (event, &streams);
2686 dbin->pending_select_streams = g_list_copy (streams);
2687 SELECTION_UNLOCK (dbin);
2688
2689 /* Send event upstream */
2690 if ((peer = gst_pad_get_peer (pad))) {
2691 gst_pad_send_event (peer, event);
2692 gst_object_unref (peer);
2693 } else {
2694 gst_event_unref (event);
2695 }
2696 /* Finally handle the switch */
2697 if (streams) {
2698 handle_stream_switch (dbin, streams, seqnum);
2699 g_list_free_full (streams, g_free);
2700 }
2701 ret = GST_PAD_PROBE_HANDLED;
2702 }
2703 break;
2704 default:
2705 break;
2706 }
2707
2708 return ret;
2709 }
2710
2711 static gboolean
gst_decodebin3_send_event(GstElement * element,GstEvent * event)2712 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
2713 {
2714 GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
2715 if (GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
2716 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2717 GList *streams = NULL;
2718 guint32 seqnum = gst_event_get_seqnum (event);
2719
2720 SELECTION_LOCK (dbin);
2721 if (seqnum == dbin->select_streams_seqnum) {
2722 SELECTION_UNLOCK (dbin);
2723 GST_DEBUG_OBJECT (dbin,
2724 "Already handled/handling that SELECT_STREAMS event");
2725 return TRUE;
2726 }
2727 dbin->select_streams_seqnum = seqnum;
2728 if (dbin->pending_select_streams != NULL) {
2729 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2730 g_list_free (dbin->pending_select_streams);
2731 dbin->pending_select_streams = NULL;
2732 }
2733 gst_event_parse_select_streams (event, &streams);
2734 dbin->pending_select_streams = g_list_copy (streams);
2735 SELECTION_UNLOCK (dbin);
2736
2737 /* FIXME : We don't have an upstream ?? */
2738 #if 0
2739 /* Send event upstream */
2740 if ((peer = gst_pad_get_peer (pad))) {
2741 gst_pad_send_event (peer, event);
2742 gst_object_unref (peer);
2743 }
2744 #endif
2745 /* Finally handle the switch */
2746 if (streams) {
2747 handle_stream_switch (dbin, streams, seqnum);
2748 g_list_free_full (streams, g_free);
2749 }
2750
2751 gst_event_unref (event);
2752 return TRUE;
2753 }
2754 return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
2755 }
2756
2757
2758 static void
free_multiqueue_slot(GstDecodebin3 * dbin,MultiQueueSlot * slot)2759 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2760 {
2761 if (slot->probe_id)
2762 gst_pad_remove_probe (slot->src_pad, slot->probe_id);
2763 if (slot->input) {
2764 if (slot->input->srcpad)
2765 gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
2766 }
2767
2768 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2769 gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
2770 gst_object_replace ((GstObject **) & slot->src_pad, NULL);
2771 gst_object_replace ((GstObject **) & slot->active_stream, NULL);
2772 g_free (slot);
2773 }
2774
2775 static void
free_multiqueue_slot_async(GstDecodebin3 * dbin,MultiQueueSlot * slot)2776 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2777 {
2778 GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
2779 gst_element_call_async (GST_ELEMENT_CAST (dbin),
2780 (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
2781 }
2782
2783 /* Create a DecodebinOutputStream for a given type
2784 * Note: It will be empty initially, it needs to be configured
2785 * afterwards */
2786 static DecodebinOutputStream *
create_output_stream(GstDecodebin3 * dbin,GstStreamType type)2787 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
2788 {
2789 DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
2790 gchar *pad_name;
2791 const gchar *prefix;
2792 GstStaticPadTemplate *templ;
2793 GstPadTemplate *ptmpl;
2794 guint32 *counter;
2795 GstPad *internal_pad;
2796
2797 GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
2798 res, gst_stream_type_get_name (type));
2799
2800 res->type = type;
2801 res->dbin = dbin;
2802
2803 if (type & GST_STREAM_TYPE_VIDEO) {
2804 templ = &video_src_template;
2805 counter = &dbin->vpadcount;
2806 prefix = "video";
2807 } else if (type & GST_STREAM_TYPE_AUDIO) {
2808 templ = &audio_src_template;
2809 counter = &dbin->apadcount;
2810 prefix = "audio";
2811 } else if (type & GST_STREAM_TYPE_TEXT) {
2812 templ = &text_src_template;
2813 counter = &dbin->tpadcount;
2814 prefix = "text";
2815 } else {
2816 templ = &src_template;
2817 counter = &dbin->opadcount;
2818 prefix = "src";
2819 }
2820
2821 pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
2822 *counter += 1;
2823 ptmpl = gst_static_pad_template_get (templ);
2824 res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
2825 gst_object_unref (ptmpl);
2826 g_free (pad_name);
2827 gst_pad_set_active (res->src_pad, TRUE);
2828 /* Put an event probe on the internal proxy pad to detect upstream
2829 * events */
2830 internal_pad =
2831 (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
2832 gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
2833 (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
2834 gst_object_unref (internal_pad);
2835
2836 dbin->output_streams = g_list_append (dbin->output_streams, res);
2837
2838 return res;
2839 }
2840
2841 static void
free_output_stream(GstDecodebin3 * dbin,DecodebinOutputStream * output)2842 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
2843 {
2844 if (output->slot) {
2845 if (output->decoder_sink && output->decoder)
2846 gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
2847
2848 output->slot->output = NULL;
2849 output->slot = NULL;
2850 }
2851 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2852 gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
2853 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2854 if (output->src_exposed) {
2855 gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
2856 }
2857 if (output->decoder) {
2858 gst_element_set_locked_state (output->decoder, TRUE);
2859 gst_element_set_state (output->decoder, GST_STATE_NULL);
2860 gst_bin_remove ((GstBin *) dbin, output->decoder);
2861 }
2862 g_free (output);
2863 }
2864
2865 static GstStateChangeReturn
gst_decodebin3_change_state(GstElement * element,GstStateChange transition)2866 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
2867 {
2868 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2869 GstStateChangeReturn ret;
2870
2871 /* Upwards */
2872 switch (transition) {
2873 default:
2874 break;
2875 }
2876 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2877 if (ret == GST_STATE_CHANGE_FAILURE)
2878 goto beach;
2879
2880 switch (transition) {
2881 case GST_STATE_CHANGE_PAUSED_TO_READY:
2882 {
2883 GList *tmp;
2884
2885 /* Free output streams */
2886 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
2887 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
2888 free_output_stream (dbin, output);
2889 }
2890 g_list_free (dbin->output_streams);
2891 dbin->output_streams = NULL;
2892 /* Free multiqueue slots */
2893 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2894 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2895 free_multiqueue_slot (dbin, slot);
2896 }
2897 g_list_free (dbin->slots);
2898 dbin->slots = NULL;
2899 dbin->current_group_id = GST_GROUP_ID_INVALID;
2900 /* Free inputs */
2901 }
2902 break;
2903 default:
2904 break;
2905 }
2906 beach:
2907 return ret;
2908 }
2909
2910 gboolean
gst_decodebin3_plugin_init(GstPlugin * plugin)2911 gst_decodebin3_plugin_init (GstPlugin * plugin)
2912 {
2913 GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");
2914
2915 return gst_element_register (plugin, "decodebin3", GST_RANK_NONE,
2916 GST_TYPE_DECODEBIN3);
2917 }
2918