/* GStreamer * * Copyright (C) <2015> Centricular Ltd * @author: Edward Hervey * @author: Jan Schmidt * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include #include #include #include #include "gstplayback.h" #include "gstplay-enum.h" #include "gstrawcaps.h" /** * SECTION:element-decodebin3 * @title: decodebin3 * * #GstBin that auto-magically constructs a decoding pipeline using available * decoders and demuxers via auto-plugging. The output is raw audio, video * or subtitle streams. * * decodebin3 differs from the previous decodebin (decodebin2) in important ways: * * * supports publication and selection of stream information via * GstStreamCollection messages and #GST_EVENT_SELECT_STREAMS events. * * * dynamically switches stream connections internally, and * reuses decoder elements when stream selections change, so that in * the normal case it maintains 1 decoder of each type (video/audio/subtitle) * and only creates new elements when streams change and an existing decoder * is not capable of handling the new format. * * * supports multiple input pads for the parallel decoding of auxiliary streams * not muxed with the primary stream. * * * does not handle network stream buffering. 
decodebin3 expects that network stream * buffering is handled upstream, before data is passed to it. * * decodebin3 is still experimental API and a technology preview. * Its behaviour and exposed API is subject to change. * */ /** * Global design * * 1) From sink pad to elementary streams (GstParseBin) * * The input sink pads are fed to GstParseBin. GstParseBin will feed them * through typefind. When the caps are detected (or changed) we recursively * figure out which demuxer, parser or depayloader is needed until we get to * elementary streams. * * All elementary streams (whether decoded or not, whether exposed or not) are * fed through multiqueue. There is only *one* multiqueue in decodebin3. * * => MultiQueue is the cornerstone. * => No buffering before multiqueue * * 2) Elementary streams * * After GstParseBin, there are 3 main components: * 1) Input Streams (provided by GstParseBin) * 2) Multiqueue slots * 3) Output Streams * * Input Streams correspond to the stream coming from GstParseBin and that gets * fed into a multiqueue slot. * * Output Streams correspond to the combination of a (optional) decoder and an * output ghostpad. Output Streams can be moved from one multiqueue slot to * another, can reconfigure itself (different decoders), and can be * added/removed depending on the configuration (all streams outputted, only one * of each type, ...). * * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For * each 'active' Input Stream there is a corresponding slot. * Slots might have different streams on input and output (due to internal * buffering). * * Due to internal queuing/buffering/..., all those components (might) behave * asynchronously. 
Therefore probes will be used on each component source pad to * detect various key-points: * * EOS : * the stream is done => Mark that component as done, optionally freeing/removing it * * STREAM_START : * a new stream is starting => link it further if needed * * 3) Gradual replacement * * If the caps change at any point in decodebin (input sink pad, demuxer output, * multiqueue output, ..), we gradually replace (if needed) the following elements. * * This is handled by the probes in various locations: * a) typefind output * b) multiqueue input (source pad of Input Streams) * c) multiqueue output (source pad of Multiqueue Slots) * d) final output (target of source ghostpads) * * When CAPS event arrive at those points, one of three things can happen: * a) There is no elements downstream yet, just create/link-to following elements * b) There are downstream elements, do a ACCEPT_CAPS query * b.1) The new CAPS are accepted, keep current configuration * b.2) The new CAPS are not accepted, remove following elements then do a) * * Components: * * MultiQ Output * Input(s) Slots Streams * /-------------------------------------------\ /-----\ /------------- \ * * +-------------------------------------------------------------------------+ * | | * | +---------------------------------------------+ | * | | GstParseBin(s) | | * | | +--------------+ | +-----+ | * | | | |---[parser]-[|--| Mul |---[ decoder ]-[| * |]--[ typefind ]---| demuxer(s) |------------[| | ti | | * | | | (if needed) |---[parser]-[|--| qu | | * | | | |---[parser]-[|--| eu |---[ decoder ]-[| * | | +--------------+ | +------ ^ | * | +---------------------------------------------+ ^ | | * | ^ | | | * +-----------------------------------------------+--------+-------------+--+ * | | | * | | | * Probes --/--------/-------------/ * * ATOMIC SWITCHING * * We want to ensure we re-use decoders when switching streams. This takes place * at the multiqueue output level. * * MAIN CONCEPTS * 1) Activating a stream (i.e. 
linking a slot to an output) is only done within
 * the streaming thread in the multiqueue_src_probe() and only if the stream is in the REQUESTED selection.
 * 2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
 * within the stream thread, but only in a purposefully called IDLE probe
 * that calls reassign_slot().
 *
 * Based on those two principles, 3 "selection" of streams (stream-id) are used:
 * 1) requested_selection
 * All streams within that list should be activated
 * 2) active_selection
 * List of streams that are exposed by decodebin
 * 3) to_activate
 * List of streams that will be moved to requested_selection in the
 * reassign_slot() method (i.e. once a stream was deactivated, and the output
 * was retargetted) */

GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
#define GST_CAT_DEFAULT decodebin3_debug

#define GST_TYPE_DECODEBIN3 (gst_decodebin3_get_type ())

/* When non-zero, stream_in_list() logs every comparison it performs */
#define EXTRA_DEBUG 1

#define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
#define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"

/* Returns the GQuark for "decodebin3-custom-final-eos".
 * The quark is created on the first call (guarded by g_once_init_enter/leave)
 * and cached in a function-local static for all later calls. */
static GQuark
_custom_final_eos_quark_get (void)
{
  static gsize g_quark;

  if (g_once_init_enter (&g_quark)) {
    gsize quark =
        (gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
    g_once_init_leave (&g_quark, quark);
  }
  return g_quark;
}

typedef struct _GstDecodebin3 GstDecodebin3;
typedef struct _GstDecodebin3Class GstDecodebin3Class;

typedef struct _DecodebinInputStream DecodebinInputStream;
typedef struct _DecodebinInput DecodebinInput;
typedef struct _DecodebinOutputStream DecodebinOutputStream;

/* Instance structure of the decodebin3 element (a GstBin subclass) */
struct _GstDecodebin3
{
  GstBin bin;

  /* input_lock protects the following variables */
  GMutex input_lock;
  /* Main input (static sink pad) */
  DecodebinInput *main_input;
  /* Supplementary input (request sink pads) */
  GList *other_inputs;
  /* counter for input (used to number the "sink_%u" request pads) */
  guint32 input_counter;
  /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
  /* FIXME : Needs to be reset appropriately (when upstream changes ?) */
  guint32 current_group_id;
  /* End of variables protected by input_lock */

  /* The single multiqueue, created and added to the bin at init time */
  GstElement *multiqueue;

  /* selection_lock protects access to following variables */
  GMutex selection_lock;
  GList *input_streams;         /* List of DecodebinInputStream for active collection */
  GList *output_streams;        /* List of DecodebinOutputStream used for output */
  GList *slots;                 /* List of MultiQueueSlot */
  guint slot_id;

  /* Active collection */
  GstStreamCollection *collection;
  /* requested selection of stream-id to activate post-multiqueue
   * (elements are heap-allocated strings, released with g_free) */
  GList *requested_selection;
  /* list of stream-id currently activated in output */
  GList *active_selection;
  /* List of stream-id that need to be activated (after a stream switch for ex) */
  GList *to_activate;
  /* Pending select streams event */
  guint32 select_streams_seqnum;
  /* pending list of streams to select (from downstream) */
  GList *pending_select_streams;
  /* TRUE if requested_selection was updated, will become FALSE once
   * it has fully transitioned to active */
  gboolean selection_updated;
  /* End of variables protected by selection_lock */

  /* List of pending collections.
   * FIXME : Is this really needed ? */
  GList *pending_collection;

  /* Factories */
  GMutex factories_lock;
  /* Registry feature-list cookie the cached factory lists were built for */
  guint32 factories_cookie;
  /* All DECODABLE factories */
  GList *factories;
  /* Only DECODER factories */
  GList *decoder_factories;
  /* DECODABLE but not DECODER factories */
  GList *decodable_factories;

  /* counters for pads */
  guint32 apadcount, vpadcount, tpadcount, opadcount;

  /* Properties */
  /* Value of the "caps" property: caps on which parsebin stops autoplugging */
  GstCaps *caps;
};

struct _GstDecodebin3Class
{
  GstBinClass class;

  /* Class handler slot of the "select-stream" signal */
    gint (*select_stream) (GstDecodebin3 * dbin,
      GstStreamCollection * collection, GstStream * stream);
};

/* Input of decodebin, controls input pad and parsebin */
struct _DecodebinInput
{
  GstDecodebin3 *dbin;

  /* TRUE for the static "sink" pad input, FALSE for request pad inputs */
  gboolean is_main;

  /* Ghost sink pad exposed on decodebin3 */
  GstPad *ghost_sink;
  /* Static "sink" pad of parsebin, used as target of ghost_sink */
  GstPad *parsebin_sink;

  GstStreamCollection *collection;      /* Active collection */

  guint group_id;

  GstElement *parsebin;

  /* Handler ids of the signal connections made on parsebin */
  gulong pad_added_sigid;
  gulong pad_removed_sigid;
  gulong drained_sigid;

  /* TRUE if the input got drained
   * FIXME : When do we reset it if re-used ?
   */
  gboolean drained;

  /* HACK : Remove these fields */
  /* List of PendingPad structures */
  GList *pending_pads;
};

/* Multiqueue Slots */
typedef struct _MultiQueueSlot
{
  guint id;

  GstDecodebin3 *dbin;
  /* Type of stream handled by this slot */
  GstStreamType type;

  /* Linked input and output */
  DecodebinInputStream *input;

  /* pending => last stream received on sink pad */
  GstStream *pending_stream;
  /* active => last stream outputted on source pad */
  GstStream *active_stream;

  GstPad *sink_pad, *src_pad;

  /* id of the MQ src_pad event probe */
  gulong probe_id;

  gboolean is_drained;

  DecodebinOutputStream *output;
} MultiQueueSlot;

/* Streams that are exposed downstream (i.e. 
output) */
struct _DecodebinOutputStream
{
  GstDecodebin3 *dbin;
  /* The type of stream handled by this output stream */
  GstStreamType type;

  /* The slot to which this output stream is currently connected to */
  MultiQueueSlot *slot;

  GstElement *decoder;          /* Optional */
  GstPad *decoder_sink, *decoder_src;
  gboolean linked;

  /* ghostpad */
  GstPad *src_pad;
  /* Flag if ghost pad is exposed */
  gboolean src_exposed;

  /* keyframe dropping probe */
  gulong drop_probe_id;
};

/* Pending pads from parsebin */
typedef struct _PendingPad
{
  GstDecodebin3 *dbin;
  DecodebinInput *input;
  GstPad *pad;

  /* Ids of the probes installed on pad */
  gulong buffer_probe;
  gulong event_probe;
  gboolean saw_eos;
} PendingPad;

/* properties */
enum
{
  PROP_0,
  PROP_CAPS
};

/* signals */
enum
{
  SIGNAL_SELECT_STREAM,
  SIGNAL_ABOUT_TO_FINISH,
  LAST_SIGNAL
};
/* Signal ids, filled in by gst_decodebin3_class_init() */
static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };

