1 /* GStreamer Stream Combiner
2  * Copyright (C) 2010 Edward Hervey <edward.hervey@collabora.co.uk>
3  *           (C) 2009 Nokia Corporation
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include "gststreamcombiner.h"
26 #include "gststreamcombinerpad.h"
27 
28 static GstStaticPadTemplate src_template =
29 GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS,
30     GST_STATIC_CAPS_ANY);
31 
32 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink_%u",
33     GST_PAD_SINK,
34     GST_PAD_REQUEST,
35     GST_STATIC_CAPS_ANY);
36 
37 GST_DEBUG_CATEGORY_STATIC (gst_stream_combiner_debug);
38 #define GST_CAT_DEFAULT gst_stream_combiner_debug
39 
40 G_DEFINE_TYPE (GstStreamCombiner, gst_stream_combiner, GST_TYPE_ELEMENT);
41 
42 G_DEFINE_TYPE (GstStreamCombinerPad, gst_stream_combiner_pad, GST_TYPE_PAD);
43 
44 #define STREAMS_LOCK(obj) (g_mutex_lock(&obj->lock))
45 #define STREAMS_UNLOCK(obj) (g_mutex_unlock(&obj->lock))
46 
47 static void gst_stream_combiner_finalize (GObject * object);
48 
49 static GstPad *gst_stream_combiner_request_new_pad (GstElement * element,
50     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
51 static void gst_stream_combiner_release_pad (GstElement * element,
52     GstPad * pad);
53 
54 static void
gst_stream_combiner_pad_class_init(GstStreamCombinerPadClass * klass)55 gst_stream_combiner_pad_class_init (GstStreamCombinerPadClass * klass)
56 {
57   return;
58 }
59 
60 static void
gst_stream_combiner_pad_init(GstStreamCombinerPad * mixerpad)61 gst_stream_combiner_pad_init (GstStreamCombinerPad * mixerpad)
62 {
63   mixerpad->is_eos = FALSE;
64 }
65 
66 static void
gst_stream_combiner_class_init(GstStreamCombinerClass * klass)67 gst_stream_combiner_class_init (GstStreamCombinerClass * klass)
68 {
69   GObjectClass *gobject_klass;
70   GstElementClass *gstelement_klass;
71 
72   gobject_klass = (GObjectClass *) klass;
73   gstelement_klass = (GstElementClass *) klass;
74 
75   gobject_klass->finalize = gst_stream_combiner_finalize;
76 
77   GST_DEBUG_CATEGORY_INIT (gst_stream_combiner_debug, "streamcombiner", 0,
78       "Stream Combiner");
79 
80   gst_element_class_add_static_pad_template (gstelement_klass, &src_template);
81   gst_element_class_add_static_pad_template (gstelement_klass, &sink_template);
82 
83   gstelement_klass->request_new_pad =
84       GST_DEBUG_FUNCPTR (gst_stream_combiner_request_new_pad);
85   gstelement_klass->release_pad =
86       GST_DEBUG_FUNCPTR (gst_stream_combiner_release_pad);
87 
88   gst_element_class_set_static_metadata (gstelement_klass,
89       "streamcombiner", "Generic",
90       "Recombines streams splitted by the streamsplitter element",
91       "Edward Hervey <edward.hervey@collabora.co.uk>");
92 }
93 
94 static void
gst_stream_combiner_finalize(GObject * object)95 gst_stream_combiner_finalize (GObject * object)
96 {
97   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) object;
98 
99   g_mutex_clear (&stream_combiner->lock);
100 
101   G_OBJECT_CLASS (gst_stream_combiner_parent_class)->finalize (object);
102 }
103 
104 static GstFlowReturn
gst_stream_combiner_chain(GstPad * pad,GstObject * parent,GstBuffer * buf)105 gst_stream_combiner_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
106 {
107   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) parent;
108   /* FIXME : IMPLEMENT */
109 
110   /* with lock taken, check if we're the active stream, if not drop */
111   return gst_pad_push (stream_combiner->srcpad, buf);
112 }
113 
114 static gboolean
_all_sink_pads_eos(GstStreamCombiner * combiner)115 _all_sink_pads_eos (GstStreamCombiner * combiner)
116 {
117   GList *tmp;
118 
119   for (tmp = combiner->sinkpads; tmp; tmp = tmp->next) {
120     if (!(GST_STREAM_COMBINER_PAD (tmp->data))->is_eos)
121       return FALSE;
122   }
123 
124   return TRUE;
125 }
126 
127 static gboolean
gst_stream_combiner_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)128 gst_stream_combiner_sink_event (GstPad * pad, GstObject * parent,
129     GstEvent * event)
130 {
131   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) parent;
132   GstStreamCombinerPad *combiner_pad = GST_STREAM_COMBINER_PAD (pad);
133   /* FIXME : IMPLEMENT */
134 
135   GST_DEBUG_OBJECT (pad, "Got event %s", GST_EVENT_TYPE_NAME (event));
136 
137   switch (GST_EVENT_TYPE (event)) {
138     case GST_EVENT_EOS:
139       STREAMS_LOCK (stream_combiner);
140       combiner_pad->is_eos = TRUE;
141       if (!_all_sink_pads_eos (stream_combiner)) {
142         gst_event_unref (event);
143         event = NULL;
144       } else {
145         GST_DEBUG_OBJECT (stream_combiner, "All sink pads eos, pushing eos");
146       }
147       STREAMS_UNLOCK (stream_combiner);
148       break;
149     default:
150       break;
151   }
152 
153   /* SEGMENT : lock, wait for other stream to EOS, select stream, unlock, push */
154   /* FLUSH_START : lock, mark as flushing, unlock. if wasn't flushing forward */
155   /* FLUSH_STOP : lock, unmark as flushing, unlock, if was flushing forward */
156   /* OTHER : if selected pad forward */
157   if (event)
158     return gst_pad_push_event (stream_combiner->srcpad, event);
159   return FALSE;
160 }
161 
162 static gboolean
gst_stream_combiner_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)163 gst_stream_combiner_sink_query (GstPad * pad, GstObject * parent,
164     GstQuery * query)
165 {
166   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) parent;
167 
168   return gst_pad_peer_query (stream_combiner->srcpad, query);
169 }
170 
171 static gboolean
gst_stream_combiner_src_event(GstPad * pad,GstObject * parent,GstEvent * event)172 gst_stream_combiner_src_event (GstPad * pad, GstObject * parent,
173     GstEvent * event)
174 {
175   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) parent;
176   GstPad *sinkpad = NULL;
177 
178   STREAMS_LOCK (stream_combiner);
179   if (stream_combiner->current)
180     sinkpad = stream_combiner->current;
181   else if (stream_combiner->sinkpads)
182     sinkpad = (GstPad *) stream_combiner->sinkpads->data;
183   STREAMS_UNLOCK (stream_combiner);
184 
185   if (sinkpad)
186     /* Forward upstream as is */
187     return gst_pad_push_event (sinkpad, event);
188 
189   return FALSE;
190 }
191 
192 static gboolean
gst_stream_combiner_src_query(GstPad * pad,GstObject * parent,GstQuery * query)193 gst_stream_combiner_src_query (GstPad * pad, GstObject * parent,
194     GstQuery * query)
195 {
196   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) parent;
197   GstPad *sinkpad = NULL;
198   gboolean ret = FALSE;
199 
200   switch (GST_QUERY_TYPE (query)) {
201     case GST_QUERY_CAPS:
202       ret = gst_pad_query_default (pad, parent, query);
203       break;
204     default:
205       STREAMS_LOCK (stream_combiner);
206       if (stream_combiner->current)
207         sinkpad = stream_combiner->current;
208       else if (stream_combiner->sinkpads)
209         sinkpad = (GstPad *) stream_combiner->sinkpads->data;
210       STREAMS_UNLOCK (stream_combiner);
211 
212       if (sinkpad)
213         /* Forward upstream as is */
214         ret = gst_pad_peer_query (sinkpad, query);
215       break;
216   }
217   return ret;
218 }
219 
220 static void
gst_stream_combiner_init(GstStreamCombiner * stream_combiner)221 gst_stream_combiner_init (GstStreamCombiner * stream_combiner)
222 {
223   stream_combiner->srcpad =
224       gst_pad_new_from_static_template (&src_template, "src");
225   gst_pad_set_event_function (stream_combiner->srcpad,
226       gst_stream_combiner_src_event);
227   gst_pad_set_query_function (stream_combiner->srcpad,
228       gst_stream_combiner_src_query);
229   gst_element_add_pad (GST_ELEMENT (stream_combiner), stream_combiner->srcpad);
230 
231   g_mutex_init (&stream_combiner->lock);
232 }
233 
234 static GstPad *
gst_stream_combiner_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * caps)235 gst_stream_combiner_request_new_pad (GstElement * element,
236     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
237 {
238   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) element;
239   GstStreamCombinerPad *combiner_pad;
240   GstPad *sinkpad;
241   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
242   GstPadTemplate *template =
243       gst_element_class_get_pad_template (klass, "sink_%u");
244 
245   GST_DEBUG_OBJECT (element, "templ:%p, name:%s", templ, name);
246 
247   combiner_pad = g_object_new (GST_TYPE_STREAM_COMBINER_PAD, "name", name,
248       "template", template, "direction", template->direction, NULL);
249 
250   sinkpad = GST_PAD_CAST (combiner_pad);
251   gst_pad_set_chain_function (sinkpad, gst_stream_combiner_chain);
252   gst_pad_set_event_function (sinkpad, gst_stream_combiner_sink_event);
253   gst_pad_set_query_function (sinkpad, gst_stream_combiner_sink_query);
254 
255   STREAMS_LOCK (stream_combiner);
256   stream_combiner->sinkpads =
257       g_list_append (stream_combiner->sinkpads, sinkpad);
258   gst_pad_set_active (sinkpad, TRUE);
259   gst_element_add_pad (element, sinkpad);
260   stream_combiner->cookie++;
261   STREAMS_UNLOCK (stream_combiner);
262 
263   GST_DEBUG_OBJECT (element, "Returning pad %p", sinkpad);
264 
265   return sinkpad;
266 }
267 
268 static void
gst_stream_combiner_release_pad(GstElement * element,GstPad * pad)269 gst_stream_combiner_release_pad (GstElement * element, GstPad * pad)
270 {
271   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) element;
272   GList *tmp;
273 
274   GST_DEBUG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
275 
276   STREAMS_LOCK (stream_combiner);
277   tmp = g_list_find (stream_combiner->sinkpads, pad);
278   if (tmp) {
279     GstPad *pad = (GstPad *) tmp->data;
280 
281     stream_combiner->sinkpads =
282         g_list_delete_link (stream_combiner->sinkpads, tmp);
283     stream_combiner->cookie++;
284 
285     if (pad == stream_combiner->current) {
286       /* Deactivate current flow */
287       GST_DEBUG_OBJECT (element, "Removed pad was the current one");
288       stream_combiner->current = NULL;
289     }
290     GST_DEBUG_OBJECT (element, "Removing pad from ourself");
291     gst_element_remove_pad (element, pad);
292   }
293   STREAMS_UNLOCK (stream_combiner);
294 
295   return;
296 }
297