1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2001 Thomas <thomas@apestaart.org>
4  *               2005,2006 Wim Taymans <wim@fluendo.com>
5  *
6  * adder.c: Adder element, N in, one out, samples are added
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 /**
24  * SECTION:element-adder
25  * @title: adder
26  *
 * The adder mixes several streams into one by adding the data.
28  * Mixed data is clamped to the min/max values of the data format.
29  *
30  * The adder currently mixes all data received on the sinkpads as soon as
31  * possible without trying to synchronize the streams.
32  *
33  * Check out the audiomixer element in gst-plugins-bad for a better-behaving
34  * audio mixing element: It will sync input streams correctly and also handle
35  * live inputs properly.
36  *
37  * Caps negotiation is inherently racy with the adder element. You can set
38  * the "caps" property to force adder to operate in a specific audio
39  * format, sample rate and channel count. In this case you may also need
40  * audioconvert and/or audioresample elements for each input stream before the
41  * adder element to make sure the input branch can produce the forced format.
42  *
43  * ## Example launch line
44  * |[
45  * gst-launch-1.0 audiotestsrc freq=100 ! adder name=mix ! audioconvert ! autoaudiosink audiotestsrc freq=500 ! mix.
46  * ]|
47  *  This pipeline produces two sine waves mixed together.
48  *
49  */
50 /* Element-Checklist-Version: 5 */
51 
52 #ifdef HAVE_CONFIG_H
53 #include "config.h"
54 #endif
55 
56 #include "gstadder.h"
57 #include <gst/audio/audio.h>
58 #include <string.h>             /* strcmp */
59 #include "gstadderorc.h"
60 
/* File-local debug category; it is declared through GST_CAT_DEFAULT so the
 * same name is used for the declaration and as the default category. */
#define GST_CAT_DEFAULT gst_adder_debug
GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);

/* Initial values of the per-pad "volume" and "mute" properties. */
#define DEFAULT_PAD_VOLUME (1.0)
#define DEFAULT_PAD_MUTE (FALSE)

/* some defines for audio processing */
/* The pad volume factor ranges from 0.0 to 10.0 (the upper bound of the pad
 * "volume" property). A factor of 1.0 is mapped to VOLUME_UNITY_INT*, i.e.
 * the fixed-point representation of unity for the given sample width.
 */
#define VOLUME_UNITY_INT8            8  /* internal int for unity 2^(8-5) */
#define VOLUME_UNITY_INT8_BIT_SHIFT  3  /* number of bits to shift for unity */
#define VOLUME_UNITY_INT16           2048       /* internal int for unity 2^(16-5) */
#define VOLUME_UNITY_INT16_BIT_SHIFT 11 /* number of bits to shift for unity */
#define VOLUME_UNITY_INT24           524288     /* internal int for unity 2^(24-5) */
#define VOLUME_UNITY_INT24_BIT_SHIFT 19 /* number of bits to shift for unity */
#define VOLUME_UNITY_INT32           134217728  /* internal int for unity 2^(32-5) */
#define VOLUME_UNITY_INT32_BIT_SHIFT 27 /* number of bits to shift for unity */
79 
/* Property IDs of GstAdderPad. PROP_PAD_0 only occupies value 0; the class
 * init function installs PROP_PAD_VOLUME and PROP_PAD_MUTE. */
enum
{
  PROP_PAD_0,
  PROP_PAD_VOLUME,
  PROP_PAD_MUTE
};

