1 /* GStreamer input selector
2  * Copyright (C) 2003 Julien Moutte <julien@moutte.net>
3  * Copyright (C) 2005 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
4  * Copyright (C) 2005 Jan Schmidt <thaytan@mad.scientist.com>
5  * Copyright (C) 2007 Wim Taymans <wim.taymans@gmail.com>
6  * Copyright (C) 2007 Andy Wingo <wingo@pobox.com>
7  * Copyright (C) 2008 Nokia Corporation. (contact <stefan.kost@nokia.com>)
8  * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25 
26 /**
27  * SECTION:element-input-selector
28  * @title: input-selector
29  * @see_also: #GstOutputSelector
30  *
31  * Direct one out of N input streams to the output pad.
32  *
33  * The input pads are from a GstPad subclass and have additional
34  * properties, which users may find useful, namely:
35  *
36  * * "running-time": Running time of stream on pad (#gint64)
37  * * "tags": The currently active tags on the pad (#GstTagList, boxed type)
38  * * "active": If the pad is currently active (#gboolean)
39  * * "always-ok" : Make an inactive pads return #GST_FLOW_OK instead of #GST_FLOW_NOT_LINKED
40  *
41  */
42 
43 #ifdef HAVE_CONFIG_H
44 #include "config.h"
45 #endif
46 
47 #include <string.h>
48 
49 #include "rsninputselector.h"
50 
51 #define DEBUG_CACHED_BUFFERS 0
52 
53 GST_DEBUG_CATEGORY_STATIC (input_selector_debug);
54 #define GST_CAT_DEFAULT input_selector_debug
55 
56 #define GST_TYPE_INPUT_SELECTOR_SYNC_MODE (gst_input_selector_sync_mode_get_type())
57 static GType
gst_input_selector_sync_mode_get_type(void)58 gst_input_selector_sync_mode_get_type (void)
59 {
60   static GType type = 0;
61   static const GEnumValue data[] = {
62     {GST_INPUT_SELECTOR_SYNC_MODE_ACTIVE_SEGMENT,
63           "Sync using the current active segment",
64         "active-segment"},
65     {GST_INPUT_SELECTOR_SYNC_MODE_CLOCK, "Sync using the clock", "clock"},
66     {0, NULL, NULL},
67   };
68 
69   if (!type) {
70     type = g_enum_register_static ("RsnInputSelectorSyncMode", data);
71   }
72   return type;
73 }
74 
75 #define GST_INPUT_SELECTOR_GET_LOCK(sel) (&((RsnInputSelector*)(sel))->lock)
76 #define GST_INPUT_SELECTOR_GET_COND(sel) (&((RsnInputSelector*)(sel))->cond)
77 #define GST_INPUT_SELECTOR_LOCK(sel) (g_mutex_lock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
78 #define GST_INPUT_SELECTOR_UNLOCK(sel) (g_mutex_unlock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
79 #define GST_INPUT_SELECTOR_WAIT(sel) (g_cond_wait (GST_INPUT_SELECTOR_GET_COND(sel), \
80 			GST_INPUT_SELECTOR_GET_LOCK(sel)))
81 #define GST_INPUT_SELECTOR_BROADCAST(sel) (g_cond_broadcast (GST_INPUT_SELECTOR_GET_COND(sel)))
82 
83 static GstStaticPadTemplate gst_input_selector_sink_factory =
84 GST_STATIC_PAD_TEMPLATE ("sink_%u",
85     GST_PAD_SINK,
86     GST_PAD_REQUEST,
87     GST_STATIC_CAPS_ANY);
88 
89 static GstStaticPadTemplate gst_input_selector_src_factory =
90 GST_STATIC_PAD_TEMPLATE ("src",
91     GST_PAD_SRC,
92     GST_PAD_ALWAYS,
93     GST_STATIC_CAPS_ANY);
94 
95 enum
96 {
97   PROP_0,
98   PROP_N_PADS,
99   PROP_ACTIVE_PAD,
100   PROP_SYNC_STREAMS,
101   PROP_SYNC_MODE,
102   PROP_CACHE_BUFFERS
103 };
104 
105 #define DEFAULT_SYNC_STREAMS TRUE
106 #define DEFAULT_SYNC_MODE GST_INPUT_SELECTOR_SYNC_MODE_ACTIVE_SEGMENT
107 #define DEFAULT_CACHE_BUFFERS FALSE
108 #define DEFAULT_PAD_ALWAYS_OK TRUE
109 
110 enum
111 {
112   PROP_PAD_0,
113   PROP_PAD_RUNNING_TIME,
114   PROP_PAD_TAGS,
115   PROP_PAD_ACTIVE,
116   PROP_PAD_ALWAYS_OK
117 };
118 
119 enum
120 {
121   /* methods */
122   SIGNAL_BLOCK,
123   SIGNAL_SWITCH,
124   LAST_SIGNAL
125 };
126 static guint gst_input_selector_signals[LAST_SIGNAL] = { 0 };
127 
128 static void gst_input_selector_active_pad_changed (RsnInputSelector * sel,
129     GParamSpec * pspec, gpointer user_data);
130 static inline gboolean gst_input_selector_is_active_sinkpad (RsnInputSelector *
131     sel, GstPad * pad);
132 static GstPad *gst_input_selector_activate_sinkpad (RsnInputSelector * sel,
133     GstPad * pad);
134 static gboolean gst_input_selector_set_active_pad (RsnInputSelector * self,
135     GstPad * pad);
136 static GstPad *gst_input_selector_get_linked_pad (RsnInputSelector * sel,
137     GstPad * pad, gboolean strict);
138 static gboolean forward_sticky_events (GstPad * sinkpad, GstEvent ** event,
139     gpointer user_data);
140 
141 #define GST_TYPE_SELECTOR_PAD \
142   (gst_selector_pad_get_type())
143 #define GST_SELECTOR_PAD(obj) \
144   (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SELECTOR_PAD, RsnSelectorPad))
145 #define GST_SELECTOR_PAD_CLASS(klass) \
146   (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SELECTOR_PAD, RsnSelectorPadClass))
147 #define GST_IS_SELECTOR_PAD(obj) \
148   (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SELECTOR_PAD))
149 #define GST_IS_SELECTOR_PAD_CLASS(klass) \
150   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SELECTOR_PAD))
151 #define GST_SELECTOR_PAD_CAST(obj) \
152   ((RsnSelectorPad *)(obj))
153 
154 typedef struct _RsnSelectorPad RsnSelectorPad;
155 typedef struct _RsnSelectorPadClass RsnSelectorPadClass;
156 typedef struct _RsnSelectorPadCachedBuffer RsnSelectorPadCachedBuffer;
157 
158 struct _RsnSelectorPad
159 {
160   GstPad parent;
161 
162   gboolean active;              /* when buffer have passed the pad */
163   gboolean pushed;              /* when buffer was pushed downstream since activation */
164   gboolean eos;                 /* when EOS has been received */
165   gboolean eos_sent;            /* when EOS was sent downstream */
166   gboolean discont;             /* after switching we create a discont */
167   gboolean flushing;            /* set after flush-start and before flush-stop */
168   gboolean always_ok;
169   GstTagList *tags;             /* last tags received on the pad */
170 
171   GstClockTime position;        /* the current position in the segment */
172   GstSegment segment;           /* the current segment on the pad */
173   guint32 segment_seqnum;       /* sequence number of the current segment */
174 
175   gboolean events_pending;      /* TRUE if sticky events need to be updated */
176 
177   gboolean sending_cached_buffers;
178   GQueue *cached_buffers;
179 };
180 
181 struct _RsnSelectorPadCachedBuffer
182 {
183   GstBuffer *buffer;
184   GstSegment segment;
185 };
186 
187 struct _RsnSelectorPadClass
188 {
189   GstPadClass parent;
190 };
191 
192 GType gst_selector_pad_get_type (void);
193 static void gst_selector_pad_finalize (GObject * object);
194 static void gst_selector_pad_get_property (GObject * object,
195     guint prop_id, GValue * value, GParamSpec * pspec);
196 static void gst_selector_pad_set_property (GObject * object,
197     guint prop_id, const GValue * value, GParamSpec * pspec);
198 
199 static gint64 gst_selector_pad_get_running_time (RsnSelectorPad * pad);
200 static void gst_selector_pad_reset (RsnSelectorPad * pad);
201 static gboolean gst_selector_pad_event (GstPad * pad, GstObject * parent,
202     GstEvent * event);
203 static gboolean gst_selector_pad_query (GstPad * pad, GstObject * parent,
204     GstQuery * query);
205 static GstIterator *gst_selector_pad_iterate_linked_pads (GstPad * pad,
206     GstObject * parent);
207 static GstFlowReturn gst_selector_pad_chain (GstPad * pad, GstObject * parent,
208     GstBuffer * buf);
209 static void gst_selector_pad_cache_buffer (RsnSelectorPad * selpad,
210     GstBuffer * buffer);
211 static void gst_selector_pad_free_cached_buffers (RsnSelectorPad * selpad);
212 
213 G_DEFINE_TYPE (RsnSelectorPad, gst_selector_pad, GST_TYPE_PAD);
214 
215 static void
gst_selector_pad_class_init(RsnSelectorPadClass * klass)216 gst_selector_pad_class_init (RsnSelectorPadClass * klass)
217 {
218   GObjectClass *gobject_class;
219 
220   gobject_class = (GObjectClass *) klass;
221 
222   gobject_class->finalize = gst_selector_pad_finalize;
223 
224   gobject_class->get_property = gst_selector_pad_get_property;
225   gobject_class->set_property = gst_selector_pad_set_property;
226 
227   g_object_class_install_property (gobject_class, PROP_PAD_RUNNING_TIME,
228       g_param_spec_int64 ("running-time", "Running time",
229           "Running time of stream on pad", 0, G_MAXINT64, 0,
230           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
231   g_object_class_install_property (gobject_class, PROP_PAD_TAGS,
232       g_param_spec_boxed ("tags", "Tags",
233           "The currently active tags on the pad", GST_TYPE_TAG_LIST,
234           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
235   g_object_class_install_property (gobject_class, PROP_PAD_ACTIVE,
236       g_param_spec_boolean ("active", "Active",
237           "If the pad is currently active", FALSE,
238           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
239   /* FIXME: better property name? */
240   g_object_class_install_property (gobject_class, PROP_PAD_ALWAYS_OK,
241       g_param_spec_boolean ("always-ok", "Always OK",
242           "Make an inactive pad return OK instead of NOT_LINKED",
243           DEFAULT_PAD_ALWAYS_OK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
244 }
245 
246 static void
gst_selector_pad_init(RsnSelectorPad * pad)247 gst_selector_pad_init (RsnSelectorPad * pad)
248 {
249   pad->always_ok = DEFAULT_PAD_ALWAYS_OK;
250   gst_selector_pad_reset (pad);
251 }
252 
253 static void
gst_selector_pad_finalize(GObject * object)254 gst_selector_pad_finalize (GObject * object)
255 {
256   RsnSelectorPad *pad;
257 
258   pad = GST_SELECTOR_PAD_CAST (object);
259 
260   if (pad->tags)
261     gst_tag_list_unref (pad->tags);
262   gst_selector_pad_free_cached_buffers (pad);
263 
264   G_OBJECT_CLASS (gst_selector_pad_parent_class)->finalize (object);
265 }
266 
267 static void
gst_selector_pad_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)268 gst_selector_pad_set_property (GObject * object, guint prop_id,
269     const GValue * value, GParamSpec * pspec)
270 {
271   RsnSelectorPad *spad = GST_SELECTOR_PAD_CAST (object);
272 
273   switch (prop_id) {
274     case PROP_PAD_ALWAYS_OK:
275       GST_OBJECT_LOCK (object);
276       spad->always_ok = g_value_get_boolean (value);
277       GST_OBJECT_UNLOCK (object);
278       break;
279     default:
280       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
281       break;
282   }
283 }
284 
285 static void
gst_selector_pad_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)286 gst_selector_pad_get_property (GObject * object, guint prop_id,
287     GValue * value, GParamSpec * pspec)
288 {
289   RsnSelectorPad *spad = GST_SELECTOR_PAD_CAST (object);
290 
291   switch (prop_id) {
292     case PROP_PAD_RUNNING_TIME:
293       g_value_set_int64 (value, gst_selector_pad_get_running_time (spad));
294       break;
295     case PROP_PAD_TAGS:
296       GST_OBJECT_LOCK (object);
297       g_value_set_boxed (value, spad->tags);
298       GST_OBJECT_UNLOCK (object);
299       break;
300     case PROP_PAD_ACTIVE:
301     {
302       RsnInputSelector *sel;
303 
304       sel = GST_INPUT_SELECTOR (gst_pad_get_parent (spad));
305       g_value_set_boolean (value, gst_input_selector_is_active_sinkpad (sel,
306               GST_PAD_CAST (spad)));
307       gst_object_unref (sel);
308       break;
309     }
310     case PROP_PAD_ALWAYS_OK:
311       GST_OBJECT_LOCK (object);
312       g_value_set_boolean (value, spad->always_ok);
313       GST_OBJECT_UNLOCK (object);
314       break;
315     default:
316       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
317       break;
318   }
319 }
320 
321 static gint64
gst_selector_pad_get_running_time(RsnSelectorPad * pad)322 gst_selector_pad_get_running_time (RsnSelectorPad * pad)
323 {
324   gint64 ret = 0;
325 
326   GST_OBJECT_LOCK (pad);
327   if (pad->active) {
328     guint64 position = pad->position;
329     GstFormat format = pad->segment.format;
330 
331     ret = gst_segment_to_running_time (&pad->segment, format, position);
332   }
333   GST_OBJECT_UNLOCK (pad);
334 
335   GST_DEBUG_OBJECT (pad, "running time: %" GST_TIME_FORMAT
336       " segment: %" GST_SEGMENT_FORMAT, GST_TIME_ARGS (ret), &pad->segment);
337 
338   return ret;
339 }
340 
341 /* must be called with the SELECTOR_LOCK */
342 static void
gst_selector_pad_reset(RsnSelectorPad * pad)343 gst_selector_pad_reset (RsnSelectorPad * pad)
344 {
345   GST_OBJECT_LOCK (pad);
346   pad->active = FALSE;
347   pad->pushed = FALSE;
348   pad->eos = FALSE;
349   pad->eos_sent = FALSE;
350   pad->events_pending = FALSE;
351   pad->discont = FALSE;
352   pad->flushing = FALSE;
353   pad->position = GST_CLOCK_TIME_NONE;
354   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
355   pad->sending_cached_buffers = FALSE;
356   gst_selector_pad_free_cached_buffers (pad);
357   GST_OBJECT_UNLOCK (pad);
358 }
359 
360 static RsnSelectorPadCachedBuffer *
gst_selector_pad_new_cached_buffer(RsnSelectorPad * selpad,GstBuffer * buffer)361 gst_selector_pad_new_cached_buffer (RsnSelectorPad * selpad, GstBuffer * buffer)
362 {
363   RsnSelectorPadCachedBuffer *cached_buffer =
364       g_slice_new (RsnSelectorPadCachedBuffer);
365   cached_buffer->buffer = buffer;
366   cached_buffer->segment = selpad->segment;
367   return cached_buffer;
368 }
369 
370 static void
gst_selector_pad_free_cached_buffer(RsnSelectorPadCachedBuffer * cached_buffer)371 gst_selector_pad_free_cached_buffer (RsnSelectorPadCachedBuffer * cached_buffer)
372 {
373   gst_buffer_unref (cached_buffer->buffer);
374   g_slice_free (RsnSelectorPadCachedBuffer, cached_buffer);
375 }
376 
377 /* must be called with the SELECTOR_LOCK */
378 static void
gst_selector_pad_cache_buffer(RsnSelectorPad * selpad,GstBuffer * buffer)379 gst_selector_pad_cache_buffer (RsnSelectorPad * selpad, GstBuffer * buffer)
380 {
381   if (selpad->segment.format != GST_FORMAT_TIME) {
382     GST_DEBUG_OBJECT (selpad, "Buffer %p with segment not in time format, "
383         "not caching", buffer);
384     return;
385   }
386 
387   GST_DEBUG_OBJECT (selpad, "Caching buffer %p", buffer);
388   if (!selpad->cached_buffers)
389     selpad->cached_buffers = g_queue_new ();
390   g_queue_push_tail (selpad->cached_buffers,
391       gst_selector_pad_new_cached_buffer (selpad, buffer));
392 }
393 
394 /* must be called with the SELECTOR_LOCK */
395 static void
gst_selector_pad_free_cached_buffers(RsnSelectorPad * selpad)396 gst_selector_pad_free_cached_buffers (RsnSelectorPad * selpad)
397 {
398   RsnSelectorPadCachedBuffer *cached_buffer;
399 
400   if (!selpad->cached_buffers)
401     return;
402 
403   GST_DEBUG_OBJECT (selpad, "Freeing cached buffers");
404   while ((cached_buffer = g_queue_pop_head (selpad->cached_buffers)))
405     gst_selector_pad_free_cached_buffer (cached_buffer);
406   g_queue_free (selpad->cached_buffers);
407   selpad->cached_buffers = NULL;
408 }
409 
410 /* strictly get the linked pad from the sinkpad. If the pad is active we return
411  * the srcpad else we return NULL */
412 static GstIterator *
gst_selector_pad_iterate_linked_pads(GstPad * pad,GstObject * parent)413 gst_selector_pad_iterate_linked_pads (GstPad * pad, GstObject * parent)
414 {
415   RsnInputSelector *sel;
416   GstPad *otherpad;
417   GstIterator *it = NULL;
418   GValue val = { 0, };
419 
420   sel = GST_INPUT_SELECTOR (parent);
421 
422   otherpad = gst_input_selector_get_linked_pad (sel, pad, TRUE);
423   if (otherpad) {
424     g_value_init (&val, GST_TYPE_PAD);
425     g_value_set_object (&val, otherpad);
426     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
427     g_value_unset (&val);
428     gst_object_unref (otherpad);
429   }
430 
431   return it;
432 }
433 
434 static gboolean
gst_selector_pad_event(GstPad * pad,GstObject * parent,GstEvent * event)435 gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
436 {
437   gboolean res = TRUE;
438   gboolean forward;
439   gboolean new_tags = FALSE;
440   RsnInputSelector *sel;
441   RsnSelectorPad *selpad;
442   GstPad *prev_active_sinkpad;
443   GstPad *active_sinkpad;
444 
445   sel = GST_INPUT_SELECTOR (parent);
446   selpad = GST_SELECTOR_PAD_CAST (pad);
447   GST_DEBUG_OBJECT (selpad, "received event %" GST_PTR_FORMAT, event);
448 
449   GST_INPUT_SELECTOR_LOCK (sel);
450   prev_active_sinkpad = sel->active_sinkpad;
451   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
452   GST_INPUT_SELECTOR_UNLOCK (sel);
453 
454   if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
455     g_object_notify (G_OBJECT (sel), "active-pad");
456   }
457 
458   GST_INPUT_SELECTOR_LOCK (sel);
459   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
460 
461   /* only forward if we are dealing with the active sinkpad */
462   forward = (pad == active_sinkpad);
463 
464   switch (GST_EVENT_TYPE (event)) {
465     case GST_EVENT_FLUSH_START:
466       /* Unblock the pad if it's waiting */
467       selpad->flushing = TRUE;
468       GST_INPUT_SELECTOR_BROADCAST (sel);
469       break;
470     case GST_EVENT_FLUSH_STOP:
471       gst_selector_pad_reset (selpad);
472       break;
473     case GST_EVENT_SEGMENT:
474     {
475       gst_event_copy_segment (event, &selpad->segment);
476       selpad->segment_seqnum = gst_event_get_seqnum (event);
477 
478       /* Update the position */
479       if (selpad->position == GST_CLOCK_TIME_NONE
480           || selpad->segment.position > selpad->position) {
481         selpad->position = selpad->segment.position;
482       } else if (selpad->position != GST_CLOCK_TIME_NONE
483           && selpad->position > selpad->segment.position) {
484         selpad->segment.position = selpad->position;
485 
486         if (forward) {
487           gst_event_unref (event);
488           event = gst_event_new_segment (&selpad->segment);
489           gst_event_set_seqnum (event, selpad->segment_seqnum);
490         }
491       }
492       GST_DEBUG_OBJECT (pad, "configured SEGMENT %" GST_SEGMENT_FORMAT,
493           &selpad->segment);
494       break;
495     }
496     case GST_EVENT_TAG:
497     {
498       GstTagList *tags, *oldtags, *newtags;
499 
500       gst_event_parse_tag (event, &tags);
501 
502       oldtags = selpad->tags;
503 
504       newtags = gst_tag_list_merge (oldtags, tags, GST_TAG_MERGE_REPLACE);
505       selpad->tags = newtags;
506       if (oldtags)
507         gst_tag_list_unref (oldtags);
508       GST_DEBUG_OBJECT (pad, "received tags %" GST_PTR_FORMAT, newtags);
509 
510       new_tags = TRUE;
511       break;
512     }
513     case GST_EVENT_EOS:
514       selpad->eos = TRUE;
515 
516       if (forward) {
517         selpad->eos_sent = TRUE;
518       } else {
519         RsnSelectorPad *active_selpad;
520 
521         /* If the active sinkpad is in EOS state but EOS
522          * was not sent downstream this means that the pad
523          * got EOS before it was set as active pad and that
524          * the previously active pad got EOS after it was
525          * active
526          */
527         active_selpad = GST_SELECTOR_PAD (active_sinkpad);
528         forward = (active_selpad->eos && !active_selpad->eos_sent);
529         active_selpad->eos_sent = TRUE;
530       }
531       GST_DEBUG_OBJECT (pad, "received EOS");
532       break;
533     case GST_EVENT_CUSTOM_DOWNSTREAM:
534     {
535       const GstStructure *structure = gst_event_get_structure (event);
536       if (structure != NULL &&
537           gst_structure_has_name (structure, "application/x-gst-dvd")) {
538         const char *type = gst_structure_get_string (structure, "event");
539         if (strcmp (type, "select-pad") == 0) {
540           gst_input_selector_set_active_pad (sel, pad);
541           forward = FALSE;
542         }
543       }
544     }
545       break;
546 
547     default:
548       break;
549   }
550   GST_INPUT_SELECTOR_UNLOCK (sel);
551   if (new_tags)
552     g_object_notify (G_OBJECT (selpad), "tags");
553   if (forward) {
554     GST_DEBUG_OBJECT (pad, "forwarding event");
555     res = gst_pad_push_event (sel->srcpad, event);
556   } else {
557     /* If we aren't forwarding the event because the pad is not the
558      * active_sinkpad, then set the flag on the pad
559      * that says a segment needs sending if/when that pad is activated.
560      * For all other cases, we send the event immediately, which makes
561      * sparse streams and other segment updates work correctly downstream.
562      */
563     if (GST_EVENT_IS_STICKY (event))
564       selpad->events_pending = TRUE;
565     gst_event_unref (event);
566   }
567 
568   return res;
569 }
570 
571 static gboolean
gst_selector_pad_query(GstPad * pad,GstObject * parent,GstQuery * query)572 gst_selector_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
573 {
574   gboolean res = FALSE;
575 
576   switch (GST_QUERY_TYPE (query)) {
577     case GST_QUERY_ALLOCATION:{
578       GstPad *active_sinkpad;
579       RsnInputSelector *sel = GST_INPUT_SELECTOR (parent);
580 
581       /* Only do the allocation query for the active sinkpad,
582        * after switching a reconfigure event is sent and upstream
583        * should reconfigure and do a new allocation query
584        */
585       if (GST_PAD_DIRECTION (pad) == GST_PAD_SINK) {
586         GST_INPUT_SELECTOR_LOCK (sel);
587         active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
588         GST_INPUT_SELECTOR_UNLOCK (sel);
589 
590         if (pad != active_sinkpad) {
591           res = FALSE;
592           goto done;
593         }
594       }
595     }
596       /* fall through */
597     default:
598       res = gst_pad_query_default (pad, parent, query);
599       break;
600   }
601 
602 done:
603   return res;
604 }
605 
606 /* must be called with the SELECTOR_LOCK, will block while the pad is blocked
607  * or return TRUE when flushing */
608 static gboolean
gst_input_selector_wait(RsnInputSelector * self,RsnSelectorPad * pad)609 gst_input_selector_wait (RsnInputSelector * self, RsnSelectorPad * pad)
610 {
611   while (self->blocked && !self->flushing && !pad->flushing) {
612     /* we can be unlocked here when we are shutting down (flushing) or when we
613      * get unblocked */
614     GST_INPUT_SELECTOR_WAIT (self);
615   }
616   return self->flushing;
617 }
618 
619 /* must be called without the SELECTOR_LOCK, will wait until the running time
620  * of the active pad is after this pad or return TRUE when flushing */
621 static gboolean
gst_input_selector_wait_running_time(RsnInputSelector * sel,RsnSelectorPad * selpad,GstBuffer * buf)622 gst_input_selector_wait_running_time (RsnInputSelector * sel,
623     RsnSelectorPad * selpad, GstBuffer * buf)
624 {
625   GstSegment *seg;
626 
627   GST_DEBUG_OBJECT (selpad, "entering wait for buffer %p", buf);
628 
629   /* If we have no valid timestamp we can't sync this buffer */
630   if (!GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
631     GST_DEBUG_OBJECT (selpad, "leaving wait for buffer with "
632         "invalid timestamp");
633     return FALSE;
634   }
635 
636   seg = &selpad->segment;
637 
638   /* Wait until
639    *   a) this is the active pad
640    *   b) the pad or the selector is flushing
641    *   c) the selector is not blocked
642    *   d) the buffer running time is before the current running time
643    *      (either active-seg or clock, depending on sync-mode)
644    */
645 
646   GST_INPUT_SELECTOR_LOCK (sel);
647   while (TRUE) {
648     GstPad *active_sinkpad;
649     RsnSelectorPad *active_selpad;
650     GstClock *clock;
651     gint64 cur_running_time;
652     GstClockTime running_time;
653 
654     active_sinkpad =
655         gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (selpad));
656     active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
657 
658     if (seg->format != GST_FORMAT_TIME) {
659       GST_DEBUG_OBJECT (selpad,
660           "Not waiting because we don't have a TIME segment");
661       GST_INPUT_SELECTOR_UNLOCK (sel);
662       return FALSE;
663     }
664 
665     running_time = GST_BUFFER_TIMESTAMP (buf);
666     /* If possible try to get the running time at the end of the buffer */
667     if (GST_BUFFER_DURATION_IS_VALID (buf))
668       running_time += GST_BUFFER_DURATION (buf);
669     /* Only use the segment to convert to running time if the segment is
670      * in TIME format, otherwise do our best to try to sync */
671     if (GST_CLOCK_TIME_IS_VALID (seg->stop)) {
672       if (running_time > seg->stop) {
673         running_time = seg->stop;
674       }
675     }
676     running_time =
677         gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time);
678     /* If this is outside the segment don't sync */
679     if (running_time == -1) {
680       GST_DEBUG_OBJECT (selpad,
681           "Not waiting because buffer is outside segment");
682       GST_INPUT_SELECTOR_UNLOCK (sel);
683       return FALSE;
684     }
685 
686     cur_running_time = GST_CLOCK_TIME_NONE;
687     if (sel->sync_mode == GST_INPUT_SELECTOR_SYNC_MODE_CLOCK) {
688       clock = gst_element_get_clock (GST_ELEMENT_CAST (sel));
689       if (clock) {
690         GstClockTime base_time;
691 
692         cur_running_time = gst_clock_get_time (clock);
693         base_time = gst_element_get_base_time (GST_ELEMENT_CAST (sel));
694         if (base_time <= cur_running_time)
695           cur_running_time -= base_time;
696         else
697           cur_running_time = 0;
698         gst_object_unref (clock);
699       }
700     } else {
701       GstSegment *active_seg;
702 
703       active_seg = &active_selpad->segment;
704 
705       /* If the active segment is configured but not to time format
706        * we can't do any syncing at all */
707       if (active_seg->format != GST_FORMAT_TIME
708           && active_seg->format != GST_FORMAT_UNDEFINED) {
709         GST_DEBUG_OBJECT (selpad,
710             "Not waiting because active segment isn't in TIME format");
711         GST_INPUT_SELECTOR_UNLOCK (sel);
712         return FALSE;
713       }
714 
715       /* Get active pad's running time, if no configured segment yet keep at -1 */
716       if (active_seg->format == GST_FORMAT_TIME)
717         cur_running_time = gst_segment_to_running_time (active_seg,
718             GST_FORMAT_TIME, active_seg->position);
719     }
720 
721     if (selpad != active_selpad && !sel->flushing && !selpad->flushing &&
722         (sel->blocked || cur_running_time == -1
723             || running_time >= cur_running_time)) {
724       if (!sel->blocked) {
725         GST_DEBUG_OBJECT (selpad,
726             "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %"
727             GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
728             GST_TIME_ARGS (cur_running_time));
729       } else
730         GST_DEBUG_OBJECT (selpad, "Waiting for selector to unblock");
731 
732       GST_INPUT_SELECTOR_WAIT (sel);
733     } else {
734       GST_INPUT_SELECTOR_UNLOCK (sel);
735       break;
736     }
737   }
738 
739   /* Return TRUE if the selector or the pad is flushing */
740   return (sel->flushing || selpad->flushing);
741 }
742 
743 static gboolean
forward_sticky_events(GstPad * sinkpad,GstEvent ** event,gpointer user_data)744 forward_sticky_events (GstPad * sinkpad, GstEvent ** event, gpointer user_data)
745 {
746   RsnInputSelector *sel = GST_INPUT_SELECTOR (user_data);
747 
748   if (GST_EVENT_TYPE (*event) == GST_EVENT_SEGMENT) {
749     GstSegment *seg = &GST_SELECTOR_PAD (sinkpad)->segment;
750     GstEvent *e;
751 
752     e = gst_event_new_segment (seg);
753     gst_event_set_seqnum (e, GST_SELECTOR_PAD_CAST (sinkpad)->segment_seqnum);
754 
755     gst_pad_push_event (sel->srcpad, e);
756   } else if (GST_EVENT_TYPE (*event) != GST_EVENT_STREAM_START) {
757     GST_WARNING ("Pushing event %" GST_PTR_FORMAT, *event);
758     gst_pad_push_event (sel->srcpad, gst_event_ref (*event));
759   }
760 
761   return TRUE;
762 }
763 
764 #if DEBUG_CACHED_BUFFERS
765 static void
gst_input_selector_debug_cached_buffers(RsnInputSelector * sel)766 gst_input_selector_debug_cached_buffers (RsnInputSelector * sel)
767 {
768   GList *walk;
769 
770   for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk; walk = g_list_next (walk)) {
771     RsnSelectorPad *selpad;
772     GString *timestamps;
773     gchar *str;
774     int i;
775 
776     selpad = GST_SELECTOR_PAD_CAST (walk->data);
777     if (!selpad->cached_buffers) {
778       GST_DEBUG_OBJECT (selpad, "Cached buffers timestamps: <none>");
779       continue;
780     }
781 
782     timestamps = g_string_new ("Cached buffers timestamps:");
783     for (i = 0; i < selpad->cached_buffers->length; ++i) {
784       RsnSelectorPadCachedBuffer *cached_buffer;
785 
786       cached_buffer = g_queue_peek_nth (selpad->cached_buffers, i);
787       g_string_append_printf (timestamps, " %" GST_TIME_FORMAT,
788           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (cached_buffer->buffer)));
789     }
790     str = g_string_free (timestamps, FALSE);
791     GST_DEBUG_OBJECT (selpad, str);
792     g_free (str);
793   }
794 }
795 #endif
796 
797 /* must be called with the SELECTOR_LOCK */
798 static void
gst_input_selector_cleanup_old_cached_buffers(RsnInputSelector * sel,GstPad * pad)799 gst_input_selector_cleanup_old_cached_buffers (RsnInputSelector * sel,
800     GstPad * pad)
801 {
802   GstClock *clock;
803   gint64 cur_running_time;
804   GList *walk;
805 
806   cur_running_time = GST_CLOCK_TIME_NONE;
807   if (sel->sync_mode == GST_INPUT_SELECTOR_SYNC_MODE_CLOCK) {
808     clock = gst_element_get_clock (GST_ELEMENT_CAST (sel));
809     if (clock) {
810       GstClockTime base_time;
811 
812       cur_running_time = gst_clock_get_time (clock);
813       base_time = gst_element_get_base_time (GST_ELEMENT_CAST (sel));
814       if (base_time <= cur_running_time)
815         cur_running_time -= base_time;
816       else
817         cur_running_time = 0;
818       gst_object_unref (clock);
819     }
820   } else {
821     GstPad *active_sinkpad;
822     RsnSelectorPad *active_selpad;
823     GstSegment *active_seg;
824 
825     active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
826     active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
827     active_seg = &active_selpad->segment;
828 
829     /* Get active pad's running time, if no configured segment yet keep at -1 */
830     if (active_seg->format == GST_FORMAT_TIME)
831       cur_running_time = gst_segment_to_running_time (active_seg,
832           GST_FORMAT_TIME, active_seg->position);
833   }
834 
835   if (!GST_CLOCK_TIME_IS_VALID (cur_running_time))
836     return;
837 
838   GST_DEBUG_OBJECT (sel, "Cleaning up old cached buffers");
839   for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk; walk = g_list_next (walk)) {
840     RsnSelectorPad *selpad;
841     GstSegment *seg;
842     RsnSelectorPadCachedBuffer *cached_buffer;
843     GSList *maybe_remove;
844     guint queue_position;
845 
846     selpad = GST_SELECTOR_PAD_CAST (walk->data);
847     if (!selpad->cached_buffers)
848       continue;
849 
850     seg = &selpad->segment;
851 
852     maybe_remove = NULL;
853     queue_position = 0;
854     while ((cached_buffer = g_queue_peek_nth (selpad->cached_buffers,
855                 queue_position))) {
856       GstBuffer *buffer = cached_buffer->buffer;
857       GstClockTime running_time;
858       GSList *l;
859 
860       /* If we have no valid timestamp we can't sync this buffer */
861       if (!GST_BUFFER_TIMESTAMP_IS_VALID (buffer)) {
862         maybe_remove = g_slist_append (maybe_remove, cached_buffer);
863         queue_position = g_slist_length (maybe_remove);
864         continue;
865       }
866 
867       /* the buffer is still valid if its duration is valid and the
868        * timestamp + duration is >= time, or if its duration is invalid
869        * and the timestamp is >= time */
870       running_time = GST_BUFFER_TIMESTAMP (buffer);
871       /* If possible try to get the running time at the end of the buffer */
872       if (GST_BUFFER_DURATION_IS_VALID (buffer))
873         running_time += GST_BUFFER_DURATION (buffer);
874       /* Only use the segment to convert to running time if the segment is
875        * in TIME format, otherwise do our best to try to sync */
876       if (GST_CLOCK_TIME_IS_VALID (seg->stop)) {
877         if (running_time > seg->stop) {
878           running_time = seg->stop;
879         }
880       }
881       running_time =
882           gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time);
883 
884       GST_DEBUG_OBJECT (selpad,
885           "checking if buffer %p running time=%" GST_TIME_FORMAT
886           " >= stream time=%" GST_TIME_FORMAT, buffer,
887           GST_TIME_ARGS (running_time), GST_TIME_ARGS (cur_running_time));
888       if (running_time >= cur_running_time) {
889         break;
890       }
891 
892       GST_DEBUG_OBJECT (selpad, "Removing old cached buffer %p", buffer);
893       g_queue_pop_nth (selpad->cached_buffers, queue_position);
894       gst_selector_pad_free_cached_buffer (cached_buffer);
895 
896       for (l = maybe_remove; l != NULL; l = g_slist_next (l)) {
897         /* A buffer after some invalid buffers was removed, it means the invalid buffers
898          * are old, lets also remove them */
899         cached_buffer = l->data;
900         g_queue_remove (selpad->cached_buffers, cached_buffer);
901         gst_selector_pad_free_cached_buffer (cached_buffer);
902       }
903 
904       g_slist_free (maybe_remove);
905       maybe_remove = NULL;
906       queue_position = 0;
907     }
908 
909     g_slist_free (maybe_remove);
910     maybe_remove = NULL;
911 
912     if (g_queue_is_empty (selpad->cached_buffers)) {
913       g_queue_free (selpad->cached_buffers);
914       selpad->cached_buffers = NULL;
915     }
916   }
917 
918 #if DEBUG_CACHED_BUFFERS
919   gst_input_selector_debug_cached_buffers (sel);
920 #endif
921 }
922 
923 static GstFlowReturn
gst_selector_pad_chain(GstPad * pad,GstObject * parent,GstBuffer * buf)924 gst_selector_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
925 {
926   RsnInputSelector *sel;
927   GstFlowReturn res;
928   GstPad *active_sinkpad;
929   GstPad *prev_active_sinkpad;
930   RsnSelectorPad *selpad;
931   GstClockTime start_time;
932 
933   sel = GST_INPUT_SELECTOR (parent);
934   selpad = GST_SELECTOR_PAD_CAST (pad);
935 
936   GST_DEBUG_OBJECT (selpad,
937       "entering chain for buf %p with timestamp %" GST_TIME_FORMAT, buf,
938       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
939 
940   GST_INPUT_SELECTOR_LOCK (sel);
941   /* wait or check for flushing */
942   if (gst_input_selector_wait (sel, selpad)) {
943     GST_INPUT_SELECTOR_UNLOCK (sel);
944     goto flushing;
945   }
946 
947   GST_LOG_OBJECT (pad, "getting active pad");
948 
949   prev_active_sinkpad = sel->active_sinkpad;
950   active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
951 
952   /* In sync mode wait until the active pad has advanced
953    * after the running time of the current buffer */
954   if (sel->sync_streams) {
955     /* call chain for each cached buffer if we are not the active pad
956      * or if we are the active pad but didn't push anything yet. */
957     if (active_sinkpad != pad || !selpad->pushed) {
958       /* no need to check for sel->cache_buffers as selpad->cached_buffers
959        * will only be valid if cache_buffers is TRUE */
960       if (selpad->cached_buffers && !selpad->sending_cached_buffers) {
961         RsnSelectorPadCachedBuffer *cached_buffer;
962         GstSegment saved_segment;
963 
964         saved_segment = selpad->segment;
965 
966         selpad->sending_cached_buffers = TRUE;
967         while (!sel->flushing && !selpad->flushing &&
968             (cached_buffer = g_queue_pop_head (selpad->cached_buffers))) {
969           GST_DEBUG_OBJECT (pad, "Cached buffers found, "
970               "invoking chain for cached buffer %p", cached_buffer->buffer);
971 
972           selpad->segment = cached_buffer->segment;
973           selpad->events_pending = TRUE;
974           GST_INPUT_SELECTOR_UNLOCK (sel);
975           gst_selector_pad_chain (pad, parent, cached_buffer->buffer);
976           GST_INPUT_SELECTOR_LOCK (sel);
977 
978           /* we may have cleaned up the queue in the meantime because of
979            * old buffers */
980           if (!selpad->cached_buffers) {
981             break;
982           }
983         }
984         selpad->sending_cached_buffers = FALSE;
985 
986         /* all cached buffers sent, restore segment for current buffer */
987         selpad->segment = saved_segment;
988         selpad->events_pending = TRUE;
989 
990         /* Might have changed while calling chain for cached buffers */
991         active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
992       }
993     }
994 
995     if (active_sinkpad != pad) {
996       GST_INPUT_SELECTOR_UNLOCK (sel);
997       if (gst_input_selector_wait_running_time (sel, selpad, buf))
998         goto flushing;
999       GST_INPUT_SELECTOR_LOCK (sel);
1000     }
1001 
1002     /* Might have changed while waiting */
1003     active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
1004   }
1005 
1006   /* update the segment on the srcpad */
1007   start_time = GST_BUFFER_TIMESTAMP (buf);
1008   if (GST_CLOCK_TIME_IS_VALID (start_time)) {
1009     GST_LOG_OBJECT (pad, "received start time %" GST_TIME_FORMAT,
1010         GST_TIME_ARGS (start_time));
1011     if (GST_BUFFER_DURATION_IS_VALID (buf))
1012       GST_LOG_OBJECT (pad, "received end time %" GST_TIME_FORMAT,
1013           GST_TIME_ARGS (start_time + GST_BUFFER_DURATION (buf)));
1014 
1015     GST_OBJECT_LOCK (pad);
1016     selpad->position = start_time;
1017     selpad->segment.position = start_time;
1018     GST_OBJECT_UNLOCK (pad);
1019   }
1020 
1021   /* Ignore buffers from pads except the selected one */
1022   if (pad != active_sinkpad)
1023     goto ignore;
1024 
1025   /* Tell all non-active pads that we advanced the running time */
1026   if (sel->sync_streams)
1027     GST_INPUT_SELECTOR_BROADCAST (sel);
1028 
1029   GST_INPUT_SELECTOR_UNLOCK (sel);
1030 
1031   if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
1032     g_object_notify (G_OBJECT (sel), "active-pad");
1033   }
1034 
1035   /* if we have a pending events, push them now */
1036   if (G_UNLIKELY (prev_active_sinkpad != active_sinkpad
1037           || selpad->events_pending)) {
1038     gst_pad_sticky_events_foreach (GST_PAD_CAST (selpad), forward_sticky_events,
1039         sel);
1040     selpad->events_pending = FALSE;
1041   }
1042 
1043   if (selpad->discont) {
1044     buf = gst_buffer_make_writable (buf);
1045 
1046     GST_DEBUG_OBJECT (pad, "Marking discont buffer %p", buf);
1047     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
1048     selpad->discont = FALSE;
1049   }
1050 
1051   /* forward */
1052   GST_LOG_OBJECT (pad, "Forwarding buffer %p with timestamp %" GST_TIME_FORMAT,
1053       buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
1054 
1055   res = gst_pad_push (sel->srcpad, gst_buffer_ref (buf));
1056   GST_LOG_OBJECT (pad, "Buffer %p forwarded result=%d", buf, res);
1057 
1058   GST_INPUT_SELECTOR_LOCK (sel);
1059 
1060   if (sel->sync_streams && sel->cache_buffers) {
1061     /* Might have changed while pushing */
1062     active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
1063     /* only set pad to pushed if we are still the active pad */
1064     if (active_sinkpad == pad)
1065       selpad->pushed = TRUE;
1066 
1067     /* cache buffer as we may need it again if we change pads */
1068     gst_selector_pad_cache_buffer (selpad, buf);
1069     gst_input_selector_cleanup_old_cached_buffers (sel, pad);
1070   } else {
1071     selpad->pushed = TRUE;
1072     gst_buffer_unref (buf);
1073   }
1074   GST_INPUT_SELECTOR_UNLOCK (sel);
1075 
1076 done:
1077   return res;
1078 
1079   /* dropped buffers */
1080 ignore:
1081   {
1082     gboolean active_pad_pushed = GST_SELECTOR_PAD_CAST (active_sinkpad)->pushed;
1083 
1084     GST_DEBUG_OBJECT (pad, "Pad not active, discard buffer %p", buf);
1085     /* when we drop a buffer, we're creating a discont on this pad */
1086     selpad->discont = TRUE;
1087     GST_INPUT_SELECTOR_UNLOCK (sel);
1088     gst_buffer_unref (buf);
1089 
1090     /* figure out what to return upstream */
1091     GST_OBJECT_LOCK (selpad);
1092     if (selpad->always_ok || !active_pad_pushed)
1093       res = GST_FLOW_OK;
1094     else
1095       res = GST_FLOW_NOT_LINKED;
1096     GST_OBJECT_UNLOCK (selpad);
1097 
1098     goto done;
1099   }
1100 flushing:
1101   {
1102     GST_DEBUG_OBJECT (pad, "We are flushing, discard buffer %p", buf);
1103     gst_buffer_unref (buf);
1104     res = GST_FLOW_FLUSHING;
1105     goto done;
1106   }
1107 }
1108 
1109 static void gst_input_selector_dispose (GObject * object);
1110 static void gst_input_selector_finalize (GObject * object);
1111 
1112 static void gst_input_selector_set_property (GObject * object,
1113     guint prop_id, const GValue * value, GParamSpec * pspec);
1114 static void gst_input_selector_get_property (GObject * object,
1115     guint prop_id, GValue * value, GParamSpec * pspec);
1116 
1117 static GstPad *gst_input_selector_request_new_pad (GstElement * element,
1118     GstPadTemplate * templ, const gchar * unused, const GstCaps * caps);
1119 static void gst_input_selector_release_pad (GstElement * element, GstPad * pad);
1120 
1121 static GstStateChangeReturn gst_input_selector_change_state (GstElement *
1122     element, GstStateChange transition);
1123 
1124 static gboolean gst_input_selector_event (GstPad * pad, GstObject * parent,
1125     GstEvent * event);
1126 static gboolean gst_input_selector_query (GstPad * pad, GstObject * parent,
1127     GstQuery * query);
1128 static gint64 gst_input_selector_block (RsnInputSelector * self);
1129 
1130 #define _do_init \
1131     GST_DEBUG_CATEGORY_INIT (input_selector_debug, \
1132         "rsninputselector", 0, "An input stream selector element");
1133 #define gst_input_selector_parent_class parent_class
1134 G_DEFINE_TYPE_WITH_CODE (RsnInputSelector, gst_input_selector, GST_TYPE_ELEMENT,
1135     _do_init);
1136 
1137 static void
gst_input_selector_class_init(RsnInputSelectorClass * klass)1138 gst_input_selector_class_init (RsnInputSelectorClass * klass)
1139 {
1140   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
1141   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
1142 
1143   gobject_class->dispose = gst_input_selector_dispose;
1144   gobject_class->finalize = gst_input_selector_finalize;
1145 
1146   gobject_class->set_property = gst_input_selector_set_property;
1147   gobject_class->get_property = gst_input_selector_get_property;
1148 
1149   g_object_class_install_property (gobject_class, PROP_N_PADS,
1150       g_param_spec_uint ("n-pads", "Number of Pads",
1151           "The number of sink pads", 0, G_MAXUINT, 0,
1152           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1153 
1154   g_object_class_install_property (gobject_class, PROP_ACTIVE_PAD,
1155       g_param_spec_object ("active-pad", "Active pad",
1156           "The currently active sink pad", GST_TYPE_PAD,
1157           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1158 
1159   /**
1160    * RsnInputSelector:sync-streams
1161    *
1162    * If set to %TRUE all inactive streams will be synced to the
1163    * running time of the active stream or to the current clock.
1164    *
1165    * To make sure no buffers are dropped by input-selector
1166    * that might be needed when switching the active pad,
1167    * sync-mode should be set to "clock" and cache-buffers to TRUE.
1168    */
1169   g_object_class_install_property (gobject_class, PROP_SYNC_STREAMS,
1170       g_param_spec_boolean ("sync-streams", "Sync Streams",
1171           "Synchronize inactive streams to the running time of the active "
1172           "stream or to the current clock",
1173           DEFAULT_SYNC_STREAMS,
1174           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1175           GST_PARAM_MUTABLE_READY));
1176 
1177   /**
1178    * RsnInputSelector:sync-mode
1179    *
1180    * Select how input-selector will sync buffers when in sync-streams mode.
1181    *
1182    * Note that when using the "active-segment" mode, the "active-segment" may
1183    * be ahead of current clock time when switching the active pad, as the current
1184    * active pad may have pushed more buffers than what was displayed/consumed,
1185    * which may cause delays and some missing buffers.
1186    */
1187   g_object_class_install_property (gobject_class, PROP_SYNC_MODE,
1188       g_param_spec_enum ("sync-mode", "Sync mode",
1189           "Behavior in sync-streams mode", GST_TYPE_INPUT_SELECTOR_SYNC_MODE,
1190           DEFAULT_SYNC_MODE,
1191           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1192           GST_PARAM_MUTABLE_READY));
1193 
1194   /**
1195    * RsnInputSelector:cache-buffers
1196    *
1197    * If set to %TRUE and RsnInputSelector:sync-streams is also set to %TRUE,
1198    * the active pad will cache the buffers still considered valid (after current
1199    * running time, see sync-mode) to avoid missing frames if/when the pad is
1200    * reactivated.
1201    *
1202    * The active pad may push more buffers than what is currently displayed/consumed
1203    * and when changing pads those buffers will be discarded and the only way to
1204    * reactivate that pad without loosing the already consumed buffers is to enable cache.
1205    */
1206   g_object_class_install_property (gobject_class, PROP_CACHE_BUFFERS,
1207       g_param_spec_boolean ("cache-buffers", "Cache Buffers",
1208           "Cache buffers for active-pad",
1209           DEFAULT_CACHE_BUFFERS,
1210           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1211           GST_PARAM_MUTABLE_READY));
1212 
1213   /**
1214    * RsnInputSelector::block:
1215    * @inputselector: the #RsnInputSelector
1216    *
1217    * Block all sink pads in preparation for a switch. Returns the stop time of
1218    * the current switch segment, as a running time, or 0 if there is no current
1219    * active pad or the current active pad never received data.
1220    */
1221   gst_input_selector_signals[SIGNAL_BLOCK] =
1222       g_signal_new ("block", G_TYPE_FROM_CLASS (klass),
1223       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
1224       G_STRUCT_OFFSET (RsnInputSelectorClass, block), NULL, NULL,
1225       g_cclosure_marshal_generic, G_TYPE_INT64, 0);
1226 
1227   gst_element_class_set_static_metadata (gstelement_class, "Input selector",
1228       "Generic", "N-to-1 input stream selector",
1229       "Julien Moutte <julien@moutte.net>, "
1230       "Jan Schmidt <thaytan@mad.scientist.com>, "
1231       "Wim Taymans <wim.taymans@gmail.com>");
1232   gst_element_class_add_static_pad_template (gstelement_class,
1233       &gst_input_selector_sink_factory);
1234   gst_element_class_add_static_pad_template (gstelement_class,
1235       &gst_input_selector_src_factory);
1236 
1237   gstelement_class->request_new_pad = gst_input_selector_request_new_pad;
1238   gstelement_class->release_pad = gst_input_selector_release_pad;
1239   gstelement_class->change_state = gst_input_selector_change_state;
1240 
1241   klass->block = GST_DEBUG_FUNCPTR (gst_input_selector_block);
1242 }
1243 
1244 static void
gst_input_selector_init(RsnInputSelector * sel)1245 gst_input_selector_init (RsnInputSelector * sel)
1246 {
1247   sel->srcpad = gst_pad_new ("src", GST_PAD_SRC);
1248   gst_pad_set_iterate_internal_links_function (sel->srcpad,
1249       GST_DEBUG_FUNCPTR (gst_selector_pad_iterate_linked_pads));
1250   gst_pad_set_query_function (sel->srcpad,
1251       GST_DEBUG_FUNCPTR (gst_input_selector_query));
1252   gst_pad_set_event_function (sel->srcpad,
1253       GST_DEBUG_FUNCPTR (gst_input_selector_event));
1254   GST_OBJECT_FLAG_SET (sel->srcpad, GST_PAD_FLAG_PROXY_CAPS);
1255   gst_element_add_pad (GST_ELEMENT (sel), sel->srcpad);
1256   /* sinkpad management */
1257   sel->active_sinkpad = NULL;
1258   sel->padcount = 0;
1259   sel->sync_streams = DEFAULT_SYNC_STREAMS;
1260 
1261   g_mutex_init (&sel->lock);
1262   g_cond_init (&sel->cond);
1263   sel->blocked = FALSE;
1264 
1265   /* lets give a change for downstream to do something on
1266    * active-pad change before we start pushing new buffers */
1267   g_signal_connect_data (sel, "notify::active-pad",
1268       (GCallback) gst_input_selector_active_pad_changed, NULL,
1269       NULL, G_CONNECT_AFTER);
1270 }
1271 
1272 static void
gst_input_selector_dispose(GObject * object)1273 gst_input_selector_dispose (GObject * object)
1274 {
1275   RsnInputSelector *sel = GST_INPUT_SELECTOR (object);
1276 
1277   if (sel->active_sinkpad) {
1278     gst_object_unref (sel->active_sinkpad);
1279     sel->active_sinkpad = NULL;
1280   }
1281   G_OBJECT_CLASS (parent_class)->dispose (object);
1282 }
1283 
1284 static void
gst_input_selector_finalize(GObject * object)1285 gst_input_selector_finalize (GObject * object)
1286 {
1287   RsnInputSelector *sel = GST_INPUT_SELECTOR (object);
1288 
1289   g_mutex_clear (&sel->lock);
1290   g_cond_clear (&sel->cond);
1291 
1292   G_OBJECT_CLASS (parent_class)->finalize (object);
1293 }
1294 
1295 /* this function must be called with the SELECTOR_LOCK. It returns TRUE when the
1296  * active pad changed. */
1297 static gboolean
gst_input_selector_set_active_pad(RsnInputSelector * self,GstPad * pad)1298 gst_input_selector_set_active_pad (RsnInputSelector * self, GstPad * pad)
1299 {
1300   RsnSelectorPad *old, *new;
1301   GstPad **active_pad_p;
1302 
1303   if (pad == self->active_sinkpad)
1304     return FALSE;
1305 
1306   old = GST_SELECTOR_PAD_CAST (self->active_sinkpad);
1307   new = GST_SELECTOR_PAD_CAST (pad);
1308 
1309   GST_DEBUG_OBJECT (self, "setting active pad to %s:%s",
1310       GST_DEBUG_PAD_NAME (new));
1311 
1312   if (old)
1313     old->pushed = FALSE;
1314   if (new)
1315     new->pushed = FALSE;
1316 
1317   /* Send a new SEGMENT event on the new pad next */
1318   if (old != new && new)
1319     new->events_pending = TRUE;
1320 
1321   active_pad_p = &self->active_sinkpad;
1322   gst_object_replace ((GstObject **) active_pad_p, GST_OBJECT_CAST (pad));
1323 
1324   if (old && old != new)
1325     gst_pad_push_event (GST_PAD_CAST (old), gst_event_new_reconfigure ());
1326   if (new)
1327     gst_pad_push_event (GST_PAD_CAST (new), gst_event_new_reconfigure ());
1328 
1329   GST_DEBUG_OBJECT (self, "New active pad is %" GST_PTR_FORMAT,
1330       self->active_sinkpad);
1331 
1332   return TRUE;
1333 }
1334 
1335 static void
gst_input_selector_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1336 gst_input_selector_set_property (GObject * object, guint prop_id,
1337     const GValue * value, GParamSpec * pspec)
1338 {
1339   RsnInputSelector *sel = GST_INPUT_SELECTOR (object);
1340 
1341   switch (prop_id) {
1342     case PROP_ACTIVE_PAD:
1343     {
1344       GstPad *pad;
1345 
1346       pad = g_value_get_object (value);
1347 
1348       GST_INPUT_SELECTOR_LOCK (sel);
1349 
1350 #if DEBUG_CACHED_BUFFERS
1351       gst_input_selector_debug_cached_buffers (sel);
1352 #endif
1353 
1354       gst_input_selector_set_active_pad (sel, pad);
1355 
1356 #if DEBUG_CACHED_BUFFERS
1357       gst_input_selector_debug_cached_buffers (sel);
1358 #endif
1359 
1360       GST_INPUT_SELECTOR_UNLOCK (sel);
1361       break;
1362     }
1363     case PROP_SYNC_STREAMS:
1364       GST_INPUT_SELECTOR_LOCK (sel);
1365       sel->sync_streams = g_value_get_boolean (value);
1366       GST_INPUT_SELECTOR_UNLOCK (sel);
1367       break;
1368     case PROP_SYNC_MODE:
1369       GST_INPUT_SELECTOR_LOCK (sel);
1370       sel->sync_mode = g_value_get_enum (value);
1371       GST_INPUT_SELECTOR_UNLOCK (sel);
1372       break;
1373     case PROP_CACHE_BUFFERS:
1374       GST_INPUT_SELECTOR_LOCK (object);
1375       sel->cache_buffers = g_value_get_boolean (value);
1376       GST_INPUT_SELECTOR_UNLOCK (object);
1377       break;
1378     default:
1379       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1380       break;
1381   }
1382 }
1383 
1384 static void
gst_input_selector_active_pad_changed(RsnInputSelector * sel,GParamSpec * pspec,gpointer user_data)1385 gst_input_selector_active_pad_changed (RsnInputSelector * sel,
1386     GParamSpec * pspec, gpointer user_data)
1387 {
1388   /* Wake up all non-active pads in sync mode, they might be
1389    * the active pad now */
1390   if (sel->sync_streams)
1391     GST_INPUT_SELECTOR_BROADCAST (sel);
1392 }
1393 
1394 static void
gst_input_selector_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1395 gst_input_selector_get_property (GObject * object, guint prop_id,
1396     GValue * value, GParamSpec * pspec)
1397 {
1398   RsnInputSelector *sel = GST_INPUT_SELECTOR (object);
1399 
1400   switch (prop_id) {
1401     case PROP_N_PADS:
1402       GST_INPUT_SELECTOR_LOCK (object);
1403       g_value_set_uint (value, sel->n_pads);
1404       GST_INPUT_SELECTOR_UNLOCK (object);
1405       break;
1406     case PROP_ACTIVE_PAD:
1407       GST_INPUT_SELECTOR_LOCK (object);
1408       g_value_set_object (value, sel->active_sinkpad);
1409       GST_INPUT_SELECTOR_UNLOCK (object);
1410       break;
1411     case PROP_SYNC_STREAMS:
1412       GST_INPUT_SELECTOR_LOCK (object);
1413       g_value_set_boolean (value, sel->sync_streams);
1414       GST_INPUT_SELECTOR_UNLOCK (object);
1415       break;
1416     case PROP_SYNC_MODE:
1417       GST_INPUT_SELECTOR_LOCK (object);
1418       g_value_set_enum (value, sel->sync_mode);
1419       GST_INPUT_SELECTOR_UNLOCK (object);
1420       break;
1421     case PROP_CACHE_BUFFERS:
1422       GST_INPUT_SELECTOR_LOCK (object);
1423       g_value_set_boolean (value, sel->cache_buffers);
1424       GST_INPUT_SELECTOR_UNLOCK (object);
1425       break;
1426     default:
1427       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1428       break;
1429   }
1430 }
1431 
1432 static GstPad *
gst_input_selector_get_linked_pad(RsnInputSelector * sel,GstPad * pad,gboolean strict)1433 gst_input_selector_get_linked_pad (RsnInputSelector * sel, GstPad * pad,
1434     gboolean strict)
1435 {
1436   GstPad *otherpad = NULL;
1437 
1438   GST_INPUT_SELECTOR_LOCK (sel);
1439   if (pad == sel->srcpad)
1440     otherpad = sel->active_sinkpad;
1441   else if (pad == sel->active_sinkpad || !strict)
1442     otherpad = sel->srcpad;
1443   if (otherpad)
1444     gst_object_ref (otherpad);
1445   GST_INPUT_SELECTOR_UNLOCK (sel);
1446 
1447   return otherpad;
1448 }
1449 
1450 static gboolean
gst_input_selector_event(GstPad * pad,GstObject * parent,GstEvent * event)1451 gst_input_selector_event (GstPad * pad, GstObject * parent, GstEvent * event)
1452 {
1453   RsnInputSelector *sel;
1454   gboolean result = FALSE;
1455   GstIterator *iter;
1456   gboolean done = FALSE;
1457   GValue item = { 0, };
1458   GstPad *eventpad;
1459   GList *pushed_pads = NULL;
1460 
1461   sel = GST_INPUT_SELECTOR (parent);
1462   /* Send upstream events to all sinkpads */
1463   iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (sel));
1464 
1465   /* This is now essentially a copy of gst_pad_event_default_dispatch
1466    * with a different iterator */
1467   while (!done) {
1468     switch (gst_iterator_next (iter, &item)) {
1469       case GST_ITERATOR_OK:
1470         eventpad = g_value_get_object (&item);
1471 
1472         /* if already pushed,  skip */
1473         if (g_list_find (pushed_pads, eventpad)) {
1474           g_value_reset (&item);
1475           break;
1476         }
1477 
1478         gst_event_ref (event);
1479         result |= gst_pad_push_event (eventpad, event);
1480 
1481         g_value_reset (&item);
1482         break;
1483       case GST_ITERATOR_RESYNC:
1484         /* We don't reset the result here because we don't push the event
1485          * again on pads that got the event already and because we need
1486          * to consider the result of the previous pushes */
1487         gst_iterator_resync (iter);
1488         break;
1489       case GST_ITERATOR_ERROR:
1490         GST_ERROR_OBJECT (pad, "Could not iterate over sinkpads");
1491         done = TRUE;
1492         break;
1493       case GST_ITERATOR_DONE:
1494         done = TRUE;
1495         break;
1496     }
1497   }
1498   g_value_unset (&item);
1499   gst_iterator_free (iter);
1500 
1501   g_list_free (pushed_pads);
1502 
1503   gst_event_unref (event);
1504 
1505   return result;
1506 }
1507 
1508 /* query on the srcpad. We override this function because by default it will
1509  * only forward the query to one random sinkpad */
1510 static gboolean
gst_input_selector_query(GstPad * pad,GstObject * parent,GstQuery * query)1511 gst_input_selector_query (GstPad * pad, GstObject * parent, GstQuery * query)
1512 {
1513   gboolean res = FALSE;
1514   RsnInputSelector *sel;
1515 
1516   sel = GST_INPUT_SELECTOR (parent);
1517 
1518   switch (GST_QUERY_TYPE (query)) {
1519     case GST_QUERY_LATENCY:
1520     {
1521       GList *walk;
1522       GstClockTime resmin, resmax;
1523       gboolean reslive;
1524 
1525       resmin = 0;
1526       resmax = -1;
1527       reslive = FALSE;
1528 
1529       /* perform the query on all sinkpads and combine the results. We take the
1530        * max of min and the min of max for the result latency. */
1531       res = TRUE;
1532       GST_INPUT_SELECTOR_LOCK (sel);
1533       for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk;
1534           walk = g_list_next (walk)) {
1535         GstPad *sinkpad = GST_PAD_CAST (walk->data);
1536 
1537         if (gst_pad_peer_query (sinkpad, query)) {
1538           GstClockTime min, max;
1539           gboolean live;
1540 
1541           gst_query_parse_latency (query, &live, &min, &max);
1542 
1543           GST_DEBUG_OBJECT (sinkpad,
1544               "peer latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
1545               ", live %d", GST_TIME_ARGS (min), GST_TIME_ARGS (max), live);
1546 
1547           if (live) {
1548             if (min > resmin)
1549               resmin = min;
1550             if (resmax == -1)
1551               resmax = max;
1552             else if (max < resmax)
1553               resmax = max;
1554             reslive = TRUE;
1555           }
1556         } else {
1557           GST_LOG_OBJECT (sinkpad, "latency query failed");
1558           res = FALSE;
1559         }
1560       }
1561       GST_INPUT_SELECTOR_UNLOCK (sel);
1562       if (res) {
1563         gst_query_set_latency (query, reslive, resmin, resmax);
1564 
1565         GST_DEBUG_OBJECT (sel,
1566             "total latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
1567             ", live %d", GST_TIME_ARGS (resmin), GST_TIME_ARGS (resmax),
1568             reslive);
1569       }
1570 
1571       break;
1572     }
1573     default:
1574       res = gst_pad_query_default (pad, parent, query);
1575       break;
1576   }
1577 
1578   return res;
1579 }
1580 
1581 /* check if the pad is the active sinkpad */
1582 static inline gboolean
gst_input_selector_is_active_sinkpad(RsnInputSelector * sel,GstPad * pad)1583 gst_input_selector_is_active_sinkpad (RsnInputSelector * sel, GstPad * pad)
1584 {
1585   gboolean res;
1586 
1587   GST_INPUT_SELECTOR_LOCK (sel);
1588   res = (pad == sel->active_sinkpad);
1589   GST_INPUT_SELECTOR_UNLOCK (sel);
1590 
1591   return res;
1592 }
1593 
1594 /* Get or create the active sinkpad, must be called with SELECTOR_LOCK */
1595 static GstPad *
gst_input_selector_activate_sinkpad(RsnInputSelector * sel,GstPad * pad)1596 gst_input_selector_activate_sinkpad (RsnInputSelector * sel, GstPad * pad)
1597 {
1598   GstPad *active_sinkpad;
1599   RsnSelectorPad *selpad;
1600 
1601   selpad = GST_SELECTOR_PAD_CAST (pad);
1602 
1603   selpad->active = TRUE;
1604   active_sinkpad = sel->active_sinkpad;
1605   if (sel->active_sinkpad == NULL) {
1606     GValue item = G_VALUE_INIT;
1607     GstIterator *iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (sel));
1608     GstIteratorResult ires;
1609 
1610     while ((ires = gst_iterator_next (iter, &item)) == GST_ITERATOR_RESYNC)
1611       gst_iterator_resync (iter);
1612     if (ires == GST_ITERATOR_OK) {
1613       /* If no pad is currently selected, we return the first usable pad to
1614        * guarantee consistency */
1615 
1616       active_sinkpad = sel->active_sinkpad = g_value_dup_object (&item);
1617       g_value_reset (&item);
1618       GST_DEBUG_OBJECT (sel, "Activating pad %s:%s",
1619           GST_DEBUG_PAD_NAME (active_sinkpad));
1620     } else
1621       GST_WARNING_OBJECT (sel, "Couldn't find a default sink pad");
1622     gst_iterator_free (iter);
1623   }
1624 
1625   return active_sinkpad;
1626 }
1627 
1628 static GstPad *
gst_input_selector_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * unused,const GstCaps * caps)1629 gst_input_selector_request_new_pad (GstElement * element,
1630     GstPadTemplate * templ, const gchar * unused, const GstCaps * caps)
1631 {
1632   RsnInputSelector *sel;
1633   gchar *name = NULL;
1634   GstPad *sinkpad = NULL;
1635 
1636   g_return_val_if_fail (templ->direction == GST_PAD_SINK, NULL);
1637 
1638   sel = GST_INPUT_SELECTOR (element);
1639 
1640   GST_INPUT_SELECTOR_LOCK (sel);
1641 
1642   GST_LOG_OBJECT (sel, "Creating new pad %d", sel->padcount);
1643   name = g_strdup_printf ("sink_%u", sel->padcount++);
1644   sinkpad = g_object_new (GST_TYPE_SELECTOR_PAD,
1645       "name", name, "direction", templ->direction, "template", templ, NULL);
1646   g_free (name);
1647 
1648   sel->n_pads++;
1649 
1650   gst_pad_set_event_function (sinkpad,
1651       GST_DEBUG_FUNCPTR (gst_selector_pad_event));
1652   gst_pad_set_query_function (sinkpad,
1653       GST_DEBUG_FUNCPTR (gst_selector_pad_query));
1654   gst_pad_set_chain_function (sinkpad,
1655       GST_DEBUG_FUNCPTR (gst_selector_pad_chain));
1656   gst_pad_set_iterate_internal_links_function (sinkpad,
1657       GST_DEBUG_FUNCPTR (gst_selector_pad_iterate_linked_pads));
1658 
1659   GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_CAPS);
1660   GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_ALLOCATION);
1661   gst_pad_set_active (sinkpad, TRUE);
1662   gst_element_add_pad (GST_ELEMENT (sel), sinkpad);
1663   GST_INPUT_SELECTOR_UNLOCK (sel);
1664 
1665   return sinkpad;
1666 }
1667 
1668 static void
gst_input_selector_release_pad(GstElement * element,GstPad * pad)1669 gst_input_selector_release_pad (GstElement * element, GstPad * pad)
1670 {
1671   RsnInputSelector *sel;
1672 
1673   sel = GST_INPUT_SELECTOR (element);
1674   GST_LOG_OBJECT (sel, "Releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1675 
1676   GST_INPUT_SELECTOR_LOCK (sel);
1677   /* if the pad was the active pad, makes us select a new one */
1678   if (sel->active_sinkpad == pad) {
1679     GST_DEBUG_OBJECT (sel, "Deactivating pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1680     gst_object_unref (sel->active_sinkpad);
1681     sel->active_sinkpad = NULL;
1682   }
1683   sel->n_pads--;
1684 
1685   gst_pad_set_active (pad, FALSE);
1686   gst_element_remove_pad (GST_ELEMENT (sel), pad);
1687   GST_INPUT_SELECTOR_UNLOCK (sel);
1688 }
1689 
1690 static void
gst_input_selector_reset(RsnInputSelector * sel)1691 gst_input_selector_reset (RsnInputSelector * sel)
1692 {
1693   GList *walk;
1694 
1695   GST_INPUT_SELECTOR_LOCK (sel);
1696   /* clear active pad */
1697   if (sel->active_sinkpad) {
1698     gst_object_unref (sel->active_sinkpad);
1699     sel->active_sinkpad = NULL;
1700   }
1701   /* reset each of our sinkpads state */
1702   for (walk = GST_ELEMENT_CAST (sel)->sinkpads; walk; walk = g_list_next (walk)) {
1703     RsnSelectorPad *selpad = GST_SELECTOR_PAD_CAST (walk->data);
1704 
1705     gst_selector_pad_reset (selpad);
1706 
1707     if (selpad->tags) {
1708       gst_tag_list_unref (selpad->tags);
1709       selpad->tags = NULL;
1710     }
1711   }
1712   GST_INPUT_SELECTOR_UNLOCK (sel);
1713 }
1714 
1715 static GstStateChangeReturn
gst_input_selector_change_state(GstElement * element,GstStateChange transition)1716 gst_input_selector_change_state (GstElement * element,
1717     GstStateChange transition)
1718 {
1719   RsnInputSelector *self = GST_INPUT_SELECTOR (element);
1720   GstStateChangeReturn result;
1721 
1722   switch (transition) {
1723     case GST_STATE_CHANGE_READY_TO_PAUSED:
1724       GST_INPUT_SELECTOR_LOCK (self);
1725       self->blocked = FALSE;
1726       self->flushing = FALSE;
1727       GST_INPUT_SELECTOR_UNLOCK (self);
1728       break;
1729     case GST_STATE_CHANGE_PAUSED_TO_READY:
1730       /* first unlock before we call the parent state change function, which
1731        * tries to acquire the stream lock when going to ready. */
1732       GST_INPUT_SELECTOR_LOCK (self);
1733       self->blocked = FALSE;
1734       self->flushing = TRUE;
1735       GST_INPUT_SELECTOR_BROADCAST (self);
1736       GST_INPUT_SELECTOR_UNLOCK (self);
1737       break;
1738     default:
1739       break;
1740   }
1741 
1742   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1743 
1744   switch (transition) {
1745     case GST_STATE_CHANGE_PAUSED_TO_READY:
1746       gst_input_selector_reset (self);
1747       break;
1748     default:
1749       break;
1750   }
1751 
1752   return result;
1753 }
1754 
1755 static gint64
gst_input_selector_block(RsnInputSelector * self)1756 gst_input_selector_block (RsnInputSelector * self)
1757 {
1758   gint64 ret = 0;
1759   RsnSelectorPad *spad;
1760 
1761   GST_INPUT_SELECTOR_LOCK (self);
1762 
1763   if (self->blocked)
1764     GST_WARNING_OBJECT (self, "switch already blocked");
1765 
1766   self->blocked = TRUE;
1767   spad = GST_SELECTOR_PAD_CAST (self->active_sinkpad);
1768 
1769   if (spad)
1770     ret = gst_selector_pad_get_running_time (spad);
1771   else
1772     GST_DEBUG_OBJECT (self, "no active pad while blocking");
1773 
1774   GST_INPUT_SELECTOR_UNLOCK (self);
1775 
1776   return ret;
1777 }
1778