1 /* GStreamer
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
4 * 2007 Wim Taymans <wim.taymans@gmail.com>
5 *
6 * gsttee.c: Tee element, one in N out
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
17 *
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
22 */
23
24 /**
25 * SECTION:element-tee
26 * @title: tee
27 * @see_also: #GstIdentity
28 *
29 * Split data to multiple pads. Branching the data flow is useful when e.g.
30 * capturing a video where the video is shown on the screen and also encoded and
31 * written to a file. Another example is playing music and hooking up a
32 * visualisation module.
33 *
34 * One needs to use separate queue elements (or a multiqueue) in each branch to
35 * provide separate threads for each branch. Otherwise a blocked dataflow in one
36 * branch would stall the other branches.
37 *
38 * ## Example launch line
39 * |[
40 * gst-launch-1.0 filesrc location=song.ogg ! decodebin ! tee name=t ! queue ! audioconvert ! audioresample ! autoaudiosink t. ! queue ! audioconvert ! goom ! videoconvert ! autovideosink
41 * ]|
42 *
 * Play the song.ogg audio file, which must be in the current working
 * directory, and render visualisations using the goom element (this can be
 * done more easily with the playbin element; this is just an example pipeline).
46 */
47
48 #ifdef HAVE_CONFIG_H
49 # include "config.h"
50 #endif
51
52 #include "gsttee.h"
53 #include "gst/glib-compat-private.h"
54
55 #include <string.h>
56 #include <stdio.h>
57
58 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
59 GST_PAD_SINK,
60 GST_PAD_ALWAYS,
61 GST_STATIC_CAPS_ANY);
62
63 GST_DEBUG_CATEGORY_STATIC (gst_tee_debug);
64 #define GST_CAT_DEFAULT gst_tee_debug
65
66 #define GST_TYPE_TEE_PULL_MODE (gst_tee_pull_mode_get_type())
67 static GType
gst_tee_pull_mode_get_type(void)68 gst_tee_pull_mode_get_type (void)
69 {
70 static GType type = 0;
71 static const GEnumValue data[] = {
72 {GST_TEE_PULL_MODE_NEVER, "Never activate in pull mode", "never"},
73 {GST_TEE_PULL_MODE_SINGLE, "Only one src pad can be active in pull mode",
74 "single"},
75 {0, NULL, NULL},
76 };
77
78 if (!type) {
79 type = g_enum_register_static ("GstTeePullMode", data);
80 }
81 return type;
82 }
83
/* Default values of the element properties. */
#define DEFAULT_PROP_NUM_SRC_PADS       0
#define DEFAULT_PROP_HAS_CHAIN          TRUE
#define DEFAULT_PROP_SILENT             TRUE
#define DEFAULT_PROP_LAST_MESSAGE       NULL
#define DEFAULT_PULL_MODE               GST_TEE_PULL_MODE_NEVER
#define DEFAULT_PROP_ALLOW_NOT_LINKED   FALSE