/* GstAdderPad is registered as a subclass of GstPad. */
G_DEFINE_TYPE (GstAdderPad, gst_adder_pad, GST_TYPE_PAD);
88 
89 static void
gst_adder_pad_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)90 gst_adder_pad_get_property (GObject * object, guint prop_id,
91     GValue * value, GParamSpec * pspec)
92 {
93   GstAdderPad *pad = GST_ADDER_PAD (object);
94 
95   switch (prop_id) {
96     case PROP_PAD_VOLUME:
97       g_value_set_double (value, pad->volume);
98       break;
99     case PROP_PAD_MUTE:
100       g_value_set_boolean (value, pad->mute);
101       break;
102     default:
103       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
104       break;
105   }
106 }
107 
108 static void
gst_adder_pad_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)109 gst_adder_pad_set_property (GObject * object, guint prop_id,
110     const GValue * value, GParamSpec * pspec)
111 {
112   GstAdderPad *pad = GST_ADDER_PAD (object);
113 
114   switch (prop_id) {
115     case PROP_PAD_VOLUME:
116       GST_OBJECT_LOCK (pad);
117       pad->volume = g_value_get_double (value);
118       pad->volume_i8 = pad->volume * VOLUME_UNITY_INT8;
119       pad->volume_i16 = pad->volume * VOLUME_UNITY_INT16;
120       pad->volume_i32 = pad->volume * VOLUME_UNITY_INT32;
121       GST_OBJECT_UNLOCK (pad);
122       break;
123     case PROP_PAD_MUTE:
124       GST_OBJECT_LOCK (pad);
125       pad->mute = g_value_get_boolean (value);
126       GST_OBJECT_UNLOCK (pad);
127       break;
128     default:
129       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
130       break;
131   }
132 }
133 
134 static void
gst_adder_pad_class_init(GstAdderPadClass * klass)135 gst_adder_pad_class_init (GstAdderPadClass * klass)
136 {
137   GObjectClass *gobject_class = (GObjectClass *) klass;
138 
139   gobject_class->set_property = gst_adder_pad_set_property;
140   gobject_class->get_property = gst_adder_pad_get_property;
141 
142   g_object_class_install_property (gobject_class, PROP_PAD_VOLUME,
143       g_param_spec_double ("volume", "Volume", "Volume of this pad",
144           0.0, 10.0, DEFAULT_PAD_VOLUME,
145           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
146   g_object_class_install_property (gobject_class, PROP_PAD_MUTE,
147       g_param_spec_boolean ("mute", "Mute", "Mute this pad",
148           DEFAULT_PAD_MUTE,
149           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
150 }
151 
152 static void
gst_adder_pad_init(GstAdderPad * pad)153 gst_adder_pad_init (GstAdderPad * pad)
154 {
155   pad->volume = DEFAULT_PAD_VOLUME;
156   pad->mute = DEFAULT_PAD_MUTE;
157 }
158 
/* Property IDs of the GstAdder element itself. */
enum
{
  PROP_0,
  PROP_FILTER_CAPS
};

/* elementfactory information */

/* CAPS is the caps string shared by the source and sink pad templates:
 * 8/16/32 bit integer and 32/64 bit float formats in the byte order selected
 * by G_BYTE_ORDER, restricted to interleaved layout. */
#if G_BYTE_ORDER == G_LITTLE_ENDIAN
#define CAPS \
  GST_AUDIO_CAPS_MAKE ("{ S32LE, U32LE, S16LE, U16LE, S8, U8, F32LE, F64LE }") \
  ", layout = (string) { interleaved }"
#else
#define CAPS \
  GST_AUDIO_CAPS_MAKE ("{ S32BE, U32BE, S16BE, U16BE, S8, U8, F32BE, F64BE }") \
  ", layout = (string) { interleaved }"
#endif

/* The single, always-present source pad. */
static GstStaticPadTemplate gst_adder_src_template =
GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS (CAPS)
    );

/* Request sink pads, named sink_%u. */
static GstStaticPadTemplate gst_adder_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink_%u",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS (CAPS)
    );
190 
/* GstChildProxy interface initialiser, referenced by the type definition
 * below. */
static void gst_adder_child_proxy_init (gpointer g_iface, gpointer iface_data);

/* GstAdder is a GstElement subclass that implements GstChildProxy. */
#define gst_adder_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstAdder, gst_adder, GST_TYPE_ELEMENT,
    G_IMPLEMENT_INTERFACE (GST_TYPE_CHILD_PROXY, gst_adder_child_proxy_init));

/* GObject vfuncs */
static void gst_adder_dispose (GObject * object);
static void gst_adder_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_adder_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

/* caps, query and event handling for the source pad and the collected
 * sink pads */
static gboolean gst_adder_setcaps (GstAdder * adder, GstPad * pad,
    GstCaps * caps);
static gboolean gst_adder_src_query (GstPad * pad, GstObject * parent,
    GstQuery * query);
static gboolean gst_adder_sink_query (GstCollectPads * pads,
    GstCollectData * pad, GstQuery * query, gpointer user_data);
static gboolean gst_adder_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static gboolean gst_adder_sink_event (GstCollectPads * pads,
    GstCollectData * pad, GstEvent * event, gpointer user_data);

/* GstElement vfuncs: request pad management */
static GstPad *gst_adder_request_new_pad (GstElement * element,
    GstPadTemplate * temp, const gchar * unused, const GstCaps * caps);
static void gst_adder_release_pad (GstElement * element, GstPad * pad);

/* GstElement vfunc: state changes */
static GstStateChangeReturn gst_adder_change_state (GstElement * element,
    GstStateChange transition);

/* GstCollectPads callbacks: buffer clipping and the collected function */
static GstFlowReturn gst_adder_do_clip (GstCollectPads * pads,
    GstCollectData * data, GstBuffer * buffer, GstBuffer ** out,
    gpointer user_data);
static GstFlowReturn gst_adder_collected (GstCollectPads * pads,
    gpointer user_data);
226 
227 /* we can only accept caps that we and downstream can handle.
228  * if we have filtercaps set, use those to constrain the target caps.
229  */
230 static GstCaps *
gst_adder_sink_getcaps(GstPad * pad,GstCaps * filter)231 gst_adder_sink_getcaps (GstPad * pad, GstCaps * filter)
232 {
233   GstAdder *adder;
234   GstCaps *result, *peercaps, *current_caps, *filter_caps;
235   GstStructure *s;
236   gint i, n;
237 
238   adder = GST_ADDER (GST_PAD_PARENT (pad));
239 
240   GST_OBJECT_LOCK (adder);
241   /* take filter */
242   if ((filter_caps = adder->filter_caps)) {
243     if (filter)
244       filter_caps =
245           gst_caps_intersect_full (filter, filter_caps,
246           GST_CAPS_INTERSECT_FIRST);
247     else
248       gst_caps_ref (filter_caps);
249   } else {
250     filter_caps = filter ? gst_caps_ref (filter) : NULL;
251   }
252   GST_OBJECT_UNLOCK (adder);
253 
254   if (filter_caps && gst_caps_is_empty (filter_caps)) {
255     GST_WARNING_OBJECT (pad, "Empty filter caps");
256     return filter_caps;
257   }
258 
259   /* get the downstream possible caps */
260   peercaps = gst_pad_peer_query_caps (adder->srcpad, filter_caps);
261 
262   /* get the allowed caps on this sinkpad */
263   GST_OBJECT_LOCK (adder);
264   current_caps =
265       adder->current_caps ? gst_caps_ref (adder->current_caps) : NULL;
266   if (current_caps == NULL) {
267     current_caps = gst_pad_get_pad_template_caps (pad);
268     if (!current_caps)
269       current_caps = gst_caps_new_any ();
270   }
271   GST_OBJECT_UNLOCK (adder);
272 
273   if (peercaps) {
274     /* if the peer has caps, intersect */
275     GST_DEBUG_OBJECT (adder, "intersecting peer and our caps");
276     result =
277         gst_caps_intersect_full (peercaps, current_caps,
278         GST_CAPS_INTERSECT_FIRST);
279     gst_caps_unref (peercaps);
280     gst_caps_unref (current_caps);
281   } else {
282     /* the peer has no caps (or there is no peer), just use the allowed caps
283      * of this sinkpad. */
284     /* restrict with filter-caps if any */
285     if (filter_caps) {
286       GST_DEBUG_OBJECT (adder, "no peer caps, using filtered caps");
287       result =
288           gst_caps_intersect_full (filter_caps, current_caps,
289           GST_CAPS_INTERSECT_FIRST);
290       gst_caps_unref (current_caps);
291     } else {
292       GST_DEBUG_OBJECT (adder, "no peer caps, using our caps");
293       result = current_caps;
294     }
295   }
296 
297   result = gst_caps_make_writable (result);
298 
299   n = gst_caps_get_size (result);
300   for (i = 0; i < n; i++) {
301     GstStructure *sref;
302 
303     s = gst_caps_get_structure (result, i);
304     sref = gst_structure_copy (s);
305     gst_structure_set (sref, "channels", GST_TYPE_INT_RANGE, 0, 2, NULL);
306     if (gst_structure_is_subset (s, sref)) {
307       /* This field is irrelevant when in mono or stereo */
308       gst_structure_remove_field (s, "channel-mask");
309     }
310     gst_structure_free (sref);
311   }
312 
313   if (filter_caps)
314     gst_caps_unref (filter_caps);
315 
316   GST_LOG_OBJECT (adder, "getting caps on pad %p,%s to %" GST_PTR_FORMAT, pad,
317       GST_PAD_NAME (pad), result);
318 
319   return result;
320 }
321 
322 static gboolean
gst_adder_sink_query(GstCollectPads * pads,GstCollectData * pad,GstQuery * query,gpointer user_data)323 gst_adder_sink_query (GstCollectPads * pads, GstCollectData * pad,
324     GstQuery * query, gpointer user_data)
325 {
326   gboolean res = FALSE;
327 
328   switch (GST_QUERY_TYPE (query)) {
329     case GST_QUERY_CAPS:
330     {
331       GstCaps *filter, *caps;
332 
333       gst_query_parse_caps (query, &filter);
334       caps = gst_adder_sink_getcaps (pad->pad, filter);
335       gst_query_set_caps_result (query, caps);
336       gst_caps_unref (caps);
337       res = TRUE;
338       break;
339     }
340     default:
341       res = gst_collect_pads_query_default (pads, pad, query, FALSE);
342       break;
343   }
344 
345   return res;
346 }
347 
348 /* the first caps we receive on any of the sinkpads will define the caps for all
349  * the other sinkpads because we can only mix streams with the same caps.
350  */
351 static gboolean
gst_adder_setcaps(GstAdder * adder,GstPad * pad,GstCaps * orig_caps)352 gst_adder_setcaps (GstAdder * adder, GstPad * pad, GstCaps * orig_caps)
353 {
354   GstCaps *caps;
355   GstAudioInfo info;
356   GstStructure *s;
357   gint channels;
358 
359   caps = gst_caps_copy (orig_caps);
360 
361   s = gst_caps_get_structure (caps, 0);
362   if (gst_structure_get_int (s, "channels", &channels))
363     if (channels <= 2)
364       gst_structure_remove_field (s, "channel-mask");
365 
366   if (!gst_audio_info_from_caps (&info, caps))
367     goto invalid_format;
368 
369   GST_OBJECT_LOCK (adder);
370   /* don't allow reconfiguration for now; there's still a race between the
371    * different upstream threads doing query_caps + accept_caps + sending
372    * (possibly different) CAPS events, but there's not much we can do about
373    * that, upstream needs to deal with it. */
374   if (adder->current_caps != NULL) {
375     if (gst_audio_info_is_equal (&info, &adder->info)) {
376       GST_OBJECT_UNLOCK (adder);
377       gst_caps_unref (caps);
378       return TRUE;
379     } else {
380       GST_DEBUG_OBJECT (pad, "got input caps %" GST_PTR_FORMAT ", but "
381           "current caps are %" GST_PTR_FORMAT, caps, adder->current_caps);
382       GST_OBJECT_UNLOCK (adder);
383       gst_pad_push_event (pad, gst_event_new_reconfigure ());
384       gst_caps_unref (caps);
385       return FALSE;
386     }
387   }
388 
389   GST_INFO_OBJECT (pad, "setting caps to %" GST_PTR_FORMAT, caps);
390   adder->current_caps = gst_caps_ref (caps);
391 
392   memcpy (&adder->info, &info, sizeof (info));
393   GST_OBJECT_UNLOCK (adder);
394   /* send caps event later, after stream-start event */
395 
396   GST_INFO_OBJECT (pad, "handle caps change to %" GST_PTR_FORMAT, caps);
397 
398   gst_caps_unref (caps);
399 
400   return TRUE;
401 
402   /* ERRORS */
403 invalid_format:
404   {
405     gst_caps_unref (caps);
406     GST_WARNING_OBJECT (adder, "invalid format set as caps");
407     return FALSE;
408   }
409 }
410 
411 /* FIXME, the duration query should reflect how long you will produce
412  * data, that is the amount of stream time until you will emit EOS.
413  *
414  * For synchronized mixing this is always the max of all the durations
415  * of upstream since we emit EOS when all of them finished.
416  *
417  * We don't do synchronized mixing so this really depends on where the
418  * streams where punched in and what their relative offsets are against
419  * eachother which we can get from the first timestamps we see.
420  *
421  * When we add a new stream (or remove a stream) the duration might
422  * also become invalid again and we need to post a new DURATION
423  * message to notify this fact to the parent.
424  * For now we take the max of all the upstream elements so the simple
425  * cases work at least somewhat.
426  */
427 static gboolean
gst_adder_query_duration(GstAdder * adder,GstQuery * query)428 gst_adder_query_duration (GstAdder * adder, GstQuery * query)
429 {
430   gint64 max;
431   gboolean res;
432   GstFormat format;
433   GstIterator *it;
434   gboolean done;
435   GValue item = { 0, };
436 
437   /* parse format */
438   gst_query_parse_duration (query, &format, NULL);
439 
440   max = -1;
441   res = TRUE;
442   done = FALSE;
443 
444   it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (adder));
445   while (!done) {
446     GstIteratorResult ires;
447 
448     ires = gst_iterator_next (it, &item);
449     switch (ires) {
450       case GST_ITERATOR_DONE:
451         done = TRUE;
452         break;
453       case GST_ITERATOR_OK:
454       {
455         GstPad *pad = g_value_get_object (&item);
456         gint64 duration;
457 
458         /* ask sink peer for duration */
459         res &= gst_pad_peer_query_duration (pad, format, &duration);
460         /* take max from all valid return values */
461         if (res) {
462           /* valid unknown length, stop searching */
463           if (duration == -1) {
464             max = duration;
465             done = TRUE;
466           }
467           /* else see if bigger than current max */
468           else if (duration > max)
469             max = duration;
470         }
471         g_value_reset (&item);
472         break;
473       }
474       case GST_ITERATOR_RESYNC:
475         max = -1;
476         res = TRUE;
477         gst_iterator_resync (it);
478         break;
479       default:
480         res = FALSE;
481         done = TRUE;
482         break;
483     }
484   }
485   g_value_unset (&item);
486   gst_iterator_free (it);
487 
488   if (res) {
489     /* and store the max */
490     GST_DEBUG_OBJECT (adder, "Total duration in format %s: %"
491         GST_TIME_FORMAT, gst_format_get_name (format), GST_TIME_ARGS (max));
492     gst_query_set_duration (query, format, max);
493   }
494 
495   return res;
496 }
497 
498 static gboolean
gst_adder_src_query(GstPad * pad,GstObject * parent,GstQuery * query)499 gst_adder_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
500 {
501   GstAdder *adder = GST_ADDER (parent);
502   gboolean res = FALSE;
503 
504   switch (GST_QUERY_TYPE (query)) {
505     case GST_QUERY_POSITION:
506     {
507       GstFormat format;
508 
509       gst_query_parse_position (query, &format, NULL);
510 
511       switch (format) {
512         case GST_FORMAT_TIME:
513           /* FIXME, bring to stream time, might be tricky */
514           gst_query_set_position (query, format, adder->segment.position);
515           res = TRUE;
516           break;
517         case GST_FORMAT_DEFAULT:
518           gst_query_set_position (query, format, adder->offset);
519           res = TRUE;
520           break;
521         default:
522           break;
523       }
524       break;
525     }
526     case GST_QUERY_DURATION:
527       res = gst_adder_query_duration (adder, query);
528       break;
529     default:
530       /* FIXME, needs a custom query handler because we have multiple
531        * sinkpads */
532       res = gst_pad_query_default (pad, parent, query);
533       break;
534   }
535 
536   return res;
537 }
538 
/* event handling */