/* Take dbin->selection_lock, logging the calling thread before and after */
#define SELECTION_LOCK(dbin) G_STMT_START { \
    GST_LOG_OBJECT (dbin, \
        "selection locking from thread %p", \
        g_thread_self ()); \
    g_mutex_lock (&dbin->selection_lock); \
    GST_LOG_OBJECT (dbin, \
        "selection locked from thread %p", \
        g_thread_self ()); \
  } G_STMT_END

/* Release dbin->selection_lock, logging the calling thread first */
#define SELECTION_UNLOCK(dbin) G_STMT_START { \
    GST_LOG_OBJECT (dbin, \
        "selection unlocking from thread %p", \
        g_thread_self ()); \
    g_mutex_unlock (&dbin->selection_lock); \
  } G_STMT_END

/* Take dbin->input_lock, logging the calling thread before and after */
#define INPUT_LOCK(dbin) G_STMT_START { \
    GST_LOG_OBJECT (dbin, \
        "input locking from thread %p", \
        g_thread_self ()); \
    g_mutex_lock (&dbin->input_lock); \
    GST_LOG_OBJECT (dbin, \
        "input locked from thread %p", \
        g_thread_self ()); \
  } G_STMT_END

/* Release dbin->input_lock, logging the calling thread first */
#define INPUT_UNLOCK(dbin) G_STMT_START { \
    GST_LOG_OBJECT (dbin, \
        "input unlocking from thread %p", \
        g_thread_self ()); \
    g_mutex_unlock (&dbin->input_lock); \
  } G_STMT_END

GType gst_decodebin3_get_type (void);
#define gst_decodebin3_parent_class parent_class
G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);

/* Initial value of the "caps" property (see gst_decodebin3_init()) */
static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);

/* Pad templates: the always-present "sink" pad, "sink_%u" request pads and
 * the sometimes source pads ("video_%u", "audio_%u", "text_%u", "src_%u"),
 * all with ANY caps */
static GstStaticPadTemplate sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate request_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink_%u",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate video_src_template =
GST_STATIC_PAD_TEMPLATE ("video_%u",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate audio_src_template =
GST_STATIC_PAD_TEMPLATE ("audio_%u",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate text_src_template =
GST_STATIC_PAD_TEMPLATE ("text_%u",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);


static void gst_decodebin3_dispose (GObject * object);
static void gst_decodebin3_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_decodebin3_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

static gboolean parsebin_autoplug_continue_cb (GstElement *
    parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);

/* Default class handler of the "select-stream" signal.
 * Always returns -1; collection and stream are not inspected. */
static gint
gst_decodebin3_select_stream (GstDecodebin3 * dbin,
    GstStreamCollection * collection, GstStream * stream)
{
  GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
  return -1;
}

static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
    GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
    GstStateChange transition);
static gboolean gst_decodebin3_send_event (GstElement * element,
    GstEvent * event);

static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
#if 0
static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
    GstElementFactoryListType ftype);
#endif

static void free_input
(GstDecodebin3 * dbin, DecodebinInput * input);
static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input);
static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);

static void reconfigure_output_stream (DecodebinOutputStream * output,
    MultiQueueSlot * slot);
static void free_output_stream (GstDecodebin3 * dbin,
    DecodebinOutputStream * output);
static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
    GstStreamType type);

static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
    GstPadProbeInfo * info, MultiQueueSlot * slot);
static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
    DecodebinInputStream * input);
static void link_input_to_slot (DecodebinInputStream * input,
    MultiQueueSlot * slot);
static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
    MultiQueueSlot * slot);

static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
static void update_requested_selection (GstDecodebin3 * dbin);

/* FIXME: Really make all the parser stuff a self-contained helper object */
#include "gstdecodebin3-parse.c"

/* Signal accumulator for "select-stream".
 * Stores the handler's integer return value in @return_accu (except during
 * the CLEANUP run stage) and returns TRUE - i.e. lets emission continue -
 * only while handlers keep answering -1. The first handler answering
 * something else stops the emission. */
static gboolean
_gst_int_accumulator (GSignalInvocationHint * ihint,
    GValue * return_accu, const GValue * handler_return, gpointer dummy)
{
  gint res = g_value_get_int (handler_return);

  if (!(ihint->run_type & G_SIGNAL_RUN_CLEANUP))
    g_value_set_int (return_accu, res);

  if (res == -1)
    return TRUE;

  return FALSE;
}

/* Class initialisation: installs the GObject vfuncs, the "caps" property,
 * the "select-stream" and "about-to-finish" signals, the GstElement/GstBin
 * vfuncs, the pad templates and the element metadata. */
static void
gst_decodebin3_class_init (GstDecodebin3Class * klass)
{
  GObjectClass *gobject_klass = (GObjectClass *) klass;
  GstElementClass *element_class = (GstElementClass *) klass;
  GstBinClass *bin_klass = (GstBinClass *) klass;

  gobject_klass->dispose = gst_decodebin3_dispose;
  gobject_klass->set_property = gst_decodebin3_set_property;
  gobject_klass->get_property = gst_decodebin3_get_property;

  /* FIXME : ADD PROPERTIES ! */
  g_object_class_install_property (gobject_klass, PROP_CAPS,
      g_param_spec_boxed ("caps", "Caps",
          "The caps on which to stop decoding. (NULL = default)",
          GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /* FIXME : ADD SIGNALS ! */
  /**
   * GstDecodebin3::select-stream:
   * @decodebin: a #GstDecodebin3
   * @collection: a #GstStreamCollection
   * @stream: a #GstStream
   *
   * This signal is emitted whenever @decodebin needs to decide whether
   * to expose a @stream of a given @collection.
   *
   * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
   * A value of -1 (default) lets @decodebin decide what to do with the stream.
   * */
  gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
      g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
      _gst_int_accumulator, NULL, g_cclosure_marshal_generic,
      G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);

  /**
   * GstDecodebin3::about-to-finish:
   *
   * This signal is emitted when the data for the selected URI is
   * entirely buffered and it is safe to specify another URI.
   */
  gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
      g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
      G_TYPE_NONE, 0, G_TYPE_NONE);


  element_class->request_new_pad =
      GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
  element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
  element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);

  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&sink_template));
  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&request_sink_template));
  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&video_src_template));
  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&audio_src_template));
  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&text_src_template));
  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&src_template));

  gst_element_class_set_static_metadata (element_class,
      "Decoder Bin 3", "Generic/Bin/Decoder",
      "Autoplug and decode to raw media",
      "Edward Hervey ");

  bin_klass->handle_message = gst_decodebin3_handle_message;

  klass->select_stream = gst_decodebin3_select_stream;
}

/* Instance initialisation: creates the main input (static "sink" ghost pad),
 * creates and adds the multiqueue, initialises the three mutexes and sets
 * the "caps" property to the default raw caps. */
static void
gst_decodebin3_init (GstDecodebin3 * dbin)
{
  /* Create main input */
  /* NOTE: create_new_input() runs here before input_lock is initialised;
   * it does not take that lock itself */
  dbin->main_input = create_new_input (dbin, TRUE);

  dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
  g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
      "max-size-buffers", 0, "use-interleave", TRUE, NULL);
  gst_bin_add ((GstBin *) dbin, dbin->multiqueue);

  dbin->current_group_id = GST_GROUP_ID_INVALID;

  g_mutex_init (&dbin->factories_lock);
  g_mutex_init (&dbin->selection_lock);
  g_mutex_init (&dbin->input_lock);

  dbin->caps = gst_static_caps_get (&default_raw_caps);

  GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
}