/* Property identifiers passed to set_property() / get_property(). */
enum
{
  PROP_0,
  PROP_NUM_SRC_PADS,
  PROP_HAS_CHAIN,
  PROP_SILENT,
  PROP_LAST_MESSAGE,
  PROP_PULL_MODE,
  PROP_ALLOC_PAD,
  PROP_ALLOW_NOT_LINKED,
};
102
103 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
104 GST_PAD_SRC,
105 GST_PAD_REQUEST,
106 GST_STATIC_CAPS_ANY);
107
108 #define _do_init \
109 GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element");
110 #define gst_tee_parent_class parent_class
111 G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init);
112
113 static GParamSpec *pspec_last_message = NULL;
114 static GParamSpec *pspec_alloc_pad = NULL;
115
116 GType gst_tee_pad_get_type (void);
117
118 #define GST_TYPE_TEE_PAD \
119 (gst_tee_pad_get_type())
120 #define GST_TEE_PAD(obj) \
121 (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TEE_PAD, GstTeePad))
122 #define GST_TEE_PAD_CLASS(klass) \
123 (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TEE_PAD, GstTeePadClass))
124 #define GST_IS_TEE_PAD(obj) \
125 (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_TEE_PAD))
126 #define GST_IS_TEE_PAD_CLASS(klass) \
127 (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_TEE_PAD))
128 #define GST_TEE_PAD_CAST(obj) \
129 ((GstTeePad *)(obj))
130
131 typedef struct _GstTeePad GstTeePad;
132 typedef struct _GstTeePadClass GstTeePadClass;
133
134 struct _GstTeePad
135 {
136 GstPad parent;
137
138 guint index;
139 gboolean pushed;
140 GstFlowReturn result;
141 gboolean removed;
142 };
143
144 struct _GstTeePadClass
145 {
146 GstPadClass parent;
147 };
148
149 G_DEFINE_TYPE (GstTeePad, gst_tee_pad, GST_TYPE_PAD);
150
151 static void
gst_tee_pad_class_init(GstTeePadClass * klass)152 gst_tee_pad_class_init (GstTeePadClass * klass)
153 {
154 }
155
156 static void
gst_tee_pad_reset(GstTeePad * pad)157 gst_tee_pad_reset (GstTeePad * pad)
158 {
159 pad->pushed = FALSE;
160 pad->result = GST_FLOW_NOT_LINKED;
161 pad->removed = FALSE;
162 }
163
164 static void
gst_tee_pad_init(GstTeePad * pad)165 gst_tee_pad_init (GstTeePad * pad)
166 {
167 gst_tee_pad_reset (pad);
168 }
169
170 static GstPad *gst_tee_request_new_pad (GstElement * element,
171 GstPadTemplate * temp, const gchar * unused, const GstCaps * caps);
172 static void gst_tee_release_pad (GstElement * element, GstPad * pad);
173
174 static void gst_tee_finalize (GObject * object);
175 static void gst_tee_set_property (GObject * object, guint prop_id,
176 const GValue * value, GParamSpec * pspec);
177 static void gst_tee_get_property (GObject * object, guint prop_id,
178 GValue * value, GParamSpec * pspec);
179 static void gst_tee_dispose (GObject * object);
180
181 static GstFlowReturn gst_tee_chain (GstPad * pad, GstObject * parent,
182 GstBuffer * buffer);
183 static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstObject * parent,
184 GstBufferList * list);
185 static gboolean gst_tee_sink_event (GstPad * pad, GstObject * parent,
186 GstEvent * event);
187 static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent,
188 GstQuery * query);
189 static gboolean gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent,
190 GstPadMode mode, gboolean active);
191 static gboolean gst_tee_src_query (GstPad * pad, GstObject * parent,
192 GstQuery * query);
193 static gboolean gst_tee_src_activate_mode (GstPad * pad, GstObject * parent,
194 GstPadMode mode, gboolean active);
195 static GstFlowReturn gst_tee_src_get_range (GstPad * pad, GstObject * parent,
196 guint64 offset, guint length, GstBuffer ** buf);
197
198 static void
gst_tee_dispose(GObject * object)199 gst_tee_dispose (GObject * object)
200 {
201 GList *item;
202
203 restart:
204 for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
205 GstPad *pad = GST_PAD (item->data);
206 if (GST_PAD_IS_SRC (pad)) {
207 gst_element_release_request_pad (GST_ELEMENT (object), pad);
208 goto restart;
209 }
210 }
211
212 G_OBJECT_CLASS (parent_class)->dispose (object);
213 }
214
215 static void
gst_tee_finalize(GObject * object)216 gst_tee_finalize (GObject * object)
217 {
218 GstTee *tee;
219
220 tee = GST_TEE (object);
221
222 g_hash_table_unref (tee->pad_indexes);
223
224 g_free (tee->last_message);
225
226 G_OBJECT_CLASS (parent_class)->finalize (object);
227 }
228
229 static void
gst_tee_class_init(GstTeeClass * klass)230 gst_tee_class_init (GstTeeClass * klass)
231 {
232 GObjectClass *gobject_class;
233 GstElementClass *gstelement_class;
234
235 gobject_class = G_OBJECT_CLASS (klass);
236 gstelement_class = GST_ELEMENT_CLASS (klass);
237
238 gobject_class->finalize = gst_tee_finalize;
239 gobject_class->set_property = gst_tee_set_property;
240 gobject_class->get_property = gst_tee_get_property;
241 gobject_class->dispose = gst_tee_dispose;
242
243 g_object_class_install_property (gobject_class, PROP_NUM_SRC_PADS,
244 g_param_spec_int ("num-src-pads", "Num Src Pads",
245 "The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS,
246 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
247 g_object_class_install_property (gobject_class, PROP_HAS_CHAIN,
248 g_param_spec_boolean ("has-chain", "Has Chain",
249 "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN,
250 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
251 g_object_class_install_property (gobject_class, PROP_SILENT,
252 g_param_spec_boolean ("silent", "Silent",
253 "Don't produce last_message events", DEFAULT_PROP_SILENT,
254 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
255 pspec_last_message = g_param_spec_string ("last-message", "Last Message",
256 "The message describing current status", DEFAULT_PROP_LAST_MESSAGE,
257 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
258 g_object_class_install_property (gobject_class, PROP_LAST_MESSAGE,
259 pspec_last_message);
260 g_object_class_install_property (gobject_class, PROP_PULL_MODE,
261 g_param_spec_enum ("pull-mode", "Pull mode",
262 "Behavior of tee in pull mode", GST_TYPE_TEE_PULL_MODE,
263 DEFAULT_PULL_MODE,
264 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265 pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad",
266 "The pad ALLOCATION queries will be proxied to (DEPRECATED, has no effect)",
267 GST_TYPE_PAD,
268 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED);
269 g_object_class_install_property (gobject_class, PROP_ALLOC_PAD,
270 pspec_alloc_pad);
271
272 /**
273 * GstTee:allow-not-linked
274 *
275 * This property makes sink pad return GST_FLOW_OK even if there are no
276 * source pads or any of them is linked.
277 *
278 * This is useful to avoid errors when you have a dynamic pipeline and during
279 * a reconnection you can have all the pads unlinked or removed.
280 *
281 * Since: 1.6
282 */
283 g_object_class_install_property (gobject_class, PROP_ALLOW_NOT_LINKED,
284 g_param_spec_boolean ("allow-not-linked", "Allow not linked",
285 "Return GST_FLOW_OK even if there are no source pads or they are "
286 "all unlinked", DEFAULT_PROP_ALLOW_NOT_LINKED,
287 G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
288
289 gst_element_class_set_static_metadata (gstelement_class,
290 "Tee pipe fitting",
291 "Generic",
292 "1-to-N pipe fitting",
293 "Erik Walthinsen <omega@cse.ogi.edu>, " "Wim Taymans <wim@fluendo.com>");
294 gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
295 gst_element_class_add_static_pad_template (gstelement_class, &src_template);
296
297 gstelement_class->request_new_pad =
298 GST_DEBUG_FUNCPTR (gst_tee_request_new_pad);
299 gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_tee_release_pad);
300 }
301
302 static void
gst_tee_init(GstTee * tee)303 gst_tee_init (GstTee * tee)
304 {
305 tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
306 tee->sink_mode = GST_PAD_MODE_NONE;
307
308 gst_pad_set_event_function (tee->sinkpad,
309 GST_DEBUG_FUNCPTR (gst_tee_sink_event));
310 gst_pad_set_query_function (tee->sinkpad,
311 GST_DEBUG_FUNCPTR (gst_tee_sink_query));
312 gst_pad_set_activatemode_function (tee->sinkpad,
313 GST_DEBUG_FUNCPTR (gst_tee_sink_activate_mode));
314 gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain));
315 gst_pad_set_chain_list_function (tee->sinkpad,
316 GST_DEBUG_FUNCPTR (gst_tee_chain_list));
317 GST_OBJECT_FLAG_SET (tee->sinkpad, GST_PAD_FLAG_PROXY_CAPS);
318 gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad);
319
320 tee->pad_indexes = g_hash_table_new (NULL, NULL);
321
322 tee->last_message = NULL;
323 }
324
325 static void
gst_tee_notify_alloc_pad(GstTee * tee)326 gst_tee_notify_alloc_pad (GstTee * tee)
327 {
328 g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad);
329 }
330
331 static gboolean
forward_sticky_events(GstPad * pad,GstEvent ** event,gpointer user_data)332 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
333 {
334 GstPad *srcpad = GST_PAD_CAST (user_data);
335 GstFlowReturn ret;
336
337 ret = gst_pad_store_sticky_event (srcpad, *event);
338 if (ret != GST_FLOW_OK) {
339 GST_DEBUG_OBJECT (srcpad, "storing sticky event %p (%s) failed: %s", *event,
340 GST_EVENT_TYPE_NAME (*event), gst_flow_get_name (ret));
341 }
342
343 return TRUE;
344 }
345
346 static GstPad *
gst_tee_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name_templ,const GstCaps * caps)347 gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
348 const gchar * name_templ, const GstCaps * caps)
349 {
350 gchar *name;
351 GstPad *srcpad;
352 GstTee *tee;
353 GstPadMode mode;
354 gboolean res;
355 guint index = 0;
356
357 tee = GST_TEE (element);
358
359 GST_DEBUG_OBJECT (tee, "requesting pad");
360
361 GST_OBJECT_LOCK (tee);
362
363 if (name_templ && sscanf (name_templ, "src_%u", &index) == 1) {
364 GST_LOG_OBJECT (element, "name: %s (index %d)", name_templ, index);
365 if (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index))) {
366 GST_ERROR_OBJECT (element, "pad name %s is not unique", name_templ);
367 GST_OBJECT_UNLOCK (tee);
368 return NULL;
369 }
370 if (index >= tee->next_pad_index)
371 tee->next_pad_index = index + 1;
372 } else {
373 index = tee->next_pad_index;
374
375 while (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index)))
376 index++;
377
378 tee->next_pad_index = index + 1;
379 }
380
381 g_hash_table_insert (tee->pad_indexes, GUINT_TO_POINTER (index), NULL);
382
383 name = g_strdup_printf ("src_%u", index);
384
385 srcpad = GST_PAD_CAST (g_object_new (GST_TYPE_TEE_PAD,
386 "name", name, "direction", templ->direction, "template", templ,
387 NULL));
388 GST_TEE_PAD_CAST (srcpad)->index = index;
389 g_free (name);
390
391 mode = tee->sink_mode;
392
393 GST_OBJECT_UNLOCK (tee);
394
395 switch (mode) {
396 case GST_PAD_MODE_PULL:
397 /* we already have a src pad in pull mode, and our pull mode can only be
398 SINGLE, so fall through to activate this new pad in push mode */
399 case GST_PAD_MODE_PUSH:
400 res = gst_pad_activate_mode (srcpad, GST_PAD_MODE_PUSH, TRUE);
401 break;
402 default:
403 res = TRUE;
404 break;
405 }
406
407 if (!res)
408 goto activate_failed;
409
410 gst_pad_set_activatemode_function (srcpad,
411 GST_DEBUG_FUNCPTR (gst_tee_src_activate_mode));
412 gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query));
413 gst_pad_set_getrange_function (srcpad,
414 GST_DEBUG_FUNCPTR (gst_tee_src_get_range));
415 GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS);
416 /* Forward sticky events to the new srcpad */
417 gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad);
418 gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad);
419
420 return srcpad;
421
422 /* ERRORS */
423 activate_failed:
424 {
425 gboolean changed = FALSE;
426
427 GST_OBJECT_LOCK (tee);
428 GST_DEBUG_OBJECT (tee, "warning failed to activate request pad");
429 if (tee->allocpad == srcpad) {
430 tee->allocpad = NULL;
431 changed = TRUE;
432 }
433 GST_OBJECT_UNLOCK (tee);
434 gst_object_unref (srcpad);
435 if (changed) {
436 gst_tee_notify_alloc_pad (tee);
437 }
438 return NULL;
439 }
440 }
441
442 static void
gst_tee_release_pad(GstElement * element,GstPad * pad)443 gst_tee_release_pad (GstElement * element, GstPad * pad)
444 {
445 GstTee *tee;
446 gboolean changed = FALSE;
447 guint index;
448
449 tee = GST_TEE (element);
450
451 GST_DEBUG_OBJECT (tee, "releasing pad");
452
453 GST_OBJECT_LOCK (tee);
454 index = GST_TEE_PAD_CAST (pad)->index;
455 /* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */
456 GST_TEE_PAD_CAST (pad)->removed = TRUE;
457 if (tee->allocpad == pad) {
458 tee->allocpad = NULL;
459 changed = TRUE;
460 }
461 GST_OBJECT_UNLOCK (tee);
462
463 gst_pad_set_active (pad, FALSE);
464 gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad);
465
466 if (changed) {
467 gst_tee_notify_alloc_pad (tee);
468 }
469
470 GST_OBJECT_LOCK (tee);
471 g_hash_table_remove (tee->pad_indexes, GUINT_TO_POINTER (index));
472 GST_OBJECT_UNLOCK (tee);
473 }
474
475 static void
gst_tee_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)476 gst_tee_set_property (GObject * object, guint prop_id, const GValue * value,
477 GParamSpec * pspec)
478 {
479 GstTee *tee = GST_TEE (object);
480
481 GST_OBJECT_LOCK (tee);
482 switch (prop_id) {
483 case PROP_HAS_CHAIN:
484 tee->has_chain = g_value_get_boolean (value);
485 break;
486 case PROP_SILENT:
487 tee->silent = g_value_get_boolean (value);
488 break;
489 case PROP_PULL_MODE:
490 tee->pull_mode = (GstTeePullMode) g_value_get_enum (value);
491 break;
492 case PROP_ALLOC_PAD:
493 {
494 GstPad *pad = g_value_get_object (value);
495 GST_OBJECT_LOCK (pad);
496 if (GST_OBJECT_PARENT (pad) == GST_OBJECT_CAST (object))
497 tee->allocpad = pad;
498 else
499 GST_WARNING_OBJECT (object, "Tried to set alloc pad %s which"
500 " is not my pad", GST_OBJECT_NAME (pad));
501 GST_OBJECT_UNLOCK (pad);
502 break;
503 }
504 case PROP_ALLOW_NOT_LINKED:
505 tee->allow_not_linked = g_value_get_boolean (value);
506 break;
507 default:
508 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
509 break;
510 }
511 GST_OBJECT_UNLOCK (tee);
512 }
513
514 static void
gst_tee_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)515 gst_tee_get_property (GObject * object, guint prop_id, GValue * value,
516 GParamSpec * pspec)
517 {
518 GstTee *tee = GST_TEE (object);
519
520 GST_OBJECT_LOCK (tee);
521 switch (prop_id) {
522 case PROP_NUM_SRC_PADS:
523 g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads);
524 break;
525 case PROP_HAS_CHAIN:
526 g_value_set_boolean (value, tee->has_chain);
527 break;
528 case PROP_SILENT:
529 g_value_set_boolean (value, tee->silent);
530 break;
531 case PROP_LAST_MESSAGE:
532 g_value_set_string (value, tee->last_message);
533 break;
534 case PROP_PULL_MODE:
535 g_value_set_enum (value, tee->pull_mode);
536 break;
537 case PROP_ALLOC_PAD:
538 g_value_set_object (value, tee->allocpad);
539 break;
540 case PROP_ALLOW_NOT_LINKED:
541 g_value_set_boolean (value, tee->allow_not_linked);
542 break;
543 default:
544 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
545 break;
546 }
547 GST_OBJECT_UNLOCK (tee);
548 }
549
550 static gboolean
gst_tee_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)551 gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
552 {
553 gboolean res;
554
555 switch (GST_EVENT_TYPE (event)) {
556 default:
557 res = gst_pad_event_default (pad, parent, event);
558 break;
559 }
560
561 return res;
562 }
563
564 struct AllocQueryCtx
565 {
566 GstTee *tee;
567 GstQuery *query;
568 GstAllocationParams params;
569 guint size;
570 guint min_buffers;
571 gboolean first_query;
572 guint num_pads;
573 };
574
575 /* This function will aggregate some of the allocation query information with
576 * the strategy to force upstream allocation. Depending on downstream
577 * allocation would otherwise make dynamic pipelines much more complicated as
578 * application would need to now drain buffer in certain cases before getting
579 * rid of a tee branch. */
580 static gboolean
gst_tee_query_allocation(const GValue * item,GValue * ret,gpointer user_data)581 gst_tee_query_allocation (const GValue * item, GValue * ret, gpointer user_data)
582 {
583 struct AllocQueryCtx *ctx = user_data;
584 GstPad *src_pad = g_value_get_object (item);
585 GstPad *peer_pad;
586 GstCaps *caps;
587 GstQuery *query;
588 guint count, i, size, min;
589
590 GST_DEBUG_OBJECT (ctx->tee, "Aggregating allocation from pad %s:%s",
591 GST_DEBUG_PAD_NAME (src_pad));
592
593 peer_pad = gst_pad_get_peer (src_pad);
594 if (!peer_pad) {
595 if (ctx->tee->allow_not_linked) {
596 GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, but allowed.",
597 GST_DEBUG_PAD_NAME (src_pad));
598 return TRUE;
599 } else {
600 GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, ignoring allocation.",
601 GST_DEBUG_PAD_NAME (src_pad));
602 g_value_set_boolean (ret, FALSE);
603 return FALSE;
604 }
605 }
606
607 gst_query_parse_allocation (ctx->query, &caps, NULL);
608
609 query = gst_query_new_allocation (caps, FALSE);
610 if (!gst_pad_query (peer_pad, query)) {
611 GST_DEBUG_OBJECT (ctx->tee,
612 "Allocation query failed on pad %s, ignoring allocation",
613 GST_PAD_NAME (src_pad));
614 g_value_set_boolean (ret, FALSE);
615 gst_query_unref (query);
616 gst_object_unref (peer_pad);
617 return FALSE;
618 }
619
620 gst_object_unref (peer_pad);
621
622 /* Allocation Params:
623 * store the maximum alignment, prefix and padding, but ignore the
624 * allocators and the flags which are tied to downstream allocation */
625 count = gst_query_get_n_allocation_params (query);
626 for (i = 0; i < count; i++) {
627 GstAllocationParams params = { 0, };
628
629 gst_query_parse_nth_allocation_param (query, i, NULL, ¶ms);
630
631 GST_DEBUG_OBJECT (ctx->tee, "Aggregating AllocationParams align=%"
632 G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%"
633 G_GSIZE_FORMAT, params.align, params.prefix, params.padding);
634
635 if (ctx->params.align < params.align)
636 ctx->params.align = params.align;
637
638 if (ctx->params.prefix < params.prefix)
639 ctx->params.prefix = params.prefix;
640
641 if (ctx->params.padding < params.padding)
642 ctx->params.padding = params.padding;
643 }
644
645 /* Allocation Pool:
646 * We want to keep the biggest size and biggest minimum number of buffers to
647 * make sure downstream requirement can be satisfied. We don't really care
648 * about the maximum, as this is a parameter of the downstream provided
649 * pool. We only read the first allocation pool as the minimum number of
650 * buffers is normally constant regardless of the pool being used. */
651 if (gst_query_get_n_allocation_pools (query) > 0) {
652 gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL);
653
654 GST_DEBUG_OBJECT (ctx->tee,
655 "Aggregating allocation pool size=%u min_buffers=%u", size, min);
656
657 if (ctx->size < size)
658 ctx->size = size;
659
660 if (ctx->min_buffers < min)
661 ctx->min_buffers = min;
662 }
663
664 /* Allocation Meta:
665 * For allocation meta, we'll need to aggregate the argument using the new
666 * GstMetaInfo::agggregate_func */
667 count = gst_query_get_n_allocation_metas (query);
668 for (i = 0; i < count; i++) {
669 guint ctx_index;
670 GType api;
671 const GstStructure *param;
672
673 api = gst_query_parse_nth_allocation_meta (query, i, ¶m);
674
675 /* For the first query, copy all metas */
676 if (ctx->first_query) {
677 gst_query_add_allocation_meta (ctx->query, api, param);
678 continue;
679 }
680
681 /* Afterward, aggregate the common params */
682 if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) {
683 const GstStructure *ctx_param;
684
685 gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param);
686
687 /* Keep meta which has no params */
688 if (ctx_param == NULL && param == NULL)
689 continue;
690
691 GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s",
692 g_type_name (api));
693 gst_query_remove_nth_allocation_meta (ctx->query, ctx_index);
694 }
695 }
696
697 /* Finally, cleanup metas from the stored query that aren't support on this
698 * pad. */
699 count = gst_query_get_n_allocation_metas (ctx->query);
700 for (i = 0; i < count;) {
701 GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL);
702
703 if (!gst_query_find_allocation_meta (query, api, NULL)) {
704 GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s",
705 g_type_name (api));
706 gst_query_remove_nth_allocation_meta (ctx->query, i);
707 count--;
708 continue;
709 }
710
711 i++;
712 }
713
714 ctx->first_query = FALSE;
715 ctx->num_pads++;
716 gst_query_unref (query);
717
718 return TRUE;
719 }
720
721
722 static void
gst_tee_clear_query_allocation_meta(GstQuery * query)723 gst_tee_clear_query_allocation_meta (GstQuery * query)
724 {
725 guint count = gst_query_get_n_allocation_metas (query);
726 guint i;
727
728 for (i = 1; i <= count; i++)
729 gst_query_remove_nth_allocation_meta (query, count - i);
730 }
731
732 static gboolean
gst_tee_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)733 gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
734 {
735 GstTee *tee = GST_TEE (parent);
736 gboolean res;
737
738 switch (GST_QUERY_TYPE (query)) {
739 case GST_QUERY_ALLOCATION:
740 {
741 GstIterator *iter;
742 GValue ret = G_VALUE_INIT;
743 struct AllocQueryCtx ctx = { tee, query, };
744
745 g_value_init (&ret, G_TYPE_BOOLEAN);
746 g_value_set_boolean (&ret, TRUE);
747
748 ctx.first_query = TRUE;
749 gst_allocation_params_init (&ctx.params);
750
751 iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
752 while (GST_ITERATOR_RESYNC ==
753 gst_iterator_fold (iter, gst_tee_query_allocation, &ret, &ctx)) {
754 gst_iterator_resync (iter);
755 ctx.first_query = TRUE;
756 gst_allocation_params_init (&ctx.params);
757 ctx.size = 0;
758 ctx.min_buffers = 0;
759 ctx.num_pads = 0;
760 gst_tee_clear_query_allocation_meta (query);
761 }
762
763 gst_iterator_free (iter);
764 res = g_value_get_boolean (&ret);
765 g_value_unset (&ret);
766
767 if (res) {
768 GST_DEBUG_OBJECT (tee, "Aggregated AllocationParams to align=%"
769 G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%"
770 G_GSIZE_FORMAT, ctx.params.align, ctx.params.prefix,
771 ctx.params.padding);
772
773 GST_DEBUG_OBJECT (tee,
774 "Aggregated allocation pools size=%u min_buffers=%u", ctx.size,
775 ctx.min_buffers);
776
777 #ifndef GST_DISABLE_GST_DEBUG
778 {
779 guint count = gst_query_get_n_allocation_metas (query);
780 guint i;
781
782 GST_DEBUG_OBJECT (tee, "Aggregated %u allocation meta:", count);
783
784 for (i = 0; i < count; i++)
785 GST_DEBUG_OBJECT (tee, " + aggregated allocation meta %s",
786 g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i,
787 NULL)));
788 }
789 #endif
790
791 /* Allocate one more buffers when multiplexing so we don't starve the
792 * downstream threads. */
793 if (ctx.num_pads > 1)
794 ctx.min_buffers++;
795
796 /* Check that we actually have parameters besides the defaults. */
797 if (ctx.params.align || ctx.params.prefix || ctx.params.padding) {
798 gst_query_add_allocation_param (ctx.query, NULL, &ctx.params);
799 }
800 /* When size == 0, buffers created from this pool would have no memory
801 * allocated. */
802 if (ctx.size) {
803 gst_query_add_allocation_pool (ctx.query, NULL, ctx.size,
804 ctx.min_buffers, 0);
805 }
806 } else {
807 gst_tee_clear_query_allocation_meta (query);
808 }
809 break;
810 }
811 default:
812 res = gst_pad_query_default (pad, parent, query);
813 break;
814 }
815 return res;
816 }
817
818 static void
gst_tee_do_message(GstTee * tee,GstPad * pad,gpointer data,gboolean is_list)819 gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
820 {
821 GST_OBJECT_LOCK (tee);
822 g_free (tee->last_message);
823 if (is_list) {
824 tee->last_message =
825 g_strdup_printf ("chain-list ******* (%s:%s)t %p",
826 GST_DEBUG_PAD_NAME (pad), data);
827 } else {
828 tee->last_message =
829 g_strdup_printf ("chain ******* (%s:%s)t (%" G_GSIZE_FORMAT
830 " bytes, %" G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
831 gst_buffer_get_size (data), GST_BUFFER_TIMESTAMP (data), data);
832 }
833 GST_OBJECT_UNLOCK (tee);
834
835 g_object_notify_by_pspec ((GObject *) tee, pspec_last_message);
836 }
837
838 static GstFlowReturn
gst_tee_do_push(GstTee * tee,GstPad * pad,gpointer data,gboolean is_list)839 gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list)
840 {
841 GstFlowReturn res;
842
843 /* Push */
844 if (pad == tee->pull_pad) {
845 /* don't push on the pad we're pulling from */
846 res = GST_FLOW_OK;
847 } else if (is_list) {
848 res =
849 gst_pad_push_list (pad,
850 gst_buffer_list_ref (GST_BUFFER_LIST_CAST (data)));
851 } else {
852 res = gst_pad_push (pad, gst_buffer_ref (GST_BUFFER_CAST (data)));
853 }
854 return res;
855 }
856
857 static void
clear_pads(GstPad * pad,GstTee * tee)858 clear_pads (GstPad * pad, GstTee * tee)
859 {
860 GST_TEE_PAD_CAST (pad)->pushed = FALSE;
861 GST_TEE_PAD_CAST (pad)->result = GST_FLOW_NOT_LINKED;
862 }
863
864 static GstFlowReturn
gst_tee_handle_data(GstTee * tee,gpointer data,gboolean is_list)865 gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list)
866 {
867 GList *pads;
868 guint32 cookie;
869 GstFlowReturn ret, cret;
870
871 if (G_UNLIKELY (!tee->silent))
872 gst_tee_do_message (tee, tee->sinkpad, data, is_list);
873
874 GST_OBJECT_LOCK (tee);
875 pads = GST_ELEMENT_CAST (tee)->srcpads;
876
877 /* special case for zero pads */
878 if (G_UNLIKELY (!pads))
879 goto no_pads;
880
881 /* special case for just one pad that avoids reffing the buffer */
882 if (!pads->next) {
883 GstPad *pad = GST_PAD_CAST (pads->data);
884
885 /* Keep another ref around, a pad probe
886 * might release and destroy the pad */
887 gst_object_ref (pad);
888 GST_OBJECT_UNLOCK (tee);
889
890 if (pad == tee->pull_pad) {
891 ret = GST_FLOW_OK;
892 } else if (is_list) {
893 ret = gst_pad_push_list (pad, GST_BUFFER_LIST_CAST (data));
894 } else {
895 ret = gst_pad_push (pad, GST_BUFFER_CAST (data));
896 }
897
898 GST_OBJECT_LOCK (tee);
899 if (GST_TEE_PAD_CAST (pad)->removed)
900 ret = GST_FLOW_NOT_LINKED;
901
902 if (ret == GST_FLOW_NOT_LINKED && tee->allow_not_linked) {
903 ret = GST_FLOW_OK;
904 }
905 GST_OBJECT_UNLOCK (tee);
906
907 gst_object_unref (pad);
908
909 return ret;
910 }
911
912 /* mark all pads as 'not pushed on yet' */
913 g_list_foreach (pads, (GFunc) clear_pads, tee);
914
915 restart:
916 if (tee->allow_not_linked) {
917 cret = GST_FLOW_OK;
918 } else {
919 cret = GST_FLOW_NOT_LINKED;
920 }
921 pads = GST_ELEMENT_CAST (tee)->srcpads;
922 cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
923
924 while (pads) {
925 GstPad *pad;
926
927 pad = GST_PAD_CAST (pads->data);
928
929 if (G_LIKELY (!GST_TEE_PAD_CAST (pad)->pushed)) {
930 /* not yet pushed, release lock and start pushing */
931 gst_object_ref (pad);
932 GST_OBJECT_UNLOCK (tee);
933
934 GST_LOG_OBJECT (pad, "Starting to push %s %p",
935 is_list ? "list" : "buffer", data);
936
937 ret = gst_tee_do_push (tee, pad, data, is_list);
938
939 GST_LOG_OBJECT (pad, "Pushing item %p yielded result %s", data,
940 gst_flow_get_name (ret));
941
942 GST_OBJECT_LOCK (tee);
943 /* keep track of which pad we pushed and the result value */
944 if (GST_TEE_PAD_CAST (pad)->removed)
945 ret = GST_FLOW_NOT_LINKED;
946 GST_TEE_PAD_CAST (pad)->pushed = TRUE;
947 GST_TEE_PAD_CAST (pad)->result = ret;
948 gst_object_unref (pad);
949 pad = NULL;
950 } else {
951 /* already pushed, use previous return value */
952 ret = GST_TEE_PAD_CAST (pad)->result;
953 GST_LOG_OBJECT (pad, "pad already pushed with %s",
954 gst_flow_get_name (ret));
955 }
956
957 /* before we go combining the return value, check if the pad list is still
958 * the same. It could be possible that the pad we just pushed was removed
959 * and the return value it not valid anymore */
960 if (G_UNLIKELY (GST_ELEMENT_CAST (tee)->pads_cookie != cookie)) {
961 GST_LOG_OBJECT (tee, "pad list changed");
962 /* the list of pads changed, restart iteration. Pads that we already
963 * pushed on and are still in the new list, will not be pushed on
964 * again. */
965 goto restart;
966 }
967
968 /* stop pushing more buffers when we have a fatal error */
969 if (G_UNLIKELY (ret != GST_FLOW_OK && ret != GST_FLOW_NOT_LINKED))
970 goto error;
971
972 /* keep all other return values, overwriting the previous one. */
973 if (G_LIKELY (ret != GST_FLOW_NOT_LINKED)) {
974 GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret);
975 cret = ret;
976 }
977 pads = g_list_next (pads);
978 }
979 GST_OBJECT_UNLOCK (tee);
980
981 gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
982
983 /* no need to unset gvalue */
984 return cret;
985
986 /* ERRORS */
987 no_pads:
988 {
989 if (tee->allow_not_linked) {
990 GST_DEBUG_OBJECT (tee, "there are no pads, dropping %s",
991 is_list ? "buffer-list" : "buffer");
992 ret = GST_FLOW_OK;
993 } else {
994 GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked");
995 ret = GST_FLOW_NOT_LINKED;
996 }
997 goto end;
998 }
999 error:
1000 {
1001 GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
1002 goto end;
1003 }
1004 end:
1005 {
1006 GST_OBJECT_UNLOCK (tee);
1007 gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
1008 return ret;
1009 }
1010 }
1011
1012 static GstFlowReturn
gst_tee_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)1013 gst_tee_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1014 {
1015 GstFlowReturn res;
1016 GstTee *tee;
1017
1018 tee = GST_TEE_CAST (parent);
1019
1020 GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
1021
1022 res = gst_tee_handle_data (tee, buffer, FALSE);
1023
1024 GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
1025
1026 return res;
1027 }
1028
1029 static GstFlowReturn
gst_tee_chain_list(GstPad * pad,GstObject * parent,GstBufferList * list)1030 gst_tee_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
1031 {
1032 GstFlowReturn res;
1033 GstTee *tee;
1034
1035 tee = GST_TEE_CAST (parent);
1036
1037 GST_DEBUG_OBJECT (tee, "received list %p", list);
1038
1039 res = gst_tee_handle_data (tee, list, TRUE);
1040
1041 GST_DEBUG_OBJECT (tee, "handled list %s", gst_flow_get_name (res));
1042
1043 return res;
1044 }
1045
1046 static gboolean
gst_tee_sink_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)1047 gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1048 gboolean active)
1049 {
1050 gboolean res;
1051 GstTee *tee;
1052
1053 tee = GST_TEE (parent);
1054
1055 switch (mode) {
1056 case GST_PAD_MODE_PUSH:
1057 {
1058 GST_OBJECT_LOCK (tee);
1059 tee->sink_mode = active ? mode : GST_PAD_MODE_NONE;
1060
1061 if (active && !tee->has_chain)
1062 goto no_chain;
1063 GST_OBJECT_UNLOCK (tee);
1064 res = TRUE;
1065 break;
1066 }
1067 default:
1068 res = FALSE;
1069 break;
1070 }
1071 return res;
1072
1073 /* ERRORS */
1074 no_chain:
1075 {
1076 GST_OBJECT_UNLOCK (tee);
1077 GST_INFO_OBJECT (tee,
1078 "Tee cannot operate in push mode with has-chain==FALSE");
1079 return FALSE;
1080 }
1081 }
1082
1083 static gboolean
gst_tee_src_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)1084 gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1085 gboolean active)
1086 {
1087 GstTee *tee;
1088 gboolean res;
1089 GstPad *sinkpad;
1090
1091 tee = GST_TEE (parent);
1092
1093 switch (mode) {
1094 case GST_PAD_MODE_PULL:
1095 {
1096 GST_OBJECT_LOCK (tee);
1097
1098 if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
1099 goto cannot_pull;
1100
1101 if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad)
1102 goto cannot_pull_multiple_srcs;
1103
1104 sinkpad = gst_object_ref (tee->sinkpad);
1105
1106 GST_OBJECT_UNLOCK (tee);
1107
1108 res = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, active);
1109 gst_object_unref (sinkpad);
1110
1111 if (!res)
1112 goto sink_activate_failed;
1113
1114 GST_OBJECT_LOCK (tee);
1115 if (active) {
1116 if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE)
1117 tee->pull_pad = pad;
1118 } else {
1119 if (pad == tee->pull_pad)
1120 tee->pull_pad = NULL;
1121 }
1122 tee->sink_mode = (active ? GST_PAD_MODE_PULL : GST_PAD_MODE_NONE);
1123 GST_OBJECT_UNLOCK (tee);
1124 break;
1125 }
1126 default:
1127 res = TRUE;
1128 break;
1129 }
1130
1131 return res;
1132
1133 /* ERRORS */
1134 cannot_pull:
1135 {
1136 GST_OBJECT_UNLOCK (tee);
1137 GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
1138 "set to NEVER");
1139 return FALSE;
1140 }
1141 cannot_pull_multiple_srcs:
1142 {
1143 GST_OBJECT_UNLOCK (tee);
1144 GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
1145 "pull-mode set to SINGLE");
1146 return FALSE;
1147 }
1148 sink_activate_failed:
1149 {
1150 GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode",
1151 active ? "" : "de");
1152 return FALSE;
1153 }
1154 }
1155
1156 static gboolean
gst_tee_src_query(GstPad * pad,GstObject * parent,GstQuery * query)1157 gst_tee_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1158 {
1159 GstTee *tee;
1160 gboolean res;
1161 GstPad *sinkpad;
1162
1163 tee = GST_TEE (parent);
1164
1165 switch (GST_QUERY_TYPE (query)) {
1166 case GST_QUERY_SCHEDULING:
1167 {
1168 gboolean pull_mode;
1169
1170 GST_OBJECT_LOCK (tee);
1171 pull_mode = TRUE;
1172 if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) {
1173 GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
1174 "set to NEVER");
1175 pull_mode = FALSE;
1176 } else if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && tee->pull_pad) {
1177 GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
1178 "pull-mode set to SINGLE");
1179 pull_mode = FALSE;
1180 }
1181
1182 sinkpad = gst_object_ref (tee->sinkpad);
1183 GST_OBJECT_UNLOCK (tee);
1184
1185 if (pull_mode) {
1186 /* ask peer if we can operate in pull mode */
1187 res = gst_pad_peer_query (sinkpad, query);
1188 } else {
1189 res = TRUE;
1190 }
1191 gst_object_unref (sinkpad);
1192 break;
1193 }
1194 default:
1195 res = gst_pad_query_default (pad, parent, query);
1196 break;
1197 }
1198
1199 return res;
1200 }
1201
1202 static void
gst_tee_push_eos(const GValue * vpad,GstTee * tee)1203 gst_tee_push_eos (const GValue * vpad, GstTee * tee)
1204 {
1205 GstPad *pad = g_value_get_object (vpad);
1206
1207 if (pad != tee->pull_pad)
1208 gst_pad_push_event (pad, gst_event_new_eos ());
1209 }
1210
1211 static void
gst_tee_pull_eos(GstTee * tee)1212 gst_tee_pull_eos (GstTee * tee)
1213 {
1214 GstIterator *iter;
1215
1216 iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
1217 while (gst_iterator_foreach (iter,
1218 (GstIteratorForeachFunction) gst_tee_push_eos,
1219 tee) == GST_ITERATOR_RESYNC)
1220 gst_iterator_resync (iter);
1221 gst_iterator_free (iter);
1222 }
1223
1224 static GstFlowReturn
gst_tee_src_get_range(GstPad * pad,GstObject * parent,guint64 offset,guint length,GstBuffer ** buf)1225 gst_tee_src_get_range (GstPad * pad, GstObject * parent, guint64 offset,
1226 guint length, GstBuffer ** buf)
1227 {
1228 GstTee *tee;
1229 GstFlowReturn ret;
1230
1231 tee = GST_TEE (parent);
1232
1233 ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf);
1234
1235 if (ret == GST_FLOW_OK)
1236 ret = gst_tee_handle_data (tee, gst_buffer_ref (*buf), FALSE);
1237 else if (ret == GST_FLOW_EOS)
1238 gst_tee_pull_eos (tee);
1239
1240 return ret;
1241 }
1242