/* Context handed to forward_event_func() while folding over the sink pads. */
typedef struct
{
  GstEvent *event;              /* the event to send to each sink pad's peer */
  gboolean flush;               /* if TRUE, a pad whose send failed is sent a
                                 * flush-stop to unflush it again */
} EventData;
546 
547 static gboolean
forward_event_func(const GValue * val,GValue * ret,EventData * data)548 forward_event_func (const GValue * val, GValue * ret, EventData * data)
549 {
550   GstPad *pad = g_value_get_object (val);
551   GstEvent *event = data->event;
552   GstPad *peer;
553 
554   gst_event_ref (event);
555   GST_LOG_OBJECT (pad, "About to send event %s", GST_EVENT_TYPE_NAME (event));
556   peer = gst_pad_get_peer (pad);
557   /* collect pad might have been set flushing,
558    * so bypass core checking that and send directly to peer */
559   if (!peer || !gst_pad_send_event (peer, event)) {
560     if (!peer)
561       gst_event_unref (event);
562     GST_WARNING_OBJECT (pad, "Sending event  %p (%s) failed.",
563         event, GST_EVENT_TYPE_NAME (event));
564     /* quick hack to unflush the pads, ideally we need a way to just unflush
565      * this single collect pad */
566     if (data->flush)
567       gst_pad_send_event (pad, gst_event_new_flush_stop (TRUE));
568   } else {
569     g_value_set_boolean (ret, TRUE);
570     GST_LOG_OBJECT (pad, "Sent event  %p (%s).",
571         event, GST_EVENT_TYPE_NAME (event));
572   }
573   if (peer)
574     gst_object_unref (peer);
575 
576   /* continue on other pads, even if one failed */
577   return TRUE;
578 }
579 
580 /* forwards the event to all sinkpads, takes ownership of the
581  * event
582  *
583  * Returns: TRUE if the event could be forwarded on all
584  * sinkpads.
585  */
586 static gboolean
forward_event(GstAdder * adder,GstEvent * event,gboolean flush)587 forward_event (GstAdder * adder, GstEvent * event, gboolean flush)
588 {
589   gboolean ret;
590   GstIterator *it;
591   GstIteratorResult ires;
592   GValue vret = { 0 };
593   EventData data;
594 
595   GST_LOG_OBJECT (adder, "Forwarding event %p (%s)", event,
596       GST_EVENT_TYPE_NAME (event));
597 
598   data.event = event;
599   data.flush = flush;
600 
601   g_value_init (&vret, G_TYPE_BOOLEAN);
602   g_value_set_boolean (&vret, FALSE);
603   it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (adder));
604   while (TRUE) {
605     ires =
606         gst_iterator_fold (it, (GstIteratorFoldFunction) forward_event_func,
607         &vret, &data);
608     switch (ires) {
609       case GST_ITERATOR_RESYNC:
610         GST_WARNING ("resync");
611         gst_iterator_resync (it);
612         g_value_set_boolean (&vret, TRUE);
613         break;
614       case GST_ITERATOR_OK:
615       case GST_ITERATOR_DONE:
616         ret = g_value_get_boolean (&vret);
617         goto done;
618       default:
619         ret = FALSE;
620         goto done;
621     }
622   }
623 done:
624   gst_iterator_free (it);
625   GST_LOG_OBJECT (adder, "Forwarded event %p (%s), ret=%d", event,
626       GST_EVENT_TYPE_NAME (event), ret);
627   gst_event_unref (event);
628 
629   return ret;
630 }
631 
632 static gboolean
gst_adder_src_event(GstPad * pad,GstObject * parent,GstEvent * event)633 gst_adder_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
634 {
635   GstAdder *adder;
636   gboolean result;
637 
638   adder = GST_ADDER (parent);
639 
640   GST_DEBUG_OBJECT (pad, "Got %s event on src pad: %" GST_PTR_FORMAT,
641       GST_EVENT_TYPE_NAME (event), event);
642 
643   switch (GST_EVENT_TYPE (event)) {
644     case GST_EVENT_SEEK:
645     {
646       GstSeekFlags flags;
647       gdouble rate;
648       GstSeekType start_type, stop_type;
649       gint64 start, stop;
650       GstFormat seek_format, dest_format;
651       gboolean flush;
652 
653       /* parse the seek parameters */
654       gst_event_parse_seek (event, &rate, &seek_format, &flags, &start_type,
655           &start, &stop_type, &stop);
656 
657       if ((start_type != GST_SEEK_TYPE_NONE)
658           && (start_type != GST_SEEK_TYPE_SET)) {
659         result = FALSE;
660         GST_DEBUG_OBJECT (adder,
661             "seeking failed, unhandled seek type for start: %d", start_type);
662         goto done;
663       }
664       if ((stop_type != GST_SEEK_TYPE_NONE) && (stop_type != GST_SEEK_TYPE_SET)) {
665         result = FALSE;
666         GST_DEBUG_OBJECT (adder,
667             "seeking failed, unhandled seek type for end: %d", stop_type);
668         goto done;
669       }
670 
671       dest_format = adder->segment.format;
672       if (seek_format != dest_format) {
673         result = FALSE;
674         GST_DEBUG_OBJECT (adder,
675             "seeking failed, unhandled seek format: %d", seek_format);
676         goto done;
677       }
678 
679       flush = (flags & GST_SEEK_FLAG_FLUSH) == GST_SEEK_FLAG_FLUSH;
680 
681       /* check if we are flushing */
682       if (flush) {
683         /* flushing seek, start flush downstream, the flush will be done
684          * when all pads received a FLUSH_STOP.
685          * Make sure we accept nothing anymore and return WRONG_STATE.
686          * We send a flush-start before, to ensure no streaming is done
687          * as we need to take the stream lock.
688          */
689         gst_pad_push_event (adder->srcpad, gst_event_new_flush_start ());
690         gst_collect_pads_set_flushing (adder->collect, TRUE);
691 
692         /* We can't send FLUSH_STOP here since upstream could start pushing data
693          * after we unlock adder->collect.
694          * We set flush_stop_pending to TRUE instead and send FLUSH_STOP after
695          * forwarding the seek upstream or from gst_adder_collected,
696          * whichever happens first.
697          */
698         GST_COLLECT_PADS_STREAM_LOCK (adder->collect);
699         adder->flush_stop_pending = TRUE;
700         GST_COLLECT_PADS_STREAM_UNLOCK (adder->collect);
701         GST_DEBUG_OBJECT (adder, "mark pending flush stop event");
702       }
703       GST_DEBUG_OBJECT (adder, "handling seek event: %" GST_PTR_FORMAT, event);
704 
705       /* now wait for the collected to be finished and mark a new
706        * segment. After we have the lock, no collect function is running and no
707        * new collect function will be called for as long as we're flushing. */
708       GST_COLLECT_PADS_STREAM_LOCK (adder->collect);
709 
710       /* clip position and update our segment */
711       if (adder->segment.stop != -1) {
712         adder->segment.position = adder->segment.stop;
713       }
714       gst_segment_do_seek (&adder->segment, rate, seek_format, flags,
715           start_type, start, stop_type, stop, NULL);
716 
717       if (flush) {
718         /* Yes, we need to call _set_flushing again *WHEN* the streaming threads
719          * have stopped so that the cookie gets properly updated. */
720         gst_collect_pads_set_flushing (adder->collect, TRUE);
721       }
722       GST_COLLECT_PADS_STREAM_UNLOCK (adder->collect);
723       GST_DEBUG_OBJECT (adder, "forwarding seek event: %" GST_PTR_FORMAT,
724           event);
725       GST_DEBUG_OBJECT (adder, "updated segment: %" GST_SEGMENT_FORMAT,
726           &adder->segment);
727 
728       /* we're forwarding seek to all upstream peers and wait for one to reply
729        * with a newsegment-event before we send a newsegment-event downstream */
730       g_atomic_int_set (&adder->new_segment_pending, TRUE);
731       result = forward_event (adder, event, flush);
732       if (!result) {
733         /* seek failed. maybe source is a live source. */
734         GST_DEBUG_OBJECT (adder, "seeking failed");
735       }
736       if (g_atomic_int_compare_and_exchange (&adder->flush_stop_pending,
737               TRUE, FALSE)) {
738         GST_DEBUG_OBJECT (adder, "pending flush stop");
739         if (!gst_pad_push_event (adder->srcpad,
740                 gst_event_new_flush_stop (TRUE))) {
741           GST_WARNING_OBJECT (adder, "Sending flush stop event failed");
742         }
743       }
744       break;
745     }
746     case GST_EVENT_QOS:
747       /* QoS might be tricky */
748       result = FALSE;
749       gst_event_unref (event);
750       break;
751     case GST_EVENT_NAVIGATION:
752       /* navigation is rather pointless. */
753       result = FALSE;
754       gst_event_unref (event);
755       break;
756     default:
757       /* just forward the rest for now */
758       GST_DEBUG_OBJECT (adder, "forward unhandled event: %s",
759           GST_EVENT_TYPE_NAME (event));
760       result = forward_event (adder, event, FALSE);
761       break;
762   }
763 
764 done:
765 
766   return result;
767 }
768 
769 static gboolean
gst_adder_sink_event(GstCollectPads * pads,GstCollectData * pad,GstEvent * event,gpointer user_data)770 gst_adder_sink_event (GstCollectPads * pads, GstCollectData * pad,
771     GstEvent * event, gpointer user_data)
772 {
773   GstAdder *adder = GST_ADDER (user_data);
774   gboolean res = TRUE, discard = FALSE;
775 
776   GST_DEBUG_OBJECT (pad->pad, "Got %s event on sink pad",
777       GST_EVENT_TYPE_NAME (event));
778 
779   switch (GST_EVENT_TYPE (event)) {
780     case GST_EVENT_CAPS:
781     {
782       GstCaps *caps;
783 
784       gst_event_parse_caps (event, &caps);
785       res = gst_adder_setcaps (adder, pad->pad, caps);
786       gst_event_unref (event);
787       event = NULL;
788       break;
789     }
790     case GST_EVENT_FLUSH_START:
791       /* ensure that we will send a flush stop */
792       res = gst_collect_pads_event_default (pads, pad, event, discard);
793       event = NULL;
794       GST_COLLECT_PADS_STREAM_LOCK (adder->collect);
795       adder->flush_stop_pending = TRUE;
796       GST_COLLECT_PADS_STREAM_UNLOCK (adder->collect);
797       break;
798     case GST_EVENT_FLUSH_STOP:
799       /* we received a flush-stop. We will only forward it when
800        * flush_stop_pending is set, and we will unset it then.
801        */
802       g_atomic_int_set (&adder->new_segment_pending, TRUE);
803       GST_COLLECT_PADS_STREAM_LOCK (adder->collect);
804       if (adder->flush_stop_pending) {
805         GST_DEBUG_OBJECT (pad->pad, "forwarding flush stop");
806         res = gst_collect_pads_event_default (pads, pad, event, discard);
807         adder->flush_stop_pending = FALSE;
808         event = NULL;
809       } else {
810         discard = TRUE;
811         GST_DEBUG_OBJECT (pad->pad, "eating flush stop");
812       }
813       GST_COLLECT_PADS_STREAM_UNLOCK (adder->collect);
814       /* Clear pending tags */
815       if (adder->pending_events) {
816         g_list_foreach (adder->pending_events, (GFunc) gst_event_unref, NULL);
817         g_list_free (adder->pending_events);
818         adder->pending_events = NULL;
819       }
820       break;
821     case GST_EVENT_TAG:
822       /* collect tags here so we can push them out when we collect data */
823       adder->pending_events = g_list_append (adder->pending_events, event);
824       event = NULL;
825       break;
826     case GST_EVENT_SEGMENT:{
827       const GstSegment *segment;
828       gst_event_parse_segment (event, &segment);
829       if (segment->rate != adder->segment.rate) {
830         GST_ERROR_OBJECT (pad->pad,
831             "Got segment event with wrong rate %lf, expected %lf",
832             segment->rate, adder->segment.rate);
833         res = FALSE;
834         gst_event_unref (event);
835         event = NULL;
836       }
837       discard = TRUE;
838       break;
839     }
840     default:
841       break;
842   }
843 
844   if (G_LIKELY (event))
845     return gst_collect_pads_event_default (pads, pad, event, discard);
846   else
847     return res;
848 }
849 
850 static void
gst_adder_class_init(GstAdderClass * klass)851 gst_adder_class_init (GstAdderClass * klass)
852 {
853   GObjectClass *gobject_class = (GObjectClass *) klass;
854   GstElementClass *gstelement_class = (GstElementClass *) klass;
855 
856   gobject_class->set_property = gst_adder_set_property;
857   gobject_class->get_property = gst_adder_get_property;
858   gobject_class->dispose = gst_adder_dispose;
859 
860   g_object_class_install_property (gobject_class, PROP_FILTER_CAPS,
861       g_param_spec_boxed ("caps", "Target caps",
862           "Set target format for mixing (NULL means ANY). "
863           "Setting this property takes a reference to the supplied GstCaps "
864           "object.", GST_TYPE_CAPS,
865           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
866 
867   gst_element_class_add_static_pad_template (gstelement_class,
868       &gst_adder_src_template);
869   gst_element_class_add_static_pad_template (gstelement_class,
870       &gst_adder_sink_template);
871   gst_element_class_set_static_metadata (gstelement_class, "Adder",
872       "Generic/Audio", "Add N audio channels together",
873       "Thomas Vander Stichele <thomas at apestaart dot org>");
874 
875   gstelement_class->request_new_pad =
876       GST_DEBUG_FUNCPTR (gst_adder_request_new_pad);
877   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_adder_release_pad);
878   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_adder_change_state);
879 }
880 
881 static void
gst_adder_init(GstAdder * adder)882 gst_adder_init (GstAdder * adder)
883 {
884   GstPadTemplate *template;
885 
886   template = gst_static_pad_template_get (&gst_adder_src_template);
887   adder->srcpad = gst_pad_new_from_template (template, "src");
888   gst_object_unref (template);
889 
890   gst_pad_set_query_function (adder->srcpad,
891       GST_DEBUG_FUNCPTR (gst_adder_src_query));
892   gst_pad_set_event_function (adder->srcpad,
893       GST_DEBUG_FUNCPTR (gst_adder_src_event));
894   GST_PAD_SET_PROXY_CAPS (adder->srcpad);
895   gst_element_add_pad (GST_ELEMENT (adder), adder->srcpad);
896 
897   adder->current_caps = NULL;
898   gst_audio_info_init (&adder->info);
899   adder->padcount = 0;
900 
901   adder->filter_caps = NULL;
902 
903   /* keep track of the sinkpads requested */
904   adder->collect = gst_collect_pads_new ();
905   gst_collect_pads_set_function (adder->collect,
906       GST_DEBUG_FUNCPTR (gst_adder_collected), adder);
907   gst_collect_pads_set_clip_function (adder->collect,
908       GST_DEBUG_FUNCPTR (gst_adder_do_clip), adder);
909   gst_collect_pads_set_event_function (adder->collect,
910       GST_DEBUG_FUNCPTR (gst_adder_sink_event), adder);
911   gst_collect_pads_set_query_function (adder->collect,
912       GST_DEBUG_FUNCPTR (gst_adder_sink_query), adder);
913 }
914 
915 static void
gst_adder_dispose(GObject * object)916 gst_adder_dispose (GObject * object)
917 {
918   GstAdder *adder = GST_ADDER (object);
919 
920   if (adder->collect) {
921     gst_object_unref (adder->collect);
922     adder->collect = NULL;
923   }
924   gst_caps_replace (&adder->filter_caps, NULL);
925   gst_caps_replace (&adder->current_caps, NULL);
926 
927   if (adder->pending_events) {
928     g_list_foreach (adder->pending_events, (GFunc) gst_event_unref, NULL);
929     g_list_free (adder->pending_events);
930     adder->pending_events = NULL;
931   }
932 
933   G_OBJECT_CLASS (parent_class)->dispose (object);
934 }
935 
936 static void
gst_adder_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)937 gst_adder_set_property (GObject * object, guint prop_id,
938     const GValue * value, GParamSpec * pspec)
939 {
940   GstAdder *adder = GST_ADDER (object);
941 
942   switch (prop_id) {
943     case PROP_FILTER_CAPS:{
944       GstCaps *new_caps = NULL;
945       GstCaps *old_caps;
946       const GstCaps *new_caps_val = gst_value_get_caps (value);
947 
948       if (new_caps_val != NULL) {
949         new_caps = (GstCaps *) new_caps_val;
950         gst_caps_ref (new_caps);
951       }
952 
953       GST_OBJECT_LOCK (adder);
954       old_caps = adder->filter_caps;
955       adder->filter_caps = new_caps;
956       GST_OBJECT_UNLOCK (adder);
957 
958       if (old_caps)
959         gst_caps_unref (old_caps);
960 
961       GST_DEBUG_OBJECT (adder, "set new caps %" GST_PTR_FORMAT, new_caps);
962       break;
963     }
964     default:
965       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
966       break;
967   }
968 }
969 
970 static void
gst_adder_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)971 gst_adder_get_property (GObject * object, guint prop_id, GValue * value,
972     GParamSpec * pspec)
973 {
974   GstAdder *adder = GST_ADDER (object);
975 
976   switch (prop_id) {
977     case PROP_FILTER_CAPS:
978       GST_OBJECT_LOCK (adder);
979       gst_value_set_caps (value, adder->filter_caps);
980       GST_OBJECT_UNLOCK (adder);
981       break;
982     default:
983       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
984       break;
985   }
986 }
987 
988 
989 static GstPad *
gst_adder_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * unused,const GstCaps * caps)990 gst_adder_request_new_pad (GstElement * element, GstPadTemplate * templ,
991     const gchar * unused, const GstCaps * caps)
992 {
993   gchar *name;
994   GstAdder *adder;
995   GstPad *newpad;
996   gint padcount;
997 
998   if (templ->direction != GST_PAD_SINK)
999     goto not_sink;
1000 
1001   adder = GST_ADDER (element);
1002 
1003   /* increment pad counter */
1004   padcount = g_atomic_int_add (&adder->padcount, 1);
1005 
1006   name = g_strdup_printf ("sink_%u", padcount);
1007   newpad = g_object_new (GST_TYPE_ADDER_PAD, "name", name, "direction",
1008       templ->direction, "template", templ, NULL);
1009   GST_DEBUG_OBJECT (adder, "request new pad %s", name);
1010   g_free (name);
1011 
1012   gst_collect_pads_add_pad (adder->collect, newpad, sizeof (GstCollectData),
1013       NULL, TRUE);
1014 
1015   /* takes ownership of the pad */
1016   if (!gst_element_add_pad (GST_ELEMENT (adder), newpad))
1017     goto could_not_add;
1018 
1019   gst_child_proxy_child_added (GST_CHILD_PROXY (adder), G_OBJECT (newpad),
1020       GST_OBJECT_NAME (newpad));
1021 
1022   return newpad;
1023 
1024   /* errors */
1025 not_sink:
1026   {
1027     g_warning ("gstadder: request new pad that is not a SINK pad\n");
1028     return NULL;
1029   }
1030 could_not_add:
1031   {
1032     GST_DEBUG_OBJECT (adder, "could not add pad");
1033     gst_collect_pads_remove_pad (adder->collect, newpad);
1034     gst_object_unref (newpad);
1035     return NULL;
1036   }
1037 }
1038 
1039 static void
gst_adder_release_pad(GstElement * element,GstPad * pad)1040 gst_adder_release_pad (GstElement * element, GstPad * pad)
1041 {
1042   GstAdder *adder;
1043 
1044   adder = GST_ADDER (element);
1045 
1046   GST_DEBUG_OBJECT (adder, "release pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1047 
1048   gst_child_proxy_child_removed (GST_CHILD_PROXY (adder), G_OBJECT (pad),
1049       GST_OBJECT_NAME (pad));
1050   if (adder->collect)
1051     gst_collect_pads_remove_pad (adder->collect, pad);
1052   gst_element_remove_pad (element, pad);
1053 }
1054 
1055 static GstFlowReturn
gst_adder_do_clip(GstCollectPads * pads,GstCollectData * data,GstBuffer * buffer,GstBuffer ** out,gpointer user_data)1056 gst_adder_do_clip (GstCollectPads * pads, GstCollectData * data,
1057     GstBuffer * buffer, GstBuffer ** out, gpointer user_data)
1058 {
1059   GstAdder *adder = GST_ADDER (user_data);
1060   gint rate, bpf;
1061 
1062   rate = GST_AUDIO_INFO_RATE (&adder->info);
1063   bpf = GST_AUDIO_INFO_BPF (&adder->info);
1064 
1065   buffer = gst_audio_buffer_clip (buffer, &data->segment, rate, bpf);
1066 
1067   *out = buffer;
1068   return GST_FLOW_OK;
1069 }
1070 
1071 /*
1072  * gst_adder_collected:
1073  *
1074  * Combine audio streams by adding data values.
1075  * basic algorithm :
1076  * - this function is called when all pads have a buffer
1077  * - get available bytes on all pads.
1078  * - repeat for each input pad :
1079  *   - read available bytes, copy or add to target buffer
1080  *   - if there's an EOS event, remove the input channel
1081  * - push out the output buffer
1082  *
1083  * Note: this code will run in one of the upstream threads.
1084  *
1085  * TODO: it would be nice to have a mixing mode, instead of only adding
1086  * - for float we could downscale after collect loop
1087  * - for int we need to downscale each input to avoid clipping or
1088  *   mix into a temp (float) buffer and scale afterwards as well
1089  */
1090 static GstFlowReturn
gst_adder_collected(GstCollectPads * pads,gpointer user_data)1091 gst_adder_collected (GstCollectPads * pads, gpointer user_data)
1092 {
1093   GstAdder *adder;
1094   GSList *collected, *next = NULL;
1095   GstFlowReturn ret;
1096   GstBuffer *outbuf = NULL, *gapbuf = NULL;
1097   GstMapInfo outmap = { NULL };
1098   guint outsize;
1099   gint64 next_offset;
1100   gint64 next_timestamp;
1101   gint rate, bps, bpf;
1102   gboolean had_mute = FALSE;
1103   gboolean is_eos = TRUE;
1104   gboolean is_discont = FALSE;
1105 
1106   adder = GST_ADDER (user_data);
1107 
1108   /* this is fatal */
1109   if (G_UNLIKELY (adder->info.finfo->format == GST_AUDIO_FORMAT_UNKNOWN))
1110     goto not_negotiated;
1111 
1112   if (adder->flush_stop_pending) {
1113     GST_INFO_OBJECT (adder->srcpad, "send pending flush stop event");
1114     if (!gst_pad_push_event (adder->srcpad, gst_event_new_flush_stop (TRUE))) {
1115       GST_WARNING_OBJECT (adder->srcpad, "Sending flush stop event failed");
1116     }
1117 
1118     adder->flush_stop_pending = FALSE;
1119   }
1120 
1121   if (adder->send_stream_start) {
1122     gchar s_id[32];
1123     GstEvent *event;
1124 
1125     GST_INFO_OBJECT (adder->srcpad, "send pending stream start event");
1126     /* FIXME: create id based on input ids, we can't use
1127      * gst_pad_create_stream_id() though as that only handles 0..1 sink-pad
1128      */
1129     g_snprintf (s_id, sizeof (s_id), "adder-%08x", g_random_int ());
1130     event = gst_event_new_stream_start (s_id);
1131     gst_event_set_group_id (event, gst_util_group_id_next ());
1132 
1133     if (!gst_pad_push_event (adder->srcpad, event)) {
1134       GST_WARNING_OBJECT (adder->srcpad, "Sending stream start event failed");
1135     }
1136     adder->send_stream_start = FALSE;
1137   }
1138 
1139   if (adder->send_caps) {
1140     GstEvent *caps_event;
1141 
1142     caps_event = gst_event_new_caps (adder->current_caps);
1143     GST_INFO_OBJECT (adder->srcpad, "send pending caps event %" GST_PTR_FORMAT,
1144         caps_event);
1145     if (!gst_pad_push_event (adder->srcpad, caps_event)) {
1146       GST_WARNING_OBJECT (adder->srcpad, "Sending caps event failed");
1147     }
1148     adder->send_caps = FALSE;
1149   }
1150 
1151   rate = GST_AUDIO_INFO_RATE (&adder->info);
1152   bps = GST_AUDIO_INFO_BPS (&adder->info);
1153   bpf = GST_AUDIO_INFO_BPF (&adder->info);
1154 
1155   if (g_atomic_int_compare_and_exchange (&adder->new_segment_pending, TRUE,
1156           FALSE)) {
1157     GstEvent *event;
1158 
1159     /*
1160      * When seeking we set the start and stop positions as given in the seek
1161      * event. We also adjust offset & timestamp accordingly.
1162      * This basically ignores all newsegments sent by upstream.
1163      */
1164     event = gst_event_new_segment (&adder->segment);
1165     if (adder->segment.rate > 0.0) {
1166       adder->segment.position = adder->segment.start;
1167     } else {
1168       adder->segment.position = adder->segment.stop;
1169     }
1170     adder->offset = gst_util_uint64_scale (adder->segment.position,
1171         rate, GST_SECOND);
1172 
1173     GST_INFO_OBJECT (adder->srcpad, "sending pending new segment event %"
1174         GST_SEGMENT_FORMAT, &adder->segment);
1175     if (event) {
1176       if (!gst_pad_push_event (adder->srcpad, event)) {
1177         GST_WARNING_OBJECT (adder->srcpad, "Sending new segment event failed");
1178       }
1179     } else {
1180       GST_WARNING_OBJECT (adder->srcpad, "Creating new segment event for "
1181           "start:%" G_GINT64_FORMAT ", end:%" G_GINT64_FORMAT " failed",
1182           adder->segment.start, adder->segment.stop);
1183     }
1184     is_discont = TRUE;
1185   }
1186 
1187   /* get available bytes for reading, this can be 0 which could mean empty
1188    * buffers or EOS, which we will catch when we loop over the pads. */
1189   outsize = gst_collect_pads_available (pads);
1190 
1191   GST_LOG_OBJECT (adder,
1192       "starting to cycle through channels, %d bytes available (bps = %d, bpf = %d)",
1193       outsize, bps, bpf);
1194 
1195   for (collected = pads->data; collected; collected = next) {
1196     GstCollectData *collect_data;
1197     GstBuffer *inbuf;
1198     gboolean is_gap;
1199     GstAdderPad *pad;
1200     GstClockTime timestamp, stream_time;
1201 
1202     /* take next to see if this is the last collectdata */
1203     next = g_slist_next (collected);
1204 
1205     collect_data = (GstCollectData *) collected->data;
1206     pad = GST_ADDER_PAD (collect_data->pad);
1207 
1208     /* get a buffer of size bytes, if we get a buffer, it is at least outsize
1209      * bytes big. */
1210     inbuf = gst_collect_pads_take_buffer (pads, collect_data, outsize);
1211 
1212     if (!GST_COLLECT_PADS_STATE_IS_SET (collect_data,
1213             GST_COLLECT_PADS_STATE_EOS))
1214       is_eos = FALSE;
1215 
1216     /* NULL means EOS or an empty buffer so we still need to flush in
1217      * case of an empty buffer. */
1218     if (inbuf == NULL) {
1219       GST_LOG_OBJECT (adder, "channel %p: no bytes available", collect_data);
1220       continue;
1221     }
1222 
1223     timestamp = GST_BUFFER_TIMESTAMP (inbuf);
1224     stream_time =
1225         gst_segment_to_stream_time (&collect_data->segment, GST_FORMAT_TIME,
1226         timestamp);
1227 
1228     /* sync object properties on stream time */
1229     if (GST_CLOCK_TIME_IS_VALID (stream_time))
1230       gst_object_sync_values (GST_OBJECT (pad), stream_time);
1231 
1232     GST_OBJECT_LOCK (pad);
1233     if (pad->mute || pad->volume < G_MINDOUBLE) {
1234       had_mute = TRUE;
1235       GST_DEBUG_OBJECT (adder, "channel %p: skipping muted pad", collect_data);
1236       gst_buffer_unref (inbuf);
1237       GST_OBJECT_UNLOCK (pad);
1238       continue;
1239     }
1240 
1241     is_gap = GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP);
1242 
1243     /* Try to make an output buffer */
1244     if (outbuf == NULL) {
1245       /* if this is a gap buffer but we have some more pads to check, skip it.
1246        * If we are at the last buffer, take it, regardless if it is a GAP
1247        * buffer or not. */
1248       if (is_gap && next) {
1249         GST_DEBUG_OBJECT (adder, "skipping, non-last GAP buffer");
1250         /* we keep the GAP buffer, if we don't have anymore buffers (all pads
1251          * EOS, we can use this one as the output buffer. */
1252         if (gapbuf == NULL)
1253           gapbuf = inbuf;
1254         else
1255           gst_buffer_unref (inbuf);
1256         GST_OBJECT_UNLOCK (pad);
1257         continue;
1258       }
1259 
1260       GST_LOG_OBJECT (adder, "channel %p: preparing output buffer of %d bytes",
1261           collect_data, outsize);
1262 
1263       /* make data and metadata writable, can simply return the inbuf when we
1264        * are the only one referencing this buffer. If this is the last (and
1265        * only) GAP buffer, it will automatically copy the GAP flag. */
1266       outbuf = gst_buffer_make_writable (inbuf);
1267       gst_buffer_map (outbuf, &outmap, GST_MAP_READWRITE);
1268 
1269       if (pad->volume != 1.0) {
1270         switch (adder->info.finfo->format) {
1271           case GST_AUDIO_FORMAT_U8:
1272             adder_orc_volume_u8 ((gpointer) outmap.data, pad->volume_i8,
1273                 outmap.size / bps);
1274             break;
1275           case GST_AUDIO_FORMAT_S8:
1276             adder_orc_volume_s8 ((gpointer) outmap.data, pad->volume_i8,
1277                 outmap.size / bps);
1278             break;
1279           case GST_AUDIO_FORMAT_U16:
1280             adder_orc_volume_u16 ((gpointer) outmap.data, pad->volume_i16,
1281                 outmap.size / bps);
1282             break;
1283           case GST_AUDIO_FORMAT_S16:
1284             adder_orc_volume_s16 ((gpointer) outmap.data, pad->volume_i16,
1285                 outmap.size / bps);
1286             break;
1287           case GST_AUDIO_FORMAT_U32:
1288             adder_orc_volume_u32 ((gpointer) outmap.data, pad->volume_i32,
1289                 outmap.size / bps);
1290             break;
1291           case GST_AUDIO_FORMAT_S32:
1292             adder_orc_volume_s32 ((gpointer) outmap.data, pad->volume_i32,
1293                 outmap.size / bps);
1294             break;
1295           case GST_AUDIO_FORMAT_F32:
1296             adder_orc_volume_f32 ((gpointer) outmap.data, pad->volume,
1297                 outmap.size / bps);
1298             break;
1299           case GST_AUDIO_FORMAT_F64:
1300             adder_orc_volume_f64 ((gpointer) outmap.data, pad->volume,
1301                 outmap.size / bps);
1302             break;
1303           default:
1304             g_assert_not_reached ();
1305             break;
1306         }
1307       }
1308     } else {
1309       if (!is_gap) {
1310         /* we had a previous output buffer, mix this non-GAP buffer */
1311         GstMapInfo inmap;
1312 
1313         gst_buffer_map (inbuf, &inmap, GST_MAP_READ);
1314 
1315         /* all buffers should have outsize, there are no short buffers because we
1316          * asked for the max size above */
1317         g_assert (inmap.size == outmap.size);
1318 
1319         GST_LOG_OBJECT (adder, "channel %p: mixing %" G_GSIZE_FORMAT " bytes"
1320             " from data %p", collect_data, inmap.size, inmap.data);
1321 
1322         /* further buffers, need to add them */
1323         if (pad->volume == 1.0) {
1324           switch (adder->info.finfo->format) {
1325             case GST_AUDIO_FORMAT_U8:
1326               adder_orc_add_u8 ((gpointer) outmap.data,
1327                   (gpointer) inmap.data, inmap.size / bps);
1328               break;
1329             case GST_AUDIO_FORMAT_S8:
1330               adder_orc_add_s8 ((gpointer) outmap.data,
1331                   (gpointer) inmap.data, inmap.size / bps);
1332               break;
1333             case GST_AUDIO_FORMAT_U16:
1334               adder_orc_add_u16 ((gpointer) outmap.data,
1335                   (gpointer) inmap.data, inmap.size / bps);
1336               break;
1337             case GST_AUDIO_FORMAT_S16:
1338               adder_orc_add_s16 ((gpointer) outmap.data,
1339                   (gpointer) inmap.data, inmap.size / bps);
1340               break;
1341             case GST_AUDIO_FORMAT_U32:
1342               adder_orc_add_u32 ((gpointer) outmap.data,
1343                   (gpointer) inmap.data, inmap.size / bps);
1344               break;
1345             case GST_AUDIO_FORMAT_S32:
1346               adder_orc_add_s32 ((gpointer) outmap.data,
1347                   (gpointer) inmap.data, inmap.size / bps);
1348               break;
1349             case GST_AUDIO_FORMAT_F32:
1350               adder_orc_add_f32 ((gpointer) outmap.data,
1351                   (gpointer) inmap.data, inmap.size / bps);
1352               break;
1353             case GST_AUDIO_FORMAT_F64:
1354               adder_orc_add_f64 ((gpointer) outmap.data,
1355                   (gpointer) inmap.data, inmap.size / bps);
1356               break;
1357             default:
1358               g_assert_not_reached ();
1359               break;
1360           }
1361         } else {
1362           switch (adder->info.finfo->format) {
1363             case GST_AUDIO_FORMAT_U8:
1364               adder_orc_add_volume_u8 ((gpointer) outmap.data,
1365                   (gpointer) inmap.data, pad->volume_i8, inmap.size / bps);
1366               break;
1367             case GST_AUDIO_FORMAT_S8:
1368               adder_orc_add_volume_s8 ((gpointer) outmap.data,
1369                   (gpointer) inmap.data, pad->volume_i8, inmap.size / bps);
1370               break;
1371             case GST_AUDIO_FORMAT_U16:
1372               adder_orc_add_volume_u16 ((gpointer) outmap.data,
1373                   (gpointer) inmap.data, pad->volume_i16, inmap.size / bps);
1374               break;
1375             case GST_AUDIO_FORMAT_S16:
1376               adder_orc_add_volume_s16 ((gpointer) outmap.data,
1377                   (gpointer) inmap.data, pad->volume_i16, inmap.size / bps);
1378               break;
1379             case GST_AUDIO_FORMAT_U32:
1380               adder_orc_add_volume_u32 ((gpointer) outmap.data,
1381                   (gpointer) inmap.data, pad->volume_i32, inmap.size / bps);
1382               break;
1383             case GST_AUDIO_FORMAT_S32:
1384               adder_orc_add_volume_s32 ((gpointer) outmap.data,
1385                   (gpointer) inmap.data, pad->volume_i32, inmap.size / bps);
1386               break;
1387             case GST_AUDIO_FORMAT_F32:
1388               adder_orc_add_volume_f32 ((gpointer) outmap.data,
1389                   (gpointer) inmap.data, pad->volume, inmap.size / bps);
1390               break;
1391             case GST_AUDIO_FORMAT_F64:
1392               adder_orc_add_volume_f64 ((gpointer) outmap.data,
1393                   (gpointer) inmap.data, pad->volume, inmap.size / bps);
1394               break;
1395             default:
1396               g_assert_not_reached ();
1397               break;
1398           }
1399         }
1400         gst_buffer_unmap (inbuf, &inmap);
1401       } else {
1402         /* skip gap buffer */
1403         GST_LOG_OBJECT (adder, "channel %p: skipping GAP buffer", collect_data);
1404       }
1405       gst_buffer_unref (inbuf);
1406     }
1407     GST_OBJECT_UNLOCK (pad);
1408   }
1409 
1410   if (outbuf)
1411     gst_buffer_unmap (outbuf, &outmap);
1412 
1413   if (is_eos)
1414     goto eos;
1415 
1416   if (outbuf == NULL) {
1417     /* no output buffer, reuse one of the GAP buffers then if we have one */
1418     if (gapbuf) {
1419       GST_LOG_OBJECT (adder, "reusing GAP buffer %p", gapbuf);
1420       outbuf = gapbuf;
1421     } else if (had_mute) {
1422       GstMapInfo map;
1423 
1424       /* Means we had all pads muted, create some silence */
1425       outbuf = gst_buffer_new_allocate (NULL, outsize, NULL);
1426       gst_buffer_map (outbuf, &map, GST_MAP_WRITE);
1427       gst_audio_format_fill_silence (adder->info.finfo, map.data, outsize);
1428       gst_buffer_unmap (outbuf, &map);
1429       GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_GAP);
1430     } else {
1431       /* assume EOS otherwise, this should not happen, really */
1432       goto eos;
1433     }
1434   } else if (gapbuf) {
1435     /* we had an output buffer, unref the gapbuffer we kept */
1436     gst_buffer_unref (gapbuf);
1437   }
1438 
1439   if (G_UNLIKELY (adder->pending_events)) {
1440     GList *tmp = adder->pending_events;
1441 
1442     while (tmp) {
1443       GstEvent *ev = (GstEvent *) tmp->data;
1444 
1445       gst_pad_push_event (adder->srcpad, ev);
1446       tmp = g_list_next (tmp);
1447     }
1448     g_list_free (adder->pending_events);
1449     adder->pending_events = NULL;
1450   }
1451 
1452   /* for the next timestamp, use the sample counter, which will
1453    * never accumulate rounding errors */
1454   if (adder->segment.rate > 0.0) {
1455     next_offset = adder->offset + outsize / bpf;
1456   } else {
1457     next_offset = adder->offset - outsize / bpf;
1458   }
1459   next_timestamp = gst_util_uint64_scale (next_offset, GST_SECOND, rate);
1460 
1461   /* set timestamps on the output buffer */
1462   GST_BUFFER_DTS (outbuf) = GST_CLOCK_TIME_NONE;
1463   if (adder->segment.rate > 0.0) {
1464     GST_BUFFER_PTS (outbuf) = adder->segment.position;
1465     GST_BUFFER_OFFSET (outbuf) = adder->offset;
1466     GST_BUFFER_OFFSET_END (outbuf) = next_offset;
1467     GST_BUFFER_DURATION (outbuf) = next_timestamp - adder->segment.position;
1468   } else {
1469     GST_BUFFER_PTS (outbuf) = next_timestamp;
1470     GST_BUFFER_OFFSET (outbuf) = next_offset;
1471     GST_BUFFER_OFFSET_END (outbuf) = adder->offset;
1472     GST_BUFFER_DURATION (outbuf) = adder->segment.position - next_timestamp;
1473   }
1474   if (is_discont) {
1475     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
1476   } else {
1477     GST_BUFFER_FLAG_UNSET (outbuf, GST_BUFFER_FLAG_DISCONT);
1478   }
1479 
1480   adder->offset = next_offset;
1481   adder->segment.position = next_timestamp;
1482 
1483   /* send it out */
1484   GST_LOG_OBJECT (adder, "pushing outbuf %p, timestamp %" GST_TIME_FORMAT
1485       " offset %" G_GINT64_FORMAT, outbuf,
1486       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
1487       GST_BUFFER_OFFSET (outbuf));
1488   ret = gst_pad_push (adder->srcpad, outbuf);
1489 
1490   GST_LOG_OBJECT (adder, "pushed outbuf, result = %s", gst_flow_get_name (ret));
1491 
1492   return ret;
1493 
1494   /* ERRORS */
1495 not_negotiated:
1496   {
1497     GST_ELEMENT_ERROR (adder, STREAM, FORMAT, (NULL),
1498         ("Unknown data received, not negotiated"));
1499     return GST_FLOW_NOT_NEGOTIATED;
1500   }
1501 eos:
1502   {
1503     GST_DEBUG_OBJECT (adder, "no data available, must be EOS");
1504     gst_pad_push_event (adder->srcpad, gst_event_new_eos ());
1505     return GST_FLOW_EOS;
1506   }
1507 }
1508 
1509 static GstStateChangeReturn
gst_adder_change_state(GstElement * element,GstStateChange transition)1510 gst_adder_change_state (GstElement * element, GstStateChange transition)
1511 {
1512   GstAdder *adder;
1513   GstStateChangeReturn ret;
1514 
1515   adder = GST_ADDER (element);
1516 
1517   switch (transition) {
1518     case GST_STATE_CHANGE_NULL_TO_READY:
1519       break;
1520     case GST_STATE_CHANGE_READY_TO_PAUSED:
1521       adder->offset = 0;
1522       adder->flush_stop_pending = FALSE;
1523       adder->new_segment_pending = TRUE;
1524       adder->send_stream_start = TRUE;
1525       adder->send_caps = TRUE;
1526       gst_caps_replace (&adder->current_caps, NULL);
1527       gst_segment_init (&adder->segment, GST_FORMAT_TIME);
1528       gst_collect_pads_start (adder->collect);
1529       break;
1530     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1531       break;
1532     case GST_STATE_CHANGE_PAUSED_TO_READY:
1533       /* need to unblock the collectpads before calling the
1534        * parent change_state so that streaming can finish */
1535       gst_collect_pads_stop (adder->collect);
1536       break;
1537     default:
1538       break;
1539   }
1540 
1541   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1542 
1543   switch (transition) {
1544     default:
1545       break;
1546   }
1547 
1548   return ret;
1549 }
1550 
1551 /* GstChildProxy implementation */
1552 static GObject *
gst_adder_child_proxy_get_child_by_index(GstChildProxy * child_proxy,guint index)1553 gst_adder_child_proxy_get_child_by_index (GstChildProxy * child_proxy,
1554     guint index)
1555 {
1556   GstAdder *adder = GST_ADDER (child_proxy);
1557   GObject *obj = NULL;
1558 
1559   GST_OBJECT_LOCK (adder);
1560   obj = g_list_nth_data (GST_ELEMENT_CAST (adder)->sinkpads, index);
1561   if (obj)
1562     gst_object_ref (obj);
1563   GST_OBJECT_UNLOCK (adder);
1564   return obj;
1565 }
1566 
1567 static guint
gst_adder_child_proxy_get_children_count(GstChildProxy * child_proxy)1568 gst_adder_child_proxy_get_children_count (GstChildProxy * child_proxy)
1569 {
1570   guint count = 0;
1571   GstAdder *adder = GST_ADDER (child_proxy);
1572 
1573   GST_OBJECT_LOCK (adder);
1574   count = GST_ELEMENT_CAST (adder)->numsinkpads;
1575   GST_OBJECT_UNLOCK (adder);
1576   GST_INFO_OBJECT (adder, "Children Count: %d", count);
1577   return count;
1578 }
1579 
1580 static void
gst_adder_child_proxy_init(gpointer g_iface,gpointer iface_data)1581 gst_adder_child_proxy_init (gpointer g_iface, gpointer iface_data)
1582 {
1583   GstChildProxyInterface *iface = g_iface;
1584 
1585   GST_INFO ("intializing child proxy interface");
1586   iface->get_child_by_index = gst_adder_child_proxy_get_child_by_index;
1587   iface->get_children_count = gst_adder_child_proxy_get_children_count;
1588 }
1589 
1590 static gboolean
plugin_init(GstPlugin * plugin)1591 plugin_init (GstPlugin * plugin)
1592 {
1593   GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "adder", 0,
1594       "audio channel mixing element");
1595 
1596   if (!gst_element_register (plugin, "adder", GST_RANK_NONE, GST_TYPE_ADDER)) {
1597     return FALSE;
1598   }
1599 
1600   return TRUE;
1601 }
1602 
1603 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
1604     GST_VERSION_MINOR,
1605     adder,
1606     "Adds multiple streams",
1607     plugin_init, VERSION, "LGPL", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN)
1608