static void
gst_decodebin3_dispose (GObject * object)
{ GstDecodebin3 *dbin = (GstDecodebin3 *) object; GList *walk, *next; if (dbin->factories) gst_plugin_feature_list_free (dbin->factories); if (dbin->decoder_factories) g_list_free (dbin->decoder_factories); if (dbin->decodable_factories) g_list_free (dbin->decodable_factories); g_list_free_full (dbin->requested_selection, g_free); g_list_free (dbin->active_selection); g_list_free (dbin->to_activate); g_list_free (dbin->pending_select_streams); g_clear_object (&dbin->collection); free_input (dbin, dbin->main_input); for (walk = dbin->other_inputs; walk; walk = next) { DecodebinInput *input = walk->data; next = g_list_next (walk); free_input (dbin, input); dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk); } G_OBJECT_CLASS (parent_class)->dispose (object); } static void gst_decodebin3_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstDecodebin3 *dbin = (GstDecodebin3 *) object; /* FIXME : IMPLEMENT */ switch (prop_id) { case PROP_CAPS: GST_OBJECT_LOCK (dbin); if (dbin->caps) gst_caps_unref (dbin->caps); dbin->caps = g_value_dup_boxed (value); GST_OBJECT_UNLOCK (dbin); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } static void gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { GstDecodebin3 *dbin = (GstDecodebin3 *) object; /* FIXME : IMPLEMENT */ switch (prop_id) { case PROP_CAPS: GST_OBJECT_LOCK (dbin); g_value_set_boxed (value, dbin->caps); GST_OBJECT_UNLOCK (dbin); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } static gboolean parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin) { GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps); /* If it matches our target caps, expose it */ if (gst_caps_can_intersect (caps, dbin->caps)) return FALSE; return TRUE; } /* This method should be called whenever a STREAM_START event * comes 
out of a given parsebin. * The caller shall replace the group_id if the function returns TRUE */ static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id) { GstDecodebin3 *dbin = input->dbin; if (input->group_id != *group_id) { if (input->group_id != GST_GROUP_ID_INVALID) GST_WARNING_OBJECT (dbin, "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT ") on input %p ", input->group_id, *group_id, input); input->group_id = *group_id; } if (*group_id != dbin->current_group_id) { if (dbin->current_group_id == GST_GROUP_ID_INVALID) { GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT, *group_id); dbin->current_group_id = *group_id; } *group_id = dbin->current_group_id; return TRUE; } return FALSE; } static void parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input) { GstDecodebin3 *dbin = input->dbin; gboolean all_drained; GList *tmp; GST_WARNING_OBJECT (dbin, "input %p drained", input); input->drained = TRUE; all_drained = dbin->main_input->drained; for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) { DecodebinInput *data = (DecodebinInput *) tmp->data; all_drained &= data->drained; } if (all_drained) { GST_WARNING_OBJECT (dbin, "All inputs drained. 
Posting about-to-finish"); g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0, NULL); } } /* Call with INPUT_LOCK taken */ static gboolean ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input) { gboolean set_state = FALSE; if (input->parsebin == NULL) { input->parsebin = gst_element_factory_make ("parsebin", NULL); if (input->parsebin == NULL) goto no_parsebin; input->parsebin = gst_object_ref (input->parsebin); input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink"); input->pad_added_sigid = g_signal_connect (input->parsebin, "pad-added", (GCallback) parsebin_pad_added_cb, input); input->pad_removed_sigid = g_signal_connect (input->parsebin, "pad-removed", (GCallback) parsebin_pad_removed_cb, input); input->drained_sigid = g_signal_connect (input->parsebin, "drained", (GCallback) parsebin_drained_cb, input); g_signal_connect (input->parsebin, "autoplug-continue", (GCallback) parsebin_autoplug_continue_cb, dbin); } if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) { gst_bin_add (GST_BIN (dbin), input->parsebin); set_state = TRUE; } gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), input->parsebin_sink); if (set_state) gst_element_sync_state_with_parent (input->parsebin); return TRUE; /* ERRORS */ no_parsebin: { gst_element_post_message ((GstElement *) dbin, gst_missing_element_message_new ((GstElement *) dbin, "parsebin")); return FALSE; } } static GstPadLinkReturn gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer) { GstDecodebin3 *dbin = (GstDecodebin3 *) parent; GstPadLinkReturn res = GST_PAD_LINK_OK; DecodebinInput *input; GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT ". 
Creating parsebin if needed", pad); if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL) goto fail; INPUT_LOCK (dbin); if (!ensure_input_parsebin (dbin, input)) res = GST_PAD_LINK_REFUSED; INPUT_UNLOCK (dbin); return res; fail: GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad"); return GST_PAD_LINK_REFUSED; } /* Drop duration query during _input_pad_unlink */ static GstPadProbeReturn query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info, DecodebinInput * input) { GstPadProbeReturn ret = GST_PAD_PROBE_OK; if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) { GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info); if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) { GST_LOG_OBJECT (pad, "stop forwarding query duration"); ret = GST_PAD_PROBE_HANDLED; } } return ret; } static void gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent) { GstDecodebin3 *dbin = (GstDecodebin3 *) parent; DecodebinInput *input; GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT ". 
Removing parsebin.", pad); if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL) goto fail; INPUT_LOCK (dbin); if (input->parsebin == NULL) { INPUT_UNLOCK (dbin); return; } if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) { GstStreamCollection *collection = NULL; gulong probe_id = gst_pad_add_probe (input->parsebin_sink, GST_PAD_PROBE_TYPE_QUERY_UPSTREAM, (GstPadProbeCallback) query_duration_drop_probe, input, NULL); /* Clear stream-collection corresponding to current INPUT and post new * stream-collection message, if needed */ if (input->collection) { gst_object_unref (input->collection); input->collection = NULL; } SELECTION_LOCK (dbin); collection = get_merged_collection (dbin); if (collection && collection != dbin->collection) { GstMessage *msg; GST_DEBUG_OBJECT (dbin, "Update Stream Collection"); if (dbin->collection) gst_object_unref (dbin->collection); dbin->collection = collection; msg = gst_message_new_stream_collection ((GstObject *) dbin, dbin->collection); SELECTION_UNLOCK (dbin); gst_element_post_message (GST_ELEMENT_CAST (dbin), msg); update_requested_selection (dbin); } else SELECTION_UNLOCK (dbin); gst_bin_remove (GST_BIN (dbin), input->parsebin); gst_element_set_state (input->parsebin, GST_STATE_NULL); g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid); g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid); g_signal_handler_disconnect (input->parsebin, input->drained_sigid); gst_pad_remove_probe (input->parsebin_sink, probe_id); gst_object_unref (input->parsebin); gst_object_unref (input->parsebin_sink); input->parsebin = NULL; input->parsebin_sink = NULL; if (!input->is_main) { dbin->other_inputs = g_list_remove (dbin->other_inputs, input); free_input_async (dbin, input); } } INPUT_UNLOCK (dbin); return; fail: GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad"); return; } static void free_input (GstDecodebin3 * dbin, DecodebinInput * input) 
{ GST_DEBUG ("Freeing input %p", input); gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL); gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink); if (input->parsebin) { g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid); g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid); g_signal_handler_disconnect (input->parsebin, input->drained_sigid); gst_element_set_state (input->parsebin, GST_STATE_NULL); gst_object_unref (input->parsebin); gst_object_unref (input->parsebin_sink); } if (input->collection) gst_object_unref (input->collection); g_free (input); } static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input) { GST_LOG_OBJECT (dbin, "pushing input %p on thread pool to free", input); gst_element_call_async (GST_ELEMENT_CAST (dbin), (GstElementCallAsyncFunc) free_input, input, NULL); } /* Call with INPUT_LOCK taken */ static DecodebinInput * create_new_input (GstDecodebin3 * dbin, gboolean main) { DecodebinInput *input; input = g_new0 (DecodebinInput, 1); input->dbin = dbin; input->is_main = main; input->group_id = GST_GROUP_ID_INVALID; if (main) input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK); else { gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++); input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK); g_free (pad_name); } g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input); gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link); gst_pad_set_unlink_function (input->ghost_sink, gst_decodebin3_input_pad_unlink); gst_pad_set_active (input->ghost_sink, TRUE); gst_element_add_pad ((GstElement *) dbin, input->ghost_sink); return input; } static GstPad * gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp, const gchar * name, const GstCaps * caps) { GstDecodebin3 *dbin = (GstDecodebin3 *) element; DecodebinInput *input; GstPad *res = NULL; /* We are ignoring 
names for the time being, not sure it makes any sense * within the context of decodebin3 ... */ input = create_new_input (dbin, FALSE); if (input) { INPUT_LOCK (dbin); dbin->other_inputs = g_list_append (dbin->other_inputs, input); res = input->ghost_sink; INPUT_UNLOCK (dbin); } return res; } /* Must be called with factories lock! */ static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin) { guint cookie; cookie = gst_registry_get_feature_list_cookie (gst_registry_get ()); if (!dbin->factories || dbin->factories_cookie != cookie) { GList *tmp; if (dbin->factories) gst_plugin_feature_list_free (dbin->factories); if (dbin->decoder_factories) g_list_free (dbin->decoder_factories); if (dbin->decodable_factories) g_list_free (dbin->decodable_factories); dbin->factories = gst_element_factory_list_get_elements (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL); dbin->factories = g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func); dbin->factories_cookie = cookie; /* Filter decoder and other decodables */ dbin->decoder_factories = NULL; dbin->decodable_factories = NULL; for (tmp = dbin->factories; tmp; tmp = tmp->next) { GstElementFactory *fact = (GstElementFactory *) tmp->data; if (gst_element_factory_list_is_type (fact, GST_ELEMENT_FACTORY_TYPE_DECODER)) dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact); else dbin->decodable_factories = g_list_append (dbin->decodable_factories, fact); } } } /* Must be called with appropriate lock if list is a protected variable */ static const gchar * stream_in_list (GList * list, const gchar * sid) { GList *tmp; #if EXTRA_DEBUG for (tmp = list; tmp; tmp = tmp->next) { gchar *osid = (gchar *) tmp->data; GST_DEBUG ("Checking %s against %s", sid, osid); } #endif for (tmp = list; tmp; tmp = tmp->next) { const gchar *osid = (gchar *) tmp->data; if (!g_strcmp0 (sid, osid)) return osid; } return NULL; } static void update_requested_selection (GstDecodebin3 * dbin) { guint i, nb; GList 
*tmp = NULL;
  /* tmp collects the stream-ids chosen below; the strings stay owned by
   * their GstStream and are duplicated when the final list is stored */
  GstStreamType used_types = 0;
  GstStreamCollection *collection;

  /* 1. Is there a pending SELECT_STREAMS we can return straight away since
   *  the switch handler will take care of the pending selection */
  SELECTION_LOCK (dbin);
  if (dbin->pending_select_streams) {
    GST_DEBUG_OBJECT (dbin,
        "No need to create pending selection, SELECT_STREAMS underway");
    goto beach;
  }

  collection = dbin->collection;
  if (G_UNLIKELY (collection == NULL)) {
    GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
    goto beach;
  }
  nb = gst_stream_collection_get_size (collection);

  /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
  GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");

  /* 3. If not, check if we already have some of the streams in the
   * existing active/requested selection */
  for (i = 0; i < nb; i++) {
    GstStream *stream = gst_stream_collection_get_stream (collection, i);
    const gchar *sid = gst_stream_get_stream_id (stream);
    gint request = -1;
    /* Fire select-stream signal to see if outside components want to
     * hint at which streams should be selected */
    /* NOTE: the signal is emitted while the selection lock is held */
    g_signal_emit (G_OBJECT (dbin),
        gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
        &request);
    GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
    /* Keep the stream when a handler asked for it (1), or when nobody
     * decided (-1) and it is already requested or active */
    if (request == 1 || (request == -1
            && (stream_in_list (dbin->requested_selection, sid)
                || stream_in_list (dbin->active_selection, sid)))) {
      GstStreamType curtype = gst_stream_get_stream_type (stream);
      if (request == 1)
        GST_DEBUG_OBJECT (dbin,
            "Using stream requested by 'select-stream' signal : %s", sid);
      else
        GST_DEBUG_OBJECT (dbin,
            "Re-using stream already present in requested or active selection : %s",
            sid);
      tmp = g_list_append (tmp, (gchar *) sid);
      used_types |= curtype;
    }
  }

  /* 4. If not, match one stream of each type */
  for (i = 0; i < nb; i++) {
    GstStream *stream = gst_stream_collection_get_stream (collection, i);
    GstStreamType curtype = gst_stream_get_stream_type (stream);
    /* Only stream types not covered by step 3 get a stream here */
    if (!(used_types & curtype)) {
      const gchar *sid = gst_stream_get_stream_id (stream);
      GST_DEBUG_OBJECT (dbin, "Selecting stream '%s' of type %s",
          sid, gst_stream_type_get_name (curtype));
      tmp = g_list_append (tmp, (gchar *) sid);
      used_types |= curtype;
    }
  }

beach:
  /* Finally set the requested selection */
  /* tmp is still NULL when we jumped here early, so nothing is replaced */
  if (tmp) {
    if (dbin->requested_selection) {
      GST_FIXME_OBJECT (dbin,
          "Replacing non-NULL requested_selection, what should we do ??");
      g_list_free_full (dbin->requested_selection, g_free);
    }
    /* Deep copy: requested_selection owns its own copies of the ids */
    dbin->requested_selection =
        g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
    dbin->selection_updated = TRUE;
    g_list_free (tmp);
  }
  SELECTION_UNLOCK (dbin);
}

/* sort_streams:
 * GCompareFunc to use with lists of GstStream.
 * Sorts GstStreams by stream type and SELECT flag and stream-id
 * First video, then audio, then others.
 *
 * Return: negative if a<b, 0 if a=b, positive if a>b
 */
static gint
sort_streams (GstStream * sa, GstStream * sb)
{
  GstStreamType typea, typeb;
  GstStreamFlags flaga, flagb;
  const gchar *ida, *idb;
  gint ret = 0;

  typea = gst_stream_get_stream_type (sa);
  typeb = gst_stream_get_stream_type (sb);

  GST_LOG ("sa(%s), sb(%s)", gst_stream_get_stream_id (sa),
      gst_stream_get_stream_id (sb));

  /* Sort by stream type. First video, then audio, then others(text, container, unknown) */
  if (typea != typeb) {
    if (typea & GST_STREAM_TYPE_VIDEO)
      ret = -1;
    else if (typea & GST_STREAM_TYPE_AUDIO)
      ret = (!(typeb & GST_STREAM_TYPE_VIDEO)) ? -1 : 1;
    else if (typea & GST_STREAM_TYPE_TEXT)
      ret = (!(typeb & GST_STREAM_TYPE_VIDEO)
          && !(typeb & GST_STREAM_TYPE_AUDIO)) ? -1 : 1;
    else if (typea & GST_STREAM_TYPE_CONTAINER)
      ret = (typeb & GST_STREAM_TYPE_UNKNOWN) ? -1 : 1;
    else
      ret = 1;

    /* Every branch above assigns -1 or 1, so this always returns */
    if (ret != 0) {
      GST_LOG ("Sort by stream-type: %d", ret);
      return ret;
    }
  }

  /* Sort by SELECT flag, if stream type is same. */
  flaga = gst_stream_get_stream_flags (sa);
  flagb = gst_stream_get_stream_flags (sb);

  /* A stream carrying the SELECT flag sorts before one that does not */
  ret =
      (flaga & GST_STREAM_FLAG_SELECT) ? ((flagb & GST_STREAM_FLAG_SELECT) ? 0 :
      -1) : ((flagb & GST_STREAM_FLAG_SELECT) ? 1 : 0);

  if (ret != 0) {
    GST_LOG ("Sort by SELECT flag: %d", ret);
    return ret;
  }

  /* Sort by stream-id, if otherwise the same. */
  ida = gst_stream_get_stream_id (sa);
  idb = gst_stream_get_stream_id (sb);
  ret = g_strcmp0 (ida, idb);

  GST_LOG ("Sort by stream-id: %d", ret);

  return ret;
}

/* Call with INPUT_LOCK taken */
static GstStreamCollection *
get_merged_collection (GstDecodebin3 * dbin)
{
  gboolean needs_merge = FALSE;
  GstStreamCollection *res = NULL;
  GList *tmp;
  GList *unsorted_streams = NULL;
  guint i, nb_stream;

  /* First check if we need to do a merge or just return the only collection */
  res = dbin->main_input->collection;

  for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
    DecodebinInput *input = (DecodebinInput *) tmp->data;
    if (input->collection) {
      if (res) {
        needs_merge = TRUE;
        break;
      }
      res = input->collection;
    }
  }

  if (!needs_merge) {
    GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
    return res ? gst_object_ref (res) : NULL;
  }

  /* We really need to create a new collection */
  /* FIXME : Some numbering scheme maybe ?? 
*/ res = gst_stream_collection_new ("decodebin3"); if (dbin->main_input->collection) { nb_stream = gst_stream_collection_get_size (dbin->main_input->collection); GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream); for (i = 0; i < nb_stream; i++) { GstStream *stream = gst_stream_collection_get_stream (dbin->main_input->collection, i); unsorted_streams = g_list_append (unsorted_streams, stream); } } for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) { DecodebinInput *input = (DecodebinInput *) tmp->data; GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input, input->collection); if (input->collection) { nb_stream = gst_stream_collection_get_size (input->collection); GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream); for (i = 0; i < nb_stream; i++) { GstStream *stream = gst_stream_collection_get_stream (input->collection, i); unsorted_streams = g_list_append (unsorted_streams, stream); } } } /* re-order streams : video, then audio, then others */ unsorted_streams = g_list_sort (unsorted_streams, (GCompareFunc) sort_streams); for (tmp = unsorted_streams; tmp; tmp = tmp->next) { GstStream *stream = (GstStream *) tmp->data; GST_DEBUG_OBJECT (dbin, "Adding #stream(%s) to collection", gst_stream_get_stream_id (stream)); gst_stream_collection_add_stream (res, gst_object_ref (stream)); } if (unsorted_streams) g_list_free (unsorted_streams); return res; } /* Call with INPUT_LOCK taken */ static DecodebinInput * find_message_parsebin (GstDecodebin3 * dbin, GstElement * child) { DecodebinInput *input = NULL; GstElement *parent = gst_object_ref (child); GList *tmp; do { GstElement *next_parent; GST_DEBUG_OBJECT (dbin, "parent %s", parent ? 
GST_ELEMENT_NAME (parent) : ""); if (parent == dbin->main_input->parsebin) { input = dbin->main_input; break; } for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) { DecodebinInput *cur = (DecodebinInput *) tmp->data; if (parent == cur->parsebin) { input = cur; break; } } next_parent = (GstElement *) gst_element_get_parent (parent); gst_object_unref (parent); parent = next_parent; } while (parent && parent != (GstElement *) dbin); if (parent) gst_object_unref (parent); return input; } static const gchar * stream_in_collection (GstDecodebin3 * dbin, gchar * sid) { guint i, len; if (dbin->collection == NULL) return NULL; len = gst_stream_collection_get_size (dbin->collection); for (i = 0; i < len; i++) { GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i); const gchar *osid = gst_stream_get_stream_id (stream); if (!g_strcmp0 (sid, osid)) return osid; } return NULL; } /* Call with INPUT_LOCK taken */ static void handle_stream_collection (GstDecodebin3 * dbin, GstStreamCollection * collection, GstElement * child) { #ifndef GST_DISABLE_GST_DEBUG const gchar *upstream_id; guint i; #endif DecodebinInput *input = find_message_parsebin (dbin, child); if (!input) { GST_DEBUG_OBJECT (dbin, "Couldn't find corresponding input, most likely shutting down"); return; } /* Replace collection in input */ if (input->collection) gst_object_unref (input->collection); input->collection = gst_object_ref (collection); GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection, input); /* Merge collection if needed */ collection = get_merged_collection (dbin); #ifndef GST_DISABLE_GST_DEBUG /* Just some debugging */ upstream_id = gst_stream_collection_get_upstream_id (collection); GST_DEBUG ("Received Stream Collection. 
Upstream_id : %s", upstream_id); GST_DEBUG ("From input %p", input); GST_DEBUG (" %d streams", gst_stream_collection_get_size (collection)); for (i = 0; i < gst_stream_collection_get_size (collection); i++) { GstStream *stream = gst_stream_collection_get_stream (collection, i); GstTagList *taglist; GstCaps *caps; GST_DEBUG (" Stream '%s'", gst_stream_get_stream_id (stream)); GST_DEBUG (" type : %s", gst_stream_type_get_name (gst_stream_get_stream_type (stream))); GST_DEBUG (" flags : 0x%x", gst_stream_get_stream_flags (stream)); taglist = gst_stream_get_tags (stream); GST_DEBUG (" tags : %" GST_PTR_FORMAT, taglist); caps = gst_stream_get_caps (stream); GST_DEBUG (" caps : %" GST_PTR_FORMAT, caps); if (taglist) gst_tag_list_unref (taglist); if (caps) gst_caps_unref (caps); } #endif /* Store collection for later usage */ SELECTION_LOCK (dbin); if (dbin->collection == NULL) { dbin->collection = collection; } else { /* We need to check who emitted this collection (the owner). * If we already had a collection from that user, this one is an update, * that is to say that we need to figure out how we are going to re-use * the streams/slot */ GST_FIXME_OBJECT (dbin, "New collection but already had one ..."); /* FIXME : When do we switch from pending collection to active collection ? * When all streams from active collection are drained in multiqueue output ? 
*/ gst_object_unref (dbin->collection); dbin->collection = collection; /* dbin->pending_collection = */ /* g_list_append (dbin->pending_collection, collection); */ } SELECTION_UNLOCK (dbin); } static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message) { GstDecodebin3 *dbin = (GstDecodebin3 *) bin; gboolean posting_collection = FALSE; GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message)); switch (GST_MESSAGE_TYPE (message)) { case GST_MESSAGE_STREAM_COLLECTION: { GstStreamCollection *collection = NULL; gst_message_parse_stream_collection (message, &collection); if (collection) { INPUT_LOCK (dbin); handle_stream_collection (dbin, collection, (GstElement *) GST_MESSAGE_SRC (message)); posting_collection = TRUE; INPUT_UNLOCK (dbin); } SELECTION_LOCK (dbin); if (dbin->collection && collection != dbin->collection) { /* Replace collection message, we most likely aggregated it */ GstMessage *new_msg; new_msg = gst_message_new_stream_collection ((GstObject *) dbin, dbin->collection); gst_message_unref (message); message = new_msg; } SELECTION_UNLOCK (dbin); if (collection) gst_object_unref (collection); break; } default: break; } GST_BIN_CLASS (parent_class)->handle_message (bin, message); if (posting_collection) { /* Figure out a selection for that collection */ update_requested_selection (dbin); } } static DecodebinOutputStream * find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream) { GList *tmp; GstStreamType stype = gst_stream_get_stream_type (stream); for (tmp = dbin->output_streams; tmp; tmp = tmp->next) { DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data; if (output->type == stype && output->slot && output->slot->active_stream) { GstStream *tstream = output->slot->active_stream; if (!stream_in_list (dbin->requested_selection, (gchar *) gst_stream_get_stream_id (tstream))) { return output; } } } return NULL; } /* Give a certain slot, figure out if it should be linked to an * output stream * 
CALL WITH SELECTION LOCK TAKEN !*/ static DecodebinOutputStream * get_output_for_slot (MultiQueueSlot * slot) { GstDecodebin3 *dbin = slot->dbin; DecodebinOutputStream *output = NULL; const gchar *stream_id; GstCaps *caps; gchar *id_in_list = NULL; /* If we already have a configured output, just use it */ if (slot->output != NULL) return slot->output; /* * FIXME * * This method needs to be split into multiple parts * * 1) Figure out whether stream should be exposed or not * This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence * in the default stream attribution * * 2) Figure out whether an output stream should be created, whether * we can re-use the output stream already linked to the slot, or * whether we need to get re-assigned another (currently used) output * stream. */ stream_id = gst_stream_get_stream_id (slot->active_stream); caps = gst_stream_get_caps (slot->active_stream); GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps); gst_caps_unref (caps); /* 0. Emit autoplug-continue signal for pending caps ? */ GST_FIXME_OBJECT (dbin, "emit autoplug-continue"); /* 1. if in EXPOSE_ALL_MODE, just accept */ GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE"); #if 0 /* FIXME : The idea around this was to avoid activating a stream for * which we have no decoder. Unfortunately it is way too * expensive. Need to figure out a better solution */ /* 2. Is there a potential decoder (if one is required) */ if (!gst_caps_can_intersect (caps, dbin->caps) && !have_factory (dbin, (GstCaps *) caps, GST_ELEMENT_FACTORY_TYPE_DECODER)) { GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT, caps); SELECTION_UNLOCK (dbin); gst_element_post_message (GST_ELEMENT_CAST (dbin), gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps)); SELECTION_LOCK (dbin); return NULL; } #endif /* 3. 
In default mode check if we should expose */ id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id); if (id_in_list) { /* Check if we can steal an existing output stream we could re-use. * that is: * * an output stream whose slot->stream is not in requested * * and is of the same type as this stream */ output = find_free_compatible_output (dbin, slot->active_stream); if (output) { /* Move this output from its current slot to this slot */ dbin->to_activate = g_list_append (dbin->to_activate, (gchar *) stream_id); dbin->requested_selection = g_list_remove (dbin->requested_selection, id_in_list); g_free (id_in_list); SELECTION_UNLOCK (dbin); gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE, (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL); SELECTION_LOCK (dbin); return NULL; } output = create_output_stream (dbin, slot->type); output->slot = slot; GST_DEBUG ("Linking slot %p to new output %p", slot, output); slot->output = output; dbin->active_selection = g_list_append (dbin->active_selection, (gchar *) stream_id); } else GST_DEBUG ("Not creating any output for slot %p", slot); return output; } /* Returns SELECTED_STREAMS message if active_selection is equal to * requested_selection, else NULL. 
* Must be called with LOCK taken */ static GstMessage * is_selection_done (GstDecodebin3 * dbin) { GList *tmp; GstMessage *msg; if (!dbin->selection_updated) return NULL; GST_LOG_OBJECT (dbin, "Checking"); if (dbin->to_activate != NULL) { GST_DEBUG ("Still have streams to activate"); return NULL; } for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) { GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data); if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) { GST_DEBUG ("Not in active selection, returning"); return NULL; } } GST_DEBUG_OBJECT (dbin, "Selection active, creating message"); /* We are completely active */ msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection); GST_MESSAGE_SEQNUM (msg) = dbin->select_streams_seqnum; for (tmp = dbin->output_streams; tmp; tmp = tmp->next) { DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data; if (output->slot) { GST_DEBUG_OBJECT (dbin, "Adding stream %s", gst_stream_get_stream_id (output->slot->active_stream)); gst_message_streams_selected_add (msg, output->slot->active_stream); } else GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output); } dbin->selection_updated = FALSE; return msg; } /* Must be called with SELECTION_LOCK taken */ static void check_all_slot_for_eos (GstDecodebin3 * dbin) { gboolean all_drained = TRUE; GList *iter; GST_DEBUG_OBJECT (dbin, "check slot for eos"); for (iter = dbin->slots; iter; iter = iter->next) { MultiQueueSlot *slot = iter->data; if (!slot->output) continue; if (slot->is_drained) { GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot); continue; } all_drained = FALSE; break; } if (all_drained) { INPUT_LOCK (dbin); if (!pending_pads_are_eos (dbin->main_input)) all_drained = FALSE; if (all_drained) { for (iter = dbin->other_inputs; iter; iter = iter->next) { if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) { all_drained = FALSE; break; } } } INPUT_UNLOCK (dbin); } if (all_drained) { 
GST_DEBUG_OBJECT (dbin, "All active slots are drained, and no pending input, push EOS"); for (iter = dbin->input_streams; iter; iter = iter->next) { DecodebinInputStream *input = (DecodebinInputStream *) iter->data; GstPad *peer = gst_pad_get_peer (input->srcpad); /* Send EOS to all slots */ if (peer) { GstEvent *stream_start, *eos; stream_start = gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0); /* First forward a custom STREAM_START event to reset the EOS status (if any) */ if (stream_start) { GstStructure *s; GstEvent *custom_stream_start = gst_event_copy (stream_start); gst_event_unref (stream_start); s = (GstStructure *) gst_event_get_structure (custom_stream_start); gst_structure_set (s, "decodebin3-flushing-stream-start", G_TYPE_BOOLEAN, TRUE, NULL); gst_pad_send_event (peer, custom_stream_start); } eos = gst_event_new_eos (); gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos), CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA, NULL); gst_pad_send_event (peer, eos); gst_object_unref (peer); } else GST_DEBUG_OBJECT (dbin, "no output"); } } } static GstPadProbeReturn multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot) { GstPadProbeReturn ret = GST_PAD_PROBE_OK; GstDecodebin3 *dbin = slot->dbin; if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) { GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info); GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev)); switch (GST_EVENT_TYPE (ev)) { case GST_EVENT_STREAM_START: { GstStream *stream = NULL; const GstStructure *s = gst_event_get_structure (ev); /* Drop STREAM_START events used to cleanup multiqueue */ if (s && gst_structure_has_field (s, "decodebin3-flushing-stream-start")) { ret = GST_PAD_PROBE_HANDLED; gst_event_unref (ev); break; } gst_event_parse_stream (ev, &stream); if (stream == NULL) { GST_ERROR_OBJECT (pad, "Got a STREAM_START event without a GstStream"); break; } slot->is_drained = FALSE; GST_DEBUG_OBJECT (pad, "Stream Start 
'%s'", gst_stream_get_stream_id (stream)); if (slot->active_stream == NULL) { slot->active_stream = stream; } else if (slot->active_stream != stream) { GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !", gst_stream_get_stream_id (slot->active_stream), gst_stream_get_stream_id (stream)); gst_object_unref (slot->active_stream); slot->active_stream = stream; } else gst_object_unref (stream); #if 0 /* Disabled because stream-start is pushed for every buffer on every unlinked pad */ { gboolean is_active, is_requested; /* Quick check to see if we're in the current selection */ /* FIXME : Re-check all slot<=>output mappings based on requested_selection */ SELECTION_LOCK (dbin); GST_DEBUG_OBJECT (dbin, "Checking active selection"); is_active = stream_in_list (dbin->active_selection, stream_id); GST_DEBUG_OBJECT (dbin, "Checking requested selection"); is_requested = stream_in_list (dbin->requested_selection, stream_id); SELECTION_UNLOCK (dbin); if (is_active) GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)", slot->output); if (is_requested) GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)", slot->output); else if (slot->output) { GST_DEBUG_OBJECT (pad, "Slot needs to be deactivated ? 
It's no longer in requested selection"); } else if (!is_active) GST_DEBUG_OBJECT (pad, "Slot in neither active nor requested selection"); } #endif } break; case GST_EVENT_CAPS: { /* Configure the output slot if needed */ DecodebinOutputStream *output; GstMessage *msg = NULL; SELECTION_LOCK (dbin); output = get_output_for_slot (slot); if (output) { reconfigure_output_stream (output, slot); msg = is_selection_done (dbin); } SELECTION_UNLOCK (dbin); if (msg) gst_element_post_message ((GstElement *) slot->dbin, msg); } break; case GST_EVENT_EOS: { gboolean was_drained = slot->is_drained; slot->is_drained = TRUE; /* Custom EOS handling first */ if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev), CUSTOM_EOS_QUARK)) { /* remove custom-eos */ gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev), CUSTOM_EOS_QUARK, NULL, NULL); GST_LOG_OBJECT (pad, "Received custom EOS"); ret = GST_PAD_PROBE_HANDLED; SELECTION_LOCK (dbin); if (slot->input == NULL) { GST_DEBUG_OBJECT (pad, "Got custom-eos from null input stream, remove output stream"); /* Remove the output */ if (slot->output) { DecodebinOutputStream *output = slot->output; dbin->output_streams = g_list_remove (dbin->output_streams, output); free_output_stream (dbin, output); } slot->probe_id = 0; dbin->slots = g_list_remove (dbin->slots, slot); free_multiqueue_slot_async (dbin, slot); ret = GST_PAD_PROBE_REMOVE; } else if (!was_drained) { check_all_slot_for_eos (dbin); } if (ret == GST_PAD_PROBE_HANDLED) gst_event_unref (ev); SELECTION_UNLOCK (dbin); break; } GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p", slot->input); if (slot->input == NULL) { GstPad *peer; GST_DEBUG_OBJECT (pad, "last EOS for input, forwarding and removing slot"); peer = gst_pad_get_peer (pad); if (peer) { gst_pad_send_event (peer, ev); gst_object_unref (peer); } else { gst_event_unref (ev); } SELECTION_LOCK (dbin); /* FIXME : Shouldn't we try to re-assign the output instead of just * removing it ? 
*/ /* Remove the output */ if (slot->output) { DecodebinOutputStream *output = slot->output; dbin->output_streams = g_list_remove (dbin->output_streams, output); free_output_stream (dbin, output); } slot->probe_id = 0; dbin->slots = g_list_remove (dbin->slots, slot); SELECTION_UNLOCK (dbin); free_multiqueue_slot_async (dbin, slot); ret = GST_PAD_PROBE_REMOVE; } else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev), CUSTOM_FINAL_EOS_QUARK)) { GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream"); } else { GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)"); /* drop current event as eos will be sent in check_all_slot_for_eos * when all output streams are also eos */ ret = GST_PAD_PROBE_DROP; SELECTION_LOCK (dbin); check_all_slot_for_eos (dbin); SELECTION_UNLOCK (dbin); } } break; default: break; } } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) { GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info); switch (GST_QUERY_TYPE (query)) { case GST_QUERY_CAPS: { GST_DEBUG_OBJECT (pad, "Intercepting CAPS query"); gst_query_set_caps_result (query, GST_CAPS_ANY); ret = GST_PAD_PROBE_HANDLED; } break; case GST_QUERY_ACCEPT_CAPS: { GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query"); /* If the current decoder doesn't accept caps, we'll reconfigure * on the actual caps event. So accept any caps. 
*/ gst_query_set_accept_caps_result (query, TRUE); ret = GST_PAD_PROBE_HANDLED; } default: break; } } return ret; } /* Create a new multiqueue slot for the given type * * It is up to the caller to know whether that slot is needed or not * (and release it when no longer needed) */ static MultiQueueSlot * create_new_slot (GstDecodebin3 * dbin, GstStreamType type) { MultiQueueSlot *slot; GstIterator *it = NULL; GValue item = { 0, }; GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s", gst_stream_type_get_name (type)); slot = g_new0 (MultiQueueSlot, 1); slot->dbin = dbin; slot->id = dbin->slot_id++; slot->type = type; slot->sink_pad = gst_element_get_request_pad (dbin->multiqueue, "sink_%u"); if (slot->sink_pad == NULL) goto fail; it = gst_pad_iterate_internal_links (slot->sink_pad); if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) { GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s", GST_DEBUG_PAD_NAME (slot->src_pad)); goto fail; } gst_iterator_free (it); g_value_reset (&item); g_object_set (slot->sink_pad, "group-id", (guint) type, NULL); /* Add event probe */ slot->probe_id = gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, (GstPadProbeCallback) multiqueue_src_probe, slot, NULL); GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot, GST_DEBUG_PAD_NAME (slot->src_pad)); dbin->slots = g_list_append (dbin->slots, slot); return slot; /* ERRORS */ fail: { if (slot->sink_pad) gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad); g_free (slot); return NULL; } } /* Must be called with SELECTION_LOCK */ static MultiQueueSlot * get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input) { GList *tmp; MultiQueueSlot *empty_slot = NULL; GstStreamType input_type = 0; gchar *stream_id = NULL; GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)", input, input->active_stream, input-> active_stream ? 
gst_stream_get_stream_id (input->active_stream) : ""); if (input->active_stream) { input_type = gst_stream_get_stream_type (input->active_stream); stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream); } /* Go over existing slots and check if there is already one for it */ for (tmp = dbin->slots; tmp; tmp = tmp->next) { MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data; /* Already used input, return that one */ if (slot->input == input) { GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id); return slot; } } /* Go amongst all unused slots of the right type and try to find a candidate */ for (tmp = dbin->slots; tmp; tmp = tmp->next) { MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data; if (slot->input == NULL && input_type == slot->type) { /* Remember this empty slot for later */ empty_slot = slot; /* Check if available slot is of the same stream_id */ GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)", slot->id, slot->active_stream); if (stream_id && slot->active_stream) { gchar *ostream_id = (gchar *) gst_stream_get_stream_id (slot->active_stream); GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id, ostream_id, stream_id); if (!g_strcmp0 (stream_id, ostream_id)) break; } } } if (empty_slot) { GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id); empty_slot->input = input; return empty_slot; } if (input_type) return create_new_slot (dbin, input_type); return NULL; } static void link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot) { if (slot->input != NULL && slot->input != input) { GST_ERROR_OBJECT (slot->dbin, "Trying to link input to an already used slot"); return; } gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING); slot->pending_stream = input->active_stream; slot->input = input; } #if 0 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps, GstElementFactoryListType ftype) { gboolean ret = FALSE; GList 
*res; g_mutex_lock (&dbin->factories_lock); gst_decode_bin_update_factories_list (dbin); if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER) res = gst_element_factory_list_filter (dbin->decoder_factories, caps, GST_PAD_SINK, TRUE); else res = gst_element_factory_list_filter (dbin->decodable_factories, caps, GST_PAD_SINK, TRUE); g_mutex_unlock (&dbin->factories_lock); if (res) { ret = TRUE; gst_plugin_feature_list_free (res); } return ret; } #endif static GstElement * create_element (GstDecodebin3 * dbin, GstStream * stream, GstElementFactoryListType ftype) { GList *res; GstElement *element = NULL; GstCaps *caps; g_mutex_lock (&dbin->factories_lock); gst_decode_bin_update_factories_list (dbin); caps = gst_stream_get_caps (stream); if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER) res = gst_element_factory_list_filter (dbin->decoder_factories, caps, GST_PAD_SINK, TRUE); else res = gst_element_factory_list_filter (dbin->decodable_factories, caps, GST_PAD_SINK, TRUE); g_mutex_unlock (&dbin->factories_lock); if (res) { element = gst_element_factory_create ((GstElementFactory *) res->data, NULL); GST_DEBUG ("Created element '%s'", GST_ELEMENT_NAME (element)); gst_plugin_feature_list_free (res); } else { GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT, caps); } gst_caps_unref (caps); return element; } /* FIXME : VERY NAIVE. ASSUMING FIRST ONE WILL WORK */ static GstElement * create_decoder (GstDecodebin3 * dbin, GstStream * stream) { return create_element (dbin, stream, GST_ELEMENT_FACTORY_TYPE_DECODER); } static GstPadProbeReturn keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info, DecodebinOutputStream * output) { GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info); /* If we have a keyframe, remove the probe and let all data through */ /* FIXME : HANDLE HEADER BUFFER ?? 
*/ if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) || GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) { GST_DEBUG_OBJECT (pad, "Buffer is keyframe or header, letting through and removing probe"); output->drop_probe_id = 0; return GST_PAD_PROBE_REMOVE; } GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping"); return GST_PAD_PROBE_DROP; } static void reconfigure_output_stream (DecodebinOutputStream * output, MultiQueueSlot * slot) { GstDecodebin3 *dbin = output->dbin; GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream); gboolean needs_decoder; needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE; GST_DEBUG_OBJECT (dbin, "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot, needs_decoder); /* FIXME : Maybe make the output un-hook itself automatically ? */ if (output->slot != NULL && output->slot != slot) { GST_WARNING_OBJECT (dbin, "Output still linked to another slot (%p)", output->slot); gst_caps_unref (new_caps); return; } /* Check if existing config is reusable as-is by checking if * the existing decoder accepts the new caps, if not delete * it and create a new one */ if (output->decoder) { gboolean can_reuse_decoder; if (needs_decoder) { can_reuse_decoder = gst_pad_query_accept_caps (output->decoder_sink, new_caps); } else can_reuse_decoder = FALSE; if (can_reuse_decoder) { if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) { GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe"); output->drop_probe_id = gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER, (GstPadProbeCallback) keyframe_waiter_probe, output, NULL); } GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot); if (output->linked == FALSE) { gst_pad_link_full (slot->src_pad, output->decoder_sink, GST_PAD_LINK_CHECK_NOTHING); output->linked = TRUE; } gst_caps_unref (new_caps); return; } GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot); if (output->linked) 
gst_pad_unlink (slot->src_pad, output->decoder_sink); output->linked = FALSE; if (output->drop_probe_id) { gst_pad_remove_probe (slot->src_pad, output->drop_probe_id); output->drop_probe_id = 0; } if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) { GST_ERROR_OBJECT (dbin, "Could not release decoder pad"); gst_caps_unref (new_caps); goto cleanup; } gst_element_set_locked_state (output->decoder, TRUE); gst_element_set_state (output->decoder, GST_STATE_NULL); gst_bin_remove ((GstBin *) dbin, output->decoder); output->decoder = NULL; } gst_caps_unref (new_caps); gst_object_replace ((GstObject **) & output->decoder_sink, NULL); gst_object_replace ((GstObject **) & output->decoder_src, NULL); /* If a decoder is required, create one */ if (needs_decoder) { /* If we don't have a decoder yet, instantiate one */ output->decoder = create_decoder (dbin, slot->active_stream); if (output->decoder == NULL) { GstCaps *caps; SELECTION_UNLOCK (dbin); /* FIXME : Should we be smarter if there's a missing decoder ? * Should we deactivate that stream ? 
 */
      caps = gst_stream_get_caps (slot->active_stream);
      /* Tell the application that no decoder is available for these caps */
      gst_element_post_message (GST_ELEMENT_CAST (dbin),
          gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
      gst_caps_unref (caps);
      SELECTION_LOCK (dbin);
      goto cleanup;
    }
    if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
      GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
      goto cleanup;
    }
    /* Keep references to the decoder's static "sink" and "src" pads */
    output->decoder_sink = gst_element_get_static_pad (output->decoder, "sink");
    output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
    if (output->type & GST_STREAM_TYPE_VIDEO) {
      /* Video outputs get a buffer probe on the slot source pad; its id is
       * remembered in drop_probe_id */
      GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
      output->drop_probe_id =
          gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
          (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
    }
    /* Link slot -> decoder without any link-time checks */
    if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
            GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
      GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
          GST_DEBUG_PAD_NAME (output->decoder_sink));
      goto cleanup;
    }
  } else {
    /* In this branch the slot's source pad itself becomes the ghost pad
     * target below; there is no decoder sink pad */
    output->decoder_src = gst_object_ref (slot->src_pad);
    output->decoder_sink = NULL;
  }
  output->linked = TRUE;
  if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
          output->decoder_src)) {
    GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
    goto cleanup;
  }
  /* Add the ghost pad to the element only the first time around */
  if (output->src_exposed == FALSE) {
    output->src_exposed = TRUE;
    gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
  }

  if (output->decoder)
    gst_element_sync_state_with_parent (output->decoder);

  output->slot = slot;
  return;

cleanup:
  {
    /* Error path: drop the pad references and remove the decoder (set to
     * NULL state first) from the bin */
    GST_DEBUG_OBJECT (dbin, "Cleanup");
    if (output->decoder_sink) {
      gst_object_unref (output->decoder_sink);
      output->decoder_sink = NULL;
    }
    if (output->decoder_src) {
      gst_object_unref (output->decoder_src);
      output->decoder_src = NULL;
    }
    if (output->decoder) {
      gst_element_set_state (output->decoder, GST_STATE_NULL);
      gst_bin_remove ((GstBin *) dbin, output->decoder);
      output->decoder = NULL;
    }
  }
}

/* Idle pad probe callback installed on a slot's source pad.
 *
 * With the selection lock held, looks up the output for @slot through
 * get_output_for_slot(). If there is one, the output is reconfigured for
 * this slot and is_selection_done() is queried. The message it returns (if
 * any) is posted on the element only after the lock has been released.
 *
 * Always returns GST_PAD_PROBE_REMOVE, i.e. the probe fires once.
 */
static GstPadProbeReturn
idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
{
  GstMessage *msg = NULL;
  DecodebinOutputStream *output;

  SELECTION_LOCK (slot->dbin);
  output = get_output_for_slot (slot);

  GST_DEBUG_OBJECT (pad, "output : %p", output);

  if (output) {
    reconfigure_output_stream (output, slot);
    msg = is_selection_done (slot->dbin);
  }
  SELECTION_UNLOCK (slot->dbin);
  /* Post outside of the selection lock */
  if (msg)
    gst_element_post_message ((GstElement *) slot->dbin, msg);

  return GST_PAD_PROBE_REMOVE;
}

/* Returns the first slot of @dbin->slots whose active stream, or whose
 * pending stream when it differs from the active one, has a stream-id equal
 * to @sid. Returns NULL when no slot matches.
 *
 * The list is walked without taking any lock here.
 * NOTE(review): callers presumably hold the selection lock - confirm. */
static MultiQueueSlot *
find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
{
  GList *tmp;

  for (tmp = dbin->slots; tmp; tmp = tmp->next) {
    MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
    const gchar *stream_id;
    if (slot->active_stream) {
      stream_id = gst_stream_get_stream_id (slot->active_stream);
      if (!g_strcmp0 (sid, stream_id))
        return slot;
    }
    /* Only look at the pending stream when it is a different object */
    if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
      stream_id = gst_stream_get_stream_id (slot->pending_stream);
      if (!g_strcmp0 (sid, stream_id))
        return slot;
    }
  }

  return NULL;
}

/* This function handles the reassignment of a slot. Call this from
 * the streaming thread of a slot.
*/ static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot) { DecodebinOutputStream *output; MultiQueueSlot *target_slot = NULL; GList *tmp; const gchar *sid, *tsid; SELECTION_LOCK (dbin); output = slot->output; if (G_UNLIKELY (slot->active_stream == NULL)) { GST_DEBUG_OBJECT (slot->src_pad, "Called on inactive slot (active_stream == NULL)"); SELECTION_UNLOCK (dbin); return FALSE; } if (G_UNLIKELY (output == NULL)) { GST_DEBUG_OBJECT (slot->src_pad, "Slot doesn't have any output to be removed"); SELECTION_UNLOCK (dbin); return FALSE; } sid = gst_stream_get_stream_id (slot->active_stream); GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot); /* Recheck whether this stream is still in the list of streams to deactivate */ if (stream_in_list (dbin->requested_selection, sid)) { /* Stream is in the list of requested streams, don't remove */ SELECTION_UNLOCK (dbin); GST_DEBUG_OBJECT (slot->src_pad, "Stream '%s' doesn't need to be deactivated", sid); return FALSE; } /* Unlink slot from output */ /* FIXME : Handle flushing ? */ /* FIXME : Handle outputs without decoders */ GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p", output->decoder_sink); if (output->decoder_sink) gst_pad_unlink (slot->src_pad, output->decoder_sink); output->linked = FALSE; slot->output = NULL; output->slot = NULL; /* Remove sid from active selection */ for (tmp = dbin->active_selection; tmp; tmp = tmp->next) if (!g_strcmp0 (sid, tmp->data)) { dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp); break; } /* Can we re-assign this output to a requested stream ? 
*/ GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream"); for (tmp = dbin->to_activate; tmp; tmp = tmp->next) { MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data); GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)", tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream)); if (tslot && tslot->type == output->type && tslot->output == NULL) { GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot"); target_slot = tslot; tsid = tmp->data; /* Pass target stream id to requested selection */ dbin->requested_selection = g_list_append (dbin->requested_selection, g_strdup (tmp->data)); dbin->to_activate = g_list_remove (dbin->to_activate, tmp->data); break; } } if (target_slot) { GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'", target_slot, tsid); target_slot->output = output; output->slot = target_slot; dbin->active_selection = g_list_append (dbin->active_selection, (gchar *) tsid); SELECTION_UNLOCK (dbin); /* Wakeup the target slot so that it retries to send events/buffers * thereby triggering the output reconfiguration codepath */ gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE, (GstPadProbeCallback) idle_reconfigure, target_slot, NULL); /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */ } else { GstMessage *msg; dbin->output_streams = g_list_remove (dbin->output_streams, output); free_output_stream (dbin, output); msg = is_selection_done (slot->dbin); SELECTION_UNLOCK (dbin); if (msg) gst_element_post_message ((GstElement *) slot->dbin, msg); } return TRUE; } /* Idle probe called when a slot should be unassigned from its output stream. * This is needed to ensure nothing is flowing when unlinking the slot. * * Also, this method will search for a pending stream which could re-use * the output stream. 
*/ static GstPadProbeReturn slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot) { GstDecodebin3 *dbin = slot->dbin; reassign_slot (dbin, slot); return GST_PAD_PROBE_REMOVE; } static gboolean handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams, guint32 seqnum) { gboolean ret = TRUE; GList *tmp; /* List of slots to (de)activate. */ GList *to_deactivate = NULL; GList *to_activate = NULL; /* List of unknown stream id, most likely means the event * should be sent upstream so that elements can expose the requested stream */ GList *unknown = NULL; GList *to_reassign = NULL; GList *future_request_streams = NULL; GList *pending_streams = NULL; GList *slots_to_reassign = NULL; SELECTION_LOCK (dbin); if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) { GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime"); SELECTION_UNLOCK (dbin); return TRUE; } /* Remove pending select_streams */ g_list_free (dbin->pending_select_streams); dbin->pending_select_streams = NULL; /* COMPARE the requested streams to the active and requested streams * on multiqueue. 
*/ /* First check the slots to activate and which ones are unknown */ for (tmp = select_streams; tmp; tmp = tmp->next) { const gchar *sid = (const gchar *) tmp->data; MultiQueueSlot *slot; GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid); slot = find_slot_for_stream_id (dbin, sid); /* Find the corresponding slot */ if (slot == NULL) { if (stream_in_collection (dbin, (gchar *) sid)) { pending_streams = g_list_append (pending_streams, (gchar *) sid); } else { GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid); unknown = g_list_append (unknown, (gchar *) sid); } } else if (slot->output == NULL) { GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')", slot, sid); to_activate = g_list_append (to_activate, slot); } else { GST_DEBUG_OBJECT (dbin, "Stream '%s' from slot %p is already active on output %p", sid, slot, slot->output); future_request_streams = g_list_append (future_request_streams, (gchar *) sid); } } for (tmp = dbin->slots; tmp; tmp = tmp->next) { MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data; /* For slots that have an output, check if it's part of the streams to * be active */ if (slot->output) { gboolean slot_to_deactivate = TRUE; if (slot->active_stream) { if (stream_in_list (select_streams, gst_stream_get_stream_id (slot->active_stream))) slot_to_deactivate = FALSE; } if (slot_to_deactivate && slot->pending_stream && slot->pending_stream != slot->active_stream) { if (stream_in_list (select_streams, gst_stream_get_stream_id (slot->pending_stream))) slot_to_deactivate = FALSE; } if (slot_to_deactivate) { GST_DEBUG_OBJECT (dbin, "Slot %p (%s) should be deactivated, no longer used", slot, slot-> active_stream ? 
gst_stream_get_stream_id (slot->active_stream) : "NULL"); to_deactivate = g_list_append (to_deactivate, slot); } } } if (to_deactivate != NULL) { GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots"); /* We need to compare what needs to be activated and deactivated in order * to determine whether there are outputs that can be transferred */ /* Take the stream-id of the slots that are to be activated, for which there * is a slot of the same type that needs to be deactivated */ tmp = to_deactivate; while (tmp) { MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data; gboolean removeit = FALSE; GList *tmp2, *next; GST_DEBUG_OBJECT (dbin, "Checking if slot to deactivate (%p) has a candidate slot to activate", slot_to_deactivate); for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) { MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data; GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate); if (slot_to_activate->type == slot_to_deactivate->type) { GST_DEBUG_OBJECT (dbin, "Re-using"); to_reassign = g_list_append (to_reassign, (gchar *) gst_stream_get_stream_id (slot_to_activate->active_stream)); slots_to_reassign = g_list_append (slots_to_reassign, slot_to_deactivate); to_activate = g_list_remove (to_activate, slot_to_activate); removeit = TRUE; break; } } next = tmp->next; if (removeit) to_deactivate = g_list_delete_link (to_deactivate, tmp); tmp = next; } } for (tmp = to_deactivate; tmp; tmp = tmp->next) { MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data; GST_DEBUG_OBJECT (dbin, "Really need to deactivate slot %p, but no available alternative", slot); slots_to_reassign = g_list_append (slots_to_reassign, slot); } /* The only slots left to activate are the ones that won't be reassigned and * therefore really need to have a new output created */ for (tmp = to_activate; tmp; tmp = tmp->next) { MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data; if (slot->active_stream) future_request_streams = g_list_append (future_request_streams, 
(gchar *) gst_stream_get_stream_id (slot->active_stream)); else if (slot->pending_stream) future_request_streams = g_list_append (future_request_streams, (gchar *) gst_stream_get_stream_id (slot->pending_stream)); else GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot); } if (to_activate == NULL && pending_streams != NULL) { GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection"); if (dbin->requested_selection) g_list_free_full (dbin->requested_selection, g_free); dbin->requested_selection = g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL); g_list_free (to_deactivate); g_list_free (pending_streams); to_deactivate = NULL; pending_streams = NULL; } else { if (dbin->requested_selection) g_list_free_full (dbin->requested_selection, g_free); dbin->requested_selection = g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL); dbin->requested_selection = g_list_concat (dbin->requested_selection, g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL)); if (dbin->to_activate) g_list_free (dbin->to_activate); dbin->to_activate = g_list_copy (to_reassign); } dbin->selection_updated = TRUE; SELECTION_UNLOCK (dbin); if (unknown) { GST_FIXME_OBJECT (dbin, "Got request for an unknown stream"); g_list_free (unknown); } if (to_activate && !slots_to_reassign) { for (tmp = to_activate; tmp; tmp = tmp->next) { MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data; gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE, (GstPadProbeCallback) idle_reconfigure, slot, NULL); } } /* For all streams to deactivate, add an idle probe where we will do * the unassignment and switch over */ for (tmp = slots_to_reassign; tmp; tmp = tmp->next) { MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data; gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE, (GstPadProbeCallback) slot_unassign_probe, slot, NULL); } if (to_deactivate) g_list_free (to_deactivate); if (to_activate) g_list_free (to_activate); if (to_reassign) g_list_free 
(to_reassign); if (future_request_streams) g_list_free (future_request_streams); if (pending_streams) g_list_free (pending_streams); if (slots_to_reassign) g_list_free (slots_to_reassign); return ret; } static GstPadProbeReturn ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info, DecodebinOutputStream * output) { GstPadProbeReturn ret = GST_PAD_PROBE_OK; GstDecodebin3 *dbin = output->dbin; GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info); GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event)); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SELECT_STREAMS: { GstPad *peer; GList *streams = NULL; guint32 seqnum = gst_event_get_seqnum (event); SELECTION_LOCK (dbin); if (seqnum == dbin->select_streams_seqnum) { SELECTION_UNLOCK (dbin); GST_DEBUG_OBJECT (pad, "Already handled/handling that SELECT_STREAMS event"); gst_event_unref (event); ret = GST_PAD_PROBE_HANDLED; break; } dbin->select_streams_seqnum = seqnum; if (dbin->pending_select_streams != NULL) { GST_LOG_OBJECT (dbin, "Replacing pending select streams"); g_list_free (dbin->pending_select_streams); dbin->pending_select_streams = NULL; } gst_event_parse_select_streams (event, &streams); dbin->pending_select_streams = g_list_copy (streams); SELECTION_UNLOCK (dbin); /* Send event upstream */ if ((peer = gst_pad_get_peer (pad))) { gst_pad_send_event (peer, event); gst_object_unref (peer); } else { gst_event_unref (event); } /* Finally handle the switch */ if (streams) { handle_stream_switch (dbin, streams, seqnum); g_list_free_full (streams, g_free); } ret = GST_PAD_PROBE_HANDLED; } break; default: break; } return ret; } static gboolean gst_decodebin3_send_event (GstElement * element, GstEvent * event) { GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event)); if (GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) { GstDecodebin3 *dbin = (GstDecodebin3 *) element; GList *streams = NULL; guint32 seqnum = gst_event_get_seqnum (event); SELECTION_LOCK (dbin); if (seqnum == 
dbin->select_streams_seqnum) { SELECTION_UNLOCK (dbin); GST_DEBUG_OBJECT (dbin, "Already handled/handling that SELECT_STREAMS event"); return TRUE; } dbin->select_streams_seqnum = seqnum; if (dbin->pending_select_streams != NULL) { GST_LOG_OBJECT (dbin, "Replacing pending select streams"); g_list_free (dbin->pending_select_streams); dbin->pending_select_streams = NULL; } gst_event_parse_select_streams (event, &streams); dbin->pending_select_streams = g_list_copy (streams); SELECTION_UNLOCK (dbin); /* FIXME : We don't have an upstream ?? */ #if 0 /* Send event upstream */ if ((peer = gst_pad_get_peer (pad))) { gst_pad_send_event (peer, event); gst_object_unref (peer); } #endif /* Finally handle the switch */ if (streams) { handle_stream_switch (dbin, streams, seqnum); g_list_free_full (streams, g_free); } gst_event_unref (event); return TRUE; } return GST_ELEMENT_CLASS (parent_class)->send_event (element, event); } static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot) { if (slot->probe_id) gst_pad_remove_probe (slot->src_pad, slot->probe_id); if (slot->input) { if (slot->input->srcpad) gst_pad_unlink (slot->input->srcpad, slot->sink_pad); } gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad); gst_object_replace ((GstObject **) & slot->sink_pad, NULL); gst_object_replace ((GstObject **) & slot->src_pad, NULL); gst_object_replace ((GstObject **) & slot->active_stream, NULL); g_free (slot); } static void free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot) { GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free"); gst_element_call_async (GST_ELEMENT_CAST (dbin), (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL); } /* Create a DecodebinOutputStream for a given type * Note: It will be empty initially, it needs to be configured * afterwards */ static DecodebinOutputStream * create_output_stream (GstDecodebin3 * dbin, GstStreamType type) { DecodebinOutputStream *res = g_new0 
(DecodebinOutputStream, 1); gchar *pad_name; const gchar *prefix; GstStaticPadTemplate *templ; GstPadTemplate *ptmpl; guint32 *counter; GstPad *internal_pad; GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s", res, gst_stream_type_get_name (type)); res->type = type; res->dbin = dbin; if (type & GST_STREAM_TYPE_VIDEO) { templ = &video_src_template; counter = &dbin->vpadcount; prefix = "video"; } else if (type & GST_STREAM_TYPE_AUDIO) { templ = &audio_src_template; counter = &dbin->apadcount; prefix = "audio"; } else if (type & GST_STREAM_TYPE_TEXT) { templ = &text_src_template; counter = &dbin->tpadcount; prefix = "text"; } else { templ = &src_template; counter = &dbin->opadcount; prefix = "src"; } pad_name = g_strdup_printf ("%s_%u", prefix, *counter); *counter += 1; ptmpl = gst_static_pad_template_get (templ); res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl); gst_object_unref (ptmpl); g_free (pad_name); gst_pad_set_active (res->src_pad, TRUE); /* Put an event probe on the internal proxy pad to detect upstream * events */ internal_pad = (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad); gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, (GstPadProbeCallback) ghost_pad_event_probe, res, NULL); gst_object_unref (internal_pad); dbin->output_streams = g_list_append (dbin->output_streams, res); return res; } static void free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output) { if (output->slot) { if (output->decoder_sink && output->decoder) gst_pad_unlink (output->slot->src_pad, output->decoder_sink); output->slot->output = NULL; output->slot = NULL; } gst_object_replace ((GstObject **) & output->decoder_sink, NULL); gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL); gst_object_replace ((GstObject **) & output->decoder_src, NULL); if (output->src_exposed) { gst_element_remove_pad ((GstElement *) dbin, output->src_pad); } if (output->decoder) { 
gst_element_set_locked_state (output->decoder, TRUE); gst_element_set_state (output->decoder, GST_STATE_NULL); gst_bin_remove ((GstBin *) dbin, output->decoder); } g_free (output); } static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element, GstStateChange transition) { GstDecodebin3 *dbin = (GstDecodebin3 *) element; GstStateChangeReturn ret; /* Upwards */ switch (transition) { default: break; } ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); if (ret == GST_STATE_CHANGE_FAILURE) goto beach; switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: { GList *tmp; /* Free output streams */ for (tmp = dbin->output_streams; tmp; tmp = tmp->next) { DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data; free_output_stream (dbin, output); } g_list_free (dbin->output_streams); dbin->output_streams = NULL; /* Free multiqueue slots */ for (tmp = dbin->slots; tmp; tmp = tmp->next) { MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data; free_multiqueue_slot (dbin, slot); } g_list_free (dbin->slots); dbin->slots = NULL; dbin->current_group_id = GST_GROUP_ID_INVALID; /* Free inputs */ } break; default: break; } beach: return ret; } gboolean gst_decodebin3_plugin_init (GstPlugin * plugin) { GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin"); return gst_element_register (plugin, "decodebin3", GST_RANK_NONE, GST_TYPE_DECODEBIN3); }