1 /* GStreamer Stream Splitter
2  * Copyright (C) 2010 Edward Hervey <edward.hervey@collabora.co.uk>
3  *           (C) 2009 Nokia Corporation
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include "gststreamsplitter.h"
26 
/* Template for the request source pads, named "src_%u", accepting any caps. */
static GstStaticPadTemplate src_template =
GST_STATIC_PAD_TEMPLATE ("src_%u", GST_PAD_SRC, GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);

/* Template for the single always-present sink pad, accepting any caps. */
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* Debug category used as the default for the logging macros in this file. */
GST_DEBUG_CATEGORY_STATIC (gst_stream_splitter_debug);
#define GST_CAT_DEFAULT gst_stream_splitter_debug

/* Registers GstStreamSplitter as a subtype of GstElement. */
G_DEFINE_TYPE (GstStreamSplitter, gst_stream_splitter, GST_TYPE_ELEMENT);
40 
/* Take / release the splitter's mutex (its `lock` member).
 * The argument is parenthesized so that any pointer expression can be
 * passed, not just a plain identifier. */
#define STREAMS_LOCK(obj) (g_mutex_lock(&(obj)->lock))
#define STREAMS_UNLOCK(obj) (g_mutex_unlock(&(obj)->lock))
43 
/* Forward declarations for functions referenced before their definition. */
static void gst_stream_splitter_dispose (GObject * object);
static void gst_stream_splitter_finalize (GObject * object);

static gboolean gst_stream_splitter_sink_setcaps (GstPad * pad, GstCaps * caps);

static GstPad *gst_stream_splitter_request_new_pad (GstElement * element,
    GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
static void gst_stream_splitter_release_pad (GstElement * element,
    GstPad * pad);
53 
54 static void
gst_stream_splitter_class_init(GstStreamSplitterClass * klass)55 gst_stream_splitter_class_init (GstStreamSplitterClass * klass)
56 {
57   GObjectClass *gobject_klass;
58   GstElementClass *gstelement_klass;
59 
60   gobject_klass = (GObjectClass *) klass;
61   gstelement_klass = (GstElementClass *) klass;
62 
63   gobject_klass->dispose = gst_stream_splitter_dispose;
64   gobject_klass->finalize = gst_stream_splitter_finalize;
65 
66   GST_DEBUG_CATEGORY_INIT (gst_stream_splitter_debug, "streamsplitter", 0,
67       "Stream Splitter");
68 
69   gst_element_class_add_static_pad_template (gstelement_klass, &src_template);
70   gst_element_class_add_static_pad_template (gstelement_klass, &sink_template);
71 
72   gstelement_klass->request_new_pad =
73       GST_DEBUG_FUNCPTR (gst_stream_splitter_request_new_pad);
74   gstelement_klass->release_pad =
75       GST_DEBUG_FUNCPTR (gst_stream_splitter_release_pad);
76 
77   gst_element_class_set_static_metadata (gstelement_klass,
78       "streamsplitter", "Generic",
79       "Splits streams based on their media type",
80       "Edward Hervey <edward.hervey@collabora.co.uk>");
81 }
82 
83 static void
gst_stream_splitter_dispose(GObject * object)84 gst_stream_splitter_dispose (GObject * object)
85 {
86   GstStreamSplitter *stream_splitter = (GstStreamSplitter *) object;
87 
88   g_list_foreach (stream_splitter->pending_events, (GFunc) gst_event_unref,
89       NULL);
90   g_list_free (stream_splitter->pending_events);
91   stream_splitter->pending_events = NULL;
92 
93   G_OBJECT_CLASS (gst_stream_splitter_parent_class)->dispose (object);
94 }
95 
96 static void
gst_stream_splitter_finalize(GObject * object)97 gst_stream_splitter_finalize (GObject * object)
98 {
99   GstStreamSplitter *stream_splitter = (GstStreamSplitter *) object;
100 
101   g_mutex_clear (&stream_splitter->lock);
102 
103   G_OBJECT_CLASS (gst_stream_splitter_parent_class)->finalize (object);
104 }
105 
106 static void
gst_stream_splitter_push_pending_events(GstStreamSplitter * splitter,GstPad * srcpad)107 gst_stream_splitter_push_pending_events (GstStreamSplitter * splitter,
108     GstPad * srcpad)
109 {
110   GList *tmp;
111   GST_DEBUG_OBJECT (srcpad, "Pushing out pending events");
112 
113   for (tmp = splitter->pending_events; tmp; tmp = tmp->next) {
114     GstEvent *event = (GstEvent *) tmp->data;
115     gst_pad_push_event (srcpad, event);
116   }
117   g_list_free (splitter->pending_events);
118   splitter->pending_events = NULL;
119 }
120 
121 static GstFlowReturn
gst_stream_splitter_chain(GstPad * pad,GstObject * parent,GstBuffer * buf)122 gst_stream_splitter_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
123 {
124   GstStreamSplitter *stream_splitter = (GstStreamSplitter *) parent;
125   GstFlowReturn res;
126   GstPad *srcpad = NULL;
127 
128   STREAMS_LOCK (stream_splitter);
129   if (stream_splitter->current)
130     srcpad = gst_object_ref (stream_splitter->current);
131   STREAMS_UNLOCK (stream_splitter);
132 
133   if (G_UNLIKELY (srcpad == NULL))
134     goto nopad;
135 
136   if (G_UNLIKELY (stream_splitter->pending_events))
137     gst_stream_splitter_push_pending_events (stream_splitter, srcpad);
138 
139   /* Forward to currently activated stream */
140   res = gst_pad_push (srcpad, buf);
141   gst_object_unref (srcpad);
142 
143   return res;
144 
145 nopad:
146   GST_WARNING_OBJECT (stream_splitter, "No output pad was configured");
147   return GST_FLOW_ERROR;
148 }
149 
150 static GList *
_flush_events(GstPad * pad,GList * events)151 _flush_events (GstPad * pad, GList * events)
152 {
153   GList *tmp;
154 
155   for (tmp = events; tmp; tmp = tmp->next) {
156     if (GST_EVENT_TYPE (tmp->data) != GST_EVENT_EOS &&
157         GST_EVENT_TYPE (tmp->data) != GST_EVENT_SEGMENT &&
158         GST_EVENT_IS_STICKY (tmp->data) && pad != NULL) {
159       gst_pad_store_sticky_event (pad, GST_EVENT_CAST (tmp->data));
160     }
161     gst_event_unref (tmp->data);
162   }
163   g_list_free (events);
164 
165   return NULL;
166 }
167 
168 static gboolean
gst_stream_splitter_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)169 gst_stream_splitter_sink_event (GstPad * pad, GstObject * parent,
170     GstEvent * event)
171 {
172   GstStreamSplitter *stream_splitter = (GstStreamSplitter *) parent;
173   gboolean res = TRUE;
174   gboolean toall = FALSE;
175   gboolean store = FALSE;
176   /* FLUSH_START/STOP : forward to all
177    * INBAND events : store to send in chain function to selected chain
178    * OUT_OF_BAND events : send to all
179    */
180 
181   GST_DEBUG_OBJECT (stream_splitter, "Got event %s",
182       GST_EVENT_TYPE_NAME (event));
183 
184   switch (GST_EVENT_TYPE (event)) {
185     case GST_EVENT_CAPS:
186     {
187       GstCaps *caps;
188 
189       gst_event_parse_caps (event, &caps);
190       res = gst_stream_splitter_sink_setcaps (pad, caps);
191 
192       store = TRUE;
193       break;
194     }
195     case GST_EVENT_FLUSH_STOP:
196       toall = TRUE;
197       STREAMS_LOCK (stream_splitter);
198       stream_splitter->pending_events = _flush_events (stream_splitter->current,
199           stream_splitter->pending_events);
200       STREAMS_UNLOCK (stream_splitter);
201       break;
202     case GST_EVENT_FLUSH_START:
203       toall = TRUE;
204       break;
205     case GST_EVENT_EOS:
206 
207       if (G_UNLIKELY (stream_splitter->pending_events)) {
208         GstPad *srcpad = NULL;
209 
210         STREAMS_LOCK (stream_splitter);
211         if (stream_splitter->current)
212           srcpad = gst_object_ref (stream_splitter->current);
213         STREAMS_UNLOCK (stream_splitter);
214 
215         if (srcpad) {
216           gst_stream_splitter_push_pending_events (stream_splitter, srcpad);
217           gst_object_unref (srcpad);
218         }
219       }
220 
221       toall = TRUE;
222       break;
223     default:
224       if (GST_EVENT_TYPE (event) & GST_EVENT_TYPE_SERIALIZED)
225         store = TRUE;
226   }
227 
228   if (store) {
229     stream_splitter->pending_events =
230         g_list_append (stream_splitter->pending_events, event);
231   } else if (toall) {
232     GList *tmp;
233     guint32 cookie;
234 
235     /* Send to all pads */
236     STREAMS_LOCK (stream_splitter);
237   resync:
238     if (G_UNLIKELY (stream_splitter->srcpads == NULL)) {
239       STREAMS_UNLOCK (stream_splitter);
240       /* No source pads */
241       gst_event_unref (event);
242       res = FALSE;
243       goto beach;
244     }
245     tmp = stream_splitter->srcpads;
246     cookie = stream_splitter->cookie;
247     while (tmp) {
248       GstPad *srcpad = (GstPad *) tmp->data;
249       STREAMS_UNLOCK (stream_splitter);
250       gst_event_ref (event);
251       res = gst_pad_push_event (srcpad, event);
252       STREAMS_LOCK (stream_splitter);
253       if (G_UNLIKELY (cookie != stream_splitter->cookie))
254         goto resync;
255       tmp = tmp->next;
256     }
257     STREAMS_UNLOCK (stream_splitter);
258     gst_event_unref (event);
259   } else {
260     GstPad *pad;
261 
262     /* Only send to current pad */
263 
264     STREAMS_LOCK (stream_splitter);
265     pad = stream_splitter->current;
266     STREAMS_UNLOCK (stream_splitter);
267     if (pad)
268       res = gst_pad_push_event (pad, event);
269     else {
270       gst_event_unref (event);
271       res = FALSE;
272     }
273   }
274 
275 beach:
276   return res;
277 }
278 
279 static GstCaps *
gst_stream_splitter_sink_getcaps(GstPad * pad,GstCaps * filter)280 gst_stream_splitter_sink_getcaps (GstPad * pad, GstCaps * filter)
281 {
282   GstStreamSplitter *stream_splitter =
283       (GstStreamSplitter *) GST_PAD_PARENT (pad);
284   guint32 cookie;
285   GList *tmp;
286   GstCaps *res = NULL;
287 
288   /* Return the combination of all downstream caps */
289 
290   STREAMS_LOCK (stream_splitter);
291 
292 resync:
293   if (G_UNLIKELY (stream_splitter->srcpads == NULL)) {
294     res = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
295     goto beach;
296   }
297 
298   res = NULL;
299   cookie = stream_splitter->cookie;
300   tmp = stream_splitter->srcpads;
301 
302   while (tmp) {
303     GstPad *srcpad = (GstPad *) tmp->data;
304 
305     /* Ensure srcpad doesn't get destroyed while we query peer */
306     gst_object_ref (srcpad);
307     STREAMS_UNLOCK (stream_splitter);
308     if (res) {
309       GstCaps *peercaps = gst_pad_peer_query_caps (srcpad, filter);
310       if (peercaps)
311         res = gst_caps_merge (res, peercaps);
312     } else {
313       res = gst_pad_peer_query_caps (srcpad, filter);
314     }
315     STREAMS_LOCK (stream_splitter);
316     gst_object_unref (srcpad);
317 
318     if (G_UNLIKELY (cookie != stream_splitter->cookie)) {
319       if (res)
320         gst_caps_unref (res);
321       goto resync;
322     }
323     tmp = tmp->next;
324   }
325 
326 beach:
327   STREAMS_UNLOCK (stream_splitter);
328   return res;
329 }
330 
331 static gboolean
gst_stream_splitter_sink_acceptcaps(GstPad * pad,GstCaps * caps)332 gst_stream_splitter_sink_acceptcaps (GstPad * pad, GstCaps * caps)
333 {
334   GstStreamSplitter *stream_splitter =
335       (GstStreamSplitter *) GST_PAD_PARENT (pad);
336   guint32 cookie;
337   GList *tmp;
338   gboolean res = FALSE;
339 
340   /* check if one of the downstream elements accepts the caps */
341   STREAMS_LOCK (stream_splitter);
342 
343 resync:
344   res = FALSE;
345 
346   if (G_UNLIKELY (stream_splitter->srcpads == NULL))
347     goto beach;
348 
349   cookie = stream_splitter->cookie;
350   tmp = stream_splitter->srcpads;
351 
352   while (tmp) {
353     GstPad *srcpad = (GstPad *) tmp->data;
354 
355     /* Ensure srcpad doesn't get destroyed while we query peer */
356     gst_object_ref (srcpad);
357     STREAMS_UNLOCK (stream_splitter);
358 
359     res = gst_pad_peer_query_accept_caps (srcpad, caps);
360 
361     STREAMS_LOCK (stream_splitter);
362     gst_object_unref (srcpad);
363 
364     if (G_UNLIKELY (cookie != stream_splitter->cookie))
365       goto resync;
366 
367     if (res)
368       break;
369 
370     tmp = tmp->next;
371   }
372 
373 beach:
374   STREAMS_UNLOCK (stream_splitter);
375   return res;
376 }
377 
378 static gboolean
gst_stream_splitter_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)379 gst_stream_splitter_sink_query (GstPad * pad, GstObject * parent,
380     GstQuery * query)
381 {
382   gboolean res;
383 
384   switch (GST_QUERY_TYPE (query)) {
385     case GST_QUERY_CAPS:
386     {
387       GstCaps *filter, *caps;
388 
389       gst_query_parse_caps (query, &filter);
390       caps = gst_stream_splitter_sink_getcaps (pad, filter);
391       gst_query_set_caps_result (query, caps);
392       gst_caps_unref (caps);
393       res = TRUE;
394       break;
395     }
396     case GST_QUERY_ACCEPT_CAPS:
397     {
398       GstCaps *caps;
399       gboolean result;
400 
401       gst_query_parse_accept_caps (query, &caps);
402       result = gst_stream_splitter_sink_acceptcaps (pad, caps);
403       gst_query_set_accept_caps_result (query, result);
404       res = TRUE;
405       break;
406     }
407     default:
408       res = gst_pad_query_default (pad, parent, query);
409       break;
410   }
411   return res;
412 }
413 
414 static gboolean
gst_stream_splitter_sink_setcaps(GstPad * pad,GstCaps * caps)415 gst_stream_splitter_sink_setcaps (GstPad * pad, GstCaps * caps)
416 {
417   GstStreamSplitter *stream_splitter =
418       (GstStreamSplitter *) GST_PAD_PARENT (pad);
419   guint32 cookie;
420   GList *tmp;
421   gboolean res;
422 
423   GST_DEBUG_OBJECT (stream_splitter, "caps %" GST_PTR_FORMAT, caps);
424 
425   /* Try on all pads, choose the one that succeeds as the current stream */
426   STREAMS_LOCK (stream_splitter);
427 
428 resync:
429   if (G_UNLIKELY (stream_splitter->srcpads == NULL)) {
430     res = FALSE;
431     goto beach;
432   }
433 
434   res = FALSE;
435   tmp = stream_splitter->srcpads;
436   cookie = stream_splitter->cookie;
437 
438   while (tmp) {
439     GstPad *srcpad = (GstPad *) tmp->data;
440     GstCaps *peercaps;
441 
442     STREAMS_UNLOCK (stream_splitter);
443     peercaps = gst_pad_peer_query_caps (srcpad, NULL);
444     if (peercaps) {
445       res = gst_caps_can_intersect (caps, peercaps);
446       gst_caps_unref (peercaps);
447     }
448     STREAMS_LOCK (stream_splitter);
449 
450     if (G_UNLIKELY (cookie != stream_splitter->cookie))
451       goto resync;
452 
453     if (res) {
454       /* FIXME : we need to switch properly */
455       GST_DEBUG_OBJECT (srcpad, "Setting caps on this pad was successful");
456       stream_splitter->current = srcpad;
457       goto beach;
458     }
459     tmp = tmp->next;
460   }
461 
462 beach:
463   STREAMS_UNLOCK (stream_splitter);
464   return res;
465 }
466 
467 static void
gst_stream_splitter_init(GstStreamSplitter * stream_splitter)468 gst_stream_splitter_init (GstStreamSplitter * stream_splitter)
469 {
470   stream_splitter->sinkpad =
471       gst_pad_new_from_static_template (&sink_template, "sink");
472   gst_pad_set_chain_function (stream_splitter->sinkpad,
473       gst_stream_splitter_chain);
474   gst_pad_set_event_function (stream_splitter->sinkpad,
475       gst_stream_splitter_sink_event);
476   gst_pad_set_query_function (stream_splitter->sinkpad,
477       gst_stream_splitter_sink_query);
478   gst_element_add_pad (GST_ELEMENT (stream_splitter), stream_splitter->sinkpad);
479 
480   g_mutex_init (&stream_splitter->lock);
481 }
482 
483 static GstPad *
gst_stream_splitter_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * caps)484 gst_stream_splitter_request_new_pad (GstElement * element,
485     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
486 {
487   GstStreamSplitter *stream_splitter = (GstStreamSplitter *) element;
488   GstPad *srcpad;
489 
490   srcpad = gst_pad_new_from_static_template (&src_template, name);
491 
492   STREAMS_LOCK (stream_splitter);
493   stream_splitter->srcpads = g_list_append (stream_splitter->srcpads, srcpad);
494   gst_pad_set_active (srcpad, TRUE);
495   gst_element_add_pad (element, srcpad);
496   stream_splitter->cookie++;
497   STREAMS_UNLOCK (stream_splitter);
498 
499   return srcpad;
500 }
501 
502 static void
gst_stream_splitter_release_pad(GstElement * element,GstPad * pad)503 gst_stream_splitter_release_pad (GstElement * element, GstPad * pad)
504 {
505   GstStreamSplitter *stream_splitter = (GstStreamSplitter *) element;
506   GList *tmp;
507 
508   STREAMS_LOCK (stream_splitter);
509   tmp = g_list_find (stream_splitter->srcpads, pad);
510   if (tmp) {
511     GstPad *pad = (GstPad *) tmp->data;
512 
513     stream_splitter->srcpads =
514         g_list_delete_link (stream_splitter->srcpads, tmp);
515     stream_splitter->cookie++;
516 
517     if (pad == stream_splitter->current) {
518       /* Deactivate current flow */
519       GST_DEBUG_OBJECT (element, "Removed pad was the current one");
520       stream_splitter->current = NULL;
521     }
522 
523     gst_element_remove_pad (element, pad);
524   }
525   STREAMS_UNLOCK (stream_splitter);
526 
527   return;
528 }
529