1 /* GStreamer Stream Splitter
2 * Copyright (C) 2010 Edward Hervey <edward.hervey@collabora.co.uk>
3 * (C) 2009 Nokia Corporation
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include "gststreamsplitter.h"
26
27 static GstStaticPadTemplate src_template =
28 GST_STATIC_PAD_TEMPLATE ("src_%u", GST_PAD_SRC, GST_PAD_REQUEST,
29 GST_STATIC_CAPS_ANY);
30
31 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
32 GST_PAD_SINK,
33 GST_PAD_ALWAYS,
34 GST_STATIC_CAPS_ANY);
35
36 GST_DEBUG_CATEGORY_STATIC (gst_stream_splitter_debug);
37 #define GST_CAT_DEFAULT gst_stream_splitter_debug
38
39 G_DEFINE_TYPE (GstStreamSplitter, gst_stream_splitter, GST_TYPE_ELEMENT);
40
41 #define STREAMS_LOCK(obj) (g_mutex_lock(&obj->lock))
42 #define STREAMS_UNLOCK(obj) (g_mutex_unlock(&obj->lock))
43
44 static void gst_stream_splitter_dispose (GObject * object);
45 static void gst_stream_splitter_finalize (GObject * object);
46
47 static gboolean gst_stream_splitter_sink_setcaps (GstPad * pad, GstCaps * caps);
48
49 static GstPad *gst_stream_splitter_request_new_pad (GstElement * element,
50 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
51 static void gst_stream_splitter_release_pad (GstElement * element,
52 GstPad * pad);
53
54 static void
gst_stream_splitter_class_init(GstStreamSplitterClass * klass)55 gst_stream_splitter_class_init (GstStreamSplitterClass * klass)
56 {
57 GObjectClass *gobject_klass;
58 GstElementClass *gstelement_klass;
59
60 gobject_klass = (GObjectClass *) klass;
61 gstelement_klass = (GstElementClass *) klass;
62
63 gobject_klass->dispose = gst_stream_splitter_dispose;
64 gobject_klass->finalize = gst_stream_splitter_finalize;
65
66 GST_DEBUG_CATEGORY_INIT (gst_stream_splitter_debug, "streamsplitter", 0,
67 "Stream Splitter");
68
69 gst_element_class_add_static_pad_template (gstelement_klass, &src_template);
70 gst_element_class_add_static_pad_template (gstelement_klass, &sink_template);
71
72 gstelement_klass->request_new_pad =
73 GST_DEBUG_FUNCPTR (gst_stream_splitter_request_new_pad);
74 gstelement_klass->release_pad =
75 GST_DEBUG_FUNCPTR (gst_stream_splitter_release_pad);
76
77 gst_element_class_set_static_metadata (gstelement_klass,
78 "streamsplitter", "Generic",
79 "Splits streams based on their media type",
80 "Edward Hervey <edward.hervey@collabora.co.uk>");
81 }
82
83 static void
gst_stream_splitter_dispose(GObject * object)84 gst_stream_splitter_dispose (GObject * object)
85 {
86 GstStreamSplitter *stream_splitter = (GstStreamSplitter *) object;
87
88 g_list_foreach (stream_splitter->pending_events, (GFunc) gst_event_unref,
89 NULL);
90 g_list_free (stream_splitter->pending_events);
91 stream_splitter->pending_events = NULL;
92
93 G_OBJECT_CLASS (gst_stream_splitter_parent_class)->dispose (object);
94 }
95
96 static void
gst_stream_splitter_finalize(GObject * object)97 gst_stream_splitter_finalize (GObject * object)
98 {
99 GstStreamSplitter *stream_splitter = (GstStreamSplitter *) object;
100
101 g_mutex_clear (&stream_splitter->lock);
102
103 G_OBJECT_CLASS (gst_stream_splitter_parent_class)->finalize (object);
104 }
105
106 static void
gst_stream_splitter_push_pending_events(GstStreamSplitter * splitter,GstPad * srcpad)107 gst_stream_splitter_push_pending_events (GstStreamSplitter * splitter,
108 GstPad * srcpad)
109 {
110 GList *tmp;
111 GST_DEBUG_OBJECT (srcpad, "Pushing out pending events");
112
113 for (tmp = splitter->pending_events; tmp; tmp = tmp->next) {
114 GstEvent *event = (GstEvent *) tmp->data;
115 gst_pad_push_event (srcpad, event);
116 }
117 g_list_free (splitter->pending_events);
118 splitter->pending_events = NULL;
119 }
120
121 static GstFlowReturn
gst_stream_splitter_chain(GstPad * pad,GstObject * parent,GstBuffer * buf)122 gst_stream_splitter_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
123 {
124 GstStreamSplitter *stream_splitter = (GstStreamSplitter *) parent;
125 GstFlowReturn res;
126 GstPad *srcpad = NULL;
127
128 STREAMS_LOCK (stream_splitter);
129 if (stream_splitter->current)
130 srcpad = gst_object_ref (stream_splitter->current);
131 STREAMS_UNLOCK (stream_splitter);
132
133 if (G_UNLIKELY (srcpad == NULL))
134 goto nopad;
135
136 if (G_UNLIKELY (stream_splitter->pending_events))
137 gst_stream_splitter_push_pending_events (stream_splitter, srcpad);
138
139 /* Forward to currently activated stream */
140 res = gst_pad_push (srcpad, buf);
141 gst_object_unref (srcpad);
142
143 return res;
144
145 nopad:
146 GST_WARNING_OBJECT (stream_splitter, "No output pad was configured");
147 return GST_FLOW_ERROR;
148 }
149
150 static GList *
_flush_events(GstPad * pad,GList * events)151 _flush_events (GstPad * pad, GList * events)
152 {
153 GList *tmp;
154
155 for (tmp = events; tmp; tmp = tmp->next) {
156 if (GST_EVENT_TYPE (tmp->data) != GST_EVENT_EOS &&
157 GST_EVENT_TYPE (tmp->data) != GST_EVENT_SEGMENT &&
158 GST_EVENT_IS_STICKY (tmp->data) && pad != NULL) {
159 gst_pad_store_sticky_event (pad, GST_EVENT_CAST (tmp->data));
160 }
161 gst_event_unref (tmp->data);
162 }
163 g_list_free (events);
164
165 return NULL;
166 }
167
168 static gboolean
gst_stream_splitter_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)169 gst_stream_splitter_sink_event (GstPad * pad, GstObject * parent,
170 GstEvent * event)
171 {
172 GstStreamSplitter *stream_splitter = (GstStreamSplitter *) parent;
173 gboolean res = TRUE;
174 gboolean toall = FALSE;
175 gboolean store = FALSE;
176 /* FLUSH_START/STOP : forward to all
177 * INBAND events : store to send in chain function to selected chain
178 * OUT_OF_BAND events : send to all
179 */
180
181 GST_DEBUG_OBJECT (stream_splitter, "Got event %s",
182 GST_EVENT_TYPE_NAME (event));
183
184 switch (GST_EVENT_TYPE (event)) {
185 case GST_EVENT_CAPS:
186 {
187 GstCaps *caps;
188
189 gst_event_parse_caps (event, &caps);
190 res = gst_stream_splitter_sink_setcaps (pad, caps);
191
192 store = TRUE;
193 break;
194 }
195 case GST_EVENT_FLUSH_STOP:
196 toall = TRUE;
197 STREAMS_LOCK (stream_splitter);
198 stream_splitter->pending_events = _flush_events (stream_splitter->current,
199 stream_splitter->pending_events);
200 STREAMS_UNLOCK (stream_splitter);
201 break;
202 case GST_EVENT_FLUSH_START:
203 toall = TRUE;
204 break;
205 case GST_EVENT_EOS:
206
207 if (G_UNLIKELY (stream_splitter->pending_events)) {
208 GstPad *srcpad = NULL;
209
210 STREAMS_LOCK (stream_splitter);
211 if (stream_splitter->current)
212 srcpad = gst_object_ref (stream_splitter->current);
213 STREAMS_UNLOCK (stream_splitter);
214
215 if (srcpad) {
216 gst_stream_splitter_push_pending_events (stream_splitter, srcpad);
217 gst_object_unref (srcpad);
218 }
219 }
220
221 toall = TRUE;
222 break;
223 default:
224 if (GST_EVENT_TYPE (event) & GST_EVENT_TYPE_SERIALIZED)
225 store = TRUE;
226 }
227
228 if (store) {
229 stream_splitter->pending_events =
230 g_list_append (stream_splitter->pending_events, event);
231 } else if (toall) {
232 GList *tmp;
233 guint32 cookie;
234
235 /* Send to all pads */
236 STREAMS_LOCK (stream_splitter);
237 resync:
238 if (G_UNLIKELY (stream_splitter->srcpads == NULL)) {
239 STREAMS_UNLOCK (stream_splitter);
240 /* No source pads */
241 gst_event_unref (event);
242 res = FALSE;
243 goto beach;
244 }
245 tmp = stream_splitter->srcpads;
246 cookie = stream_splitter->cookie;
247 while (tmp) {
248 GstPad *srcpad = (GstPad *) tmp->data;
249 STREAMS_UNLOCK (stream_splitter);
250 gst_event_ref (event);
251 res = gst_pad_push_event (srcpad, event);
252 STREAMS_LOCK (stream_splitter);
253 if (G_UNLIKELY (cookie != stream_splitter->cookie))
254 goto resync;
255 tmp = tmp->next;
256 }
257 STREAMS_UNLOCK (stream_splitter);
258 gst_event_unref (event);
259 } else {
260 GstPad *pad;
261
262 /* Only send to current pad */
263
264 STREAMS_LOCK (stream_splitter);
265 pad = stream_splitter->current;
266 STREAMS_UNLOCK (stream_splitter);
267 if (pad)
268 res = gst_pad_push_event (pad, event);
269 else {
270 gst_event_unref (event);
271 res = FALSE;
272 }
273 }
274
275 beach:
276 return res;
277 }
278
279 static GstCaps *
gst_stream_splitter_sink_getcaps(GstPad * pad,GstCaps * filter)280 gst_stream_splitter_sink_getcaps (GstPad * pad, GstCaps * filter)
281 {
282 GstStreamSplitter *stream_splitter =
283 (GstStreamSplitter *) GST_PAD_PARENT (pad);
284 guint32 cookie;
285 GList *tmp;
286 GstCaps *res = NULL;
287
288 /* Return the combination of all downstream caps */
289
290 STREAMS_LOCK (stream_splitter);
291
292 resync:
293 if (G_UNLIKELY (stream_splitter->srcpads == NULL)) {
294 res = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
295 goto beach;
296 }
297
298 res = NULL;
299 cookie = stream_splitter->cookie;
300 tmp = stream_splitter->srcpads;
301
302 while (tmp) {
303 GstPad *srcpad = (GstPad *) tmp->data;
304
305 /* Ensure srcpad doesn't get destroyed while we query peer */
306 gst_object_ref (srcpad);
307 STREAMS_UNLOCK (stream_splitter);
308 if (res) {
309 GstCaps *peercaps = gst_pad_peer_query_caps (srcpad, filter);
310 if (peercaps)
311 res = gst_caps_merge (res, peercaps);
312 } else {
313 res = gst_pad_peer_query_caps (srcpad, filter);
314 }
315 STREAMS_LOCK (stream_splitter);
316 gst_object_unref (srcpad);
317
318 if (G_UNLIKELY (cookie != stream_splitter->cookie)) {
319 if (res)
320 gst_caps_unref (res);
321 goto resync;
322 }
323 tmp = tmp->next;
324 }
325
326 beach:
327 STREAMS_UNLOCK (stream_splitter);
328 return res;
329 }
330
331 static gboolean
gst_stream_splitter_sink_acceptcaps(GstPad * pad,GstCaps * caps)332 gst_stream_splitter_sink_acceptcaps (GstPad * pad, GstCaps * caps)
333 {
334 GstStreamSplitter *stream_splitter =
335 (GstStreamSplitter *) GST_PAD_PARENT (pad);
336 guint32 cookie;
337 GList *tmp;
338 gboolean res = FALSE;
339
340 /* check if one of the downstream elements accepts the caps */
341 STREAMS_LOCK (stream_splitter);
342
343 resync:
344 res = FALSE;
345
346 if (G_UNLIKELY (stream_splitter->srcpads == NULL))
347 goto beach;
348
349 cookie = stream_splitter->cookie;
350 tmp = stream_splitter->srcpads;
351
352 while (tmp) {
353 GstPad *srcpad = (GstPad *) tmp->data;
354
355 /* Ensure srcpad doesn't get destroyed while we query peer */
356 gst_object_ref (srcpad);
357 STREAMS_UNLOCK (stream_splitter);
358
359 res = gst_pad_peer_query_accept_caps (srcpad, caps);
360
361 STREAMS_LOCK (stream_splitter);
362 gst_object_unref (srcpad);
363
364 if (G_UNLIKELY (cookie != stream_splitter->cookie))
365 goto resync;
366
367 if (res)
368 break;
369
370 tmp = tmp->next;
371 }
372
373 beach:
374 STREAMS_UNLOCK (stream_splitter);
375 return res;
376 }
377
378 static gboolean
gst_stream_splitter_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)379 gst_stream_splitter_sink_query (GstPad * pad, GstObject * parent,
380 GstQuery * query)
381 {
382 gboolean res;
383
384 switch (GST_QUERY_TYPE (query)) {
385 case GST_QUERY_CAPS:
386 {
387 GstCaps *filter, *caps;
388
389 gst_query_parse_caps (query, &filter);
390 caps = gst_stream_splitter_sink_getcaps (pad, filter);
391 gst_query_set_caps_result (query, caps);
392 gst_caps_unref (caps);
393 res = TRUE;
394 break;
395 }
396 case GST_QUERY_ACCEPT_CAPS:
397 {
398 GstCaps *caps;
399 gboolean result;
400
401 gst_query_parse_accept_caps (query, &caps);
402 result = gst_stream_splitter_sink_acceptcaps (pad, caps);
403 gst_query_set_accept_caps_result (query, result);
404 res = TRUE;
405 break;
406 }
407 default:
408 res = gst_pad_query_default (pad, parent, query);
409 break;
410 }
411 return res;
412 }
413
414 static gboolean
gst_stream_splitter_sink_setcaps(GstPad * pad,GstCaps * caps)415 gst_stream_splitter_sink_setcaps (GstPad * pad, GstCaps * caps)
416 {
417 GstStreamSplitter *stream_splitter =
418 (GstStreamSplitter *) GST_PAD_PARENT (pad);
419 guint32 cookie;
420 GList *tmp;
421 gboolean res;
422
423 GST_DEBUG_OBJECT (stream_splitter, "caps %" GST_PTR_FORMAT, caps);
424
425 /* Try on all pads, choose the one that succeeds as the current stream */
426 STREAMS_LOCK (stream_splitter);
427
428 resync:
429 if (G_UNLIKELY (stream_splitter->srcpads == NULL)) {
430 res = FALSE;
431 goto beach;
432 }
433
434 res = FALSE;
435 tmp = stream_splitter->srcpads;
436 cookie = stream_splitter->cookie;
437
438 while (tmp) {
439 GstPad *srcpad = (GstPad *) tmp->data;
440 GstCaps *peercaps;
441
442 STREAMS_UNLOCK (stream_splitter);
443 peercaps = gst_pad_peer_query_caps (srcpad, NULL);
444 if (peercaps) {
445 res = gst_caps_can_intersect (caps, peercaps);
446 gst_caps_unref (peercaps);
447 }
448 STREAMS_LOCK (stream_splitter);
449
450 if (G_UNLIKELY (cookie != stream_splitter->cookie))
451 goto resync;
452
453 if (res) {
454 /* FIXME : we need to switch properly */
455 GST_DEBUG_OBJECT (srcpad, "Setting caps on this pad was successful");
456 stream_splitter->current = srcpad;
457 goto beach;
458 }
459 tmp = tmp->next;
460 }
461
462 beach:
463 STREAMS_UNLOCK (stream_splitter);
464 return res;
465 }
466
467 static void
gst_stream_splitter_init(GstStreamSplitter * stream_splitter)468 gst_stream_splitter_init (GstStreamSplitter * stream_splitter)
469 {
470 stream_splitter->sinkpad =
471 gst_pad_new_from_static_template (&sink_template, "sink");
472 gst_pad_set_chain_function (stream_splitter->sinkpad,
473 gst_stream_splitter_chain);
474 gst_pad_set_event_function (stream_splitter->sinkpad,
475 gst_stream_splitter_sink_event);
476 gst_pad_set_query_function (stream_splitter->sinkpad,
477 gst_stream_splitter_sink_query);
478 gst_element_add_pad (GST_ELEMENT (stream_splitter), stream_splitter->sinkpad);
479
480 g_mutex_init (&stream_splitter->lock);
481 }
482
483 static GstPad *
gst_stream_splitter_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * caps)484 gst_stream_splitter_request_new_pad (GstElement * element,
485 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
486 {
487 GstStreamSplitter *stream_splitter = (GstStreamSplitter *) element;
488 GstPad *srcpad;
489
490 srcpad = gst_pad_new_from_static_template (&src_template, name);
491
492 STREAMS_LOCK (stream_splitter);
493 stream_splitter->srcpads = g_list_append (stream_splitter->srcpads, srcpad);
494 gst_pad_set_active (srcpad, TRUE);
495 gst_element_add_pad (element, srcpad);
496 stream_splitter->cookie++;
497 STREAMS_UNLOCK (stream_splitter);
498
499 return srcpad;
500 }
501
502 static void
gst_stream_splitter_release_pad(GstElement * element,GstPad * pad)503 gst_stream_splitter_release_pad (GstElement * element, GstPad * pad)
504 {
505 GstStreamSplitter *stream_splitter = (GstStreamSplitter *) element;
506 GList *tmp;
507
508 STREAMS_LOCK (stream_splitter);
509 tmp = g_list_find (stream_splitter->srcpads, pad);
510 if (tmp) {
511 GstPad *pad = (GstPad *) tmp->data;
512
513 stream_splitter->srcpads =
514 g_list_delete_link (stream_splitter->srcpads, tmp);
515 stream_splitter->cookie++;
516
517 if (pad == stream_splitter->current) {
518 /* Deactivate current flow */
519 GST_DEBUG_OBJECT (element, "Removed pad was the current one");
520 stream_splitter->current = NULL;
521 }
522
523 gst_element_remove_pad (element, pad);
524 }
525 STREAMS_UNLOCK (stream_splitter);
526
527 return;
528 }
529