1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2001 Thomas <thomas@apestaart.org>
4  *               2005,2006 Wim Taymans <wim@fluendo.com>
5  *                    2013 Sebastian Dröge <sebastian@centricular.com>
6  *                    2014 Collabora
7  *                             Olivier Crete <olivier.crete@collabora.com>
8  *
9  * gstaudioaggregator.c:
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 /**
27  * SECTION: gstaudioaggregator
28  * @title: GstAudioAggregator
29  * @short_description: Base class that manages a set of audio input pads
30  * with the purpose of aggregating or mixing their raw audio input buffers
31  * @see_also: #GstAggregator, #GstAudioMixer
32  *
33  * Subclasses must use (a subclass of) #GstAudioAggregatorPad for both
34  * their source and sink pads,
35  * gst_element_class_add_static_pad_template_with_gtype() is a convenient
36  * helper.
37  *
38  * #GstAudioAggregator can perform conversion on the data arriving
39  * on its sink pads, based on the format expected downstream: in order
40  * to enable that behaviour, the GType of the sink pads must either be
41  * a (subclass of) #GstAudioAggregatorConvertPad to use the default
42  * #GstAudioConverter implementation, or a subclass of #GstAudioAggregatorPad
43  * implementing #GstAudioAggregatorPadClass.convert_buffer.
44  *
45  * To allow for the output caps to change, the mechanism is the same as
46  * above, with the GType of the source pad.
47  *
48  * See #GstAudioMixer for an example.
49  *
50  * When conversion is enabled, #GstAudioAggregator will accept
51  * any type of raw audio caps and perform conversion
52  * on the data arriving on its sink pads, with whatever downstream
53  * expects as the target format.
54  *
55  * In case downstream caps are not fully fixated, it will use
56  * the first configured sink pad to finish fixating its source pad
57  * caps.
58  *
59  * A notable exception for now is the sample rate, sink pads must
60  * have the same sample rate as either the downstream requirement,
61  * or the first configured pad, or a combination of both (when
62  * downstream specifies a range or a set of acceptable rates).
63  */
64 
65 
66 #ifdef HAVE_CONFIG_H
67 #  include "config.h"
68 #endif
69 
70 #include "gstaudioaggregator.h"
71 
72 #include <string.h>
73 
74 GST_DEBUG_CATEGORY_STATIC (audio_aggregator_debug);
75 #define GST_CAT_DEFAULT audio_aggregator_debug
76 
77 struct _GstAudioAggregatorPadPrivate
78 {
79   /* All members are protected by the pad object lock */
80 
81   GstBuffer *buffer;            /* current buffer we're mixing, for
82                                    comparison with a new input buffer from
83                                    aggregator to see if we need to update our
84                                    cached values. */
85 
86   guint position, size;         /* position in the input buffer and size of the
87                                    input buffer in number of samples */
88 
89   GstBuffer *input_buffer;
90 
91   guint64 output_offset;        /* Sample offset in output segment relative to
92                                    pad.segment.start that position refers to
93                                    in the current buffer. */
94 
95   guint64 next_offset;          /* Next expected sample offset relative to
96                                    pad.segment.start */
97 
98   /* Last time we noticed a discont */
99   GstClockTime discont_time;
100 
101   /* A new unhandled segment event has been received */
102   gboolean new_segment;
103 };
104 
105 
106 /*****************************************
107  * GstAudioAggregatorPad implementation  *
108  *****************************************/
109 G_DEFINE_TYPE_WITH_PRIVATE (GstAudioAggregatorPad, gst_audio_aggregator_pad,
110     GST_TYPE_AGGREGATOR_PAD);
111 
112 enum
113 {
114   PROP_PAD_0,
115   PROP_PAD_CONVERTER_CONFIG,
116 };
117 
118 static GstFlowReturn
119 gst_audio_aggregator_pad_flush_pad (GstAggregatorPad * aggpad,
120     GstAggregator * aggregator);
121 
122 static void
gst_audio_aggregator_pad_finalize(GObject * object)123 gst_audio_aggregator_pad_finalize (GObject * object)
124 {
125   GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) object;
126 
127   gst_buffer_replace (&pad->priv->buffer, NULL);
128   gst_buffer_replace (&pad->priv->input_buffer, NULL);
129 
130   G_OBJECT_CLASS (gst_audio_aggregator_pad_parent_class)->finalize (object);
131 }
132 
133 static void
gst_audio_aggregator_pad_class_init(GstAudioAggregatorPadClass * klass)134 gst_audio_aggregator_pad_class_init (GstAudioAggregatorPadClass * klass)
135 {
136   GObjectClass *gobject_class = (GObjectClass *) klass;
137   GstAggregatorPadClass *aggpadclass = (GstAggregatorPadClass *) klass;
138 
139   gobject_class->finalize = gst_audio_aggregator_pad_finalize;
140   aggpadclass->flush = GST_DEBUG_FUNCPTR (gst_audio_aggregator_pad_flush_pad);
141 }
142 
143 static void
gst_audio_aggregator_pad_init(GstAudioAggregatorPad * pad)144 gst_audio_aggregator_pad_init (GstAudioAggregatorPad * pad)
145 {
146   pad->priv = gst_audio_aggregator_pad_get_instance_private (pad);
147 
148   gst_audio_info_init (&pad->info);
149 
150   pad->priv->buffer = NULL;
151   pad->priv->input_buffer = NULL;
152   pad->priv->position = 0;
153   pad->priv->size = 0;
154   pad->priv->output_offset = -1;
155   pad->priv->next_offset = -1;
156   pad->priv->discont_time = GST_CLOCK_TIME_NONE;
157 }
158 
159 
160 static GstFlowReturn
gst_audio_aggregator_pad_flush_pad(GstAggregatorPad * aggpad,GstAggregator * aggregator)161 gst_audio_aggregator_pad_flush_pad (GstAggregatorPad * aggpad,
162     GstAggregator * aggregator)
163 {
164   GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
165 
166   GST_OBJECT_LOCK (aggpad);
167   pad->priv->position = pad->priv->size = 0;
168   pad->priv->output_offset = pad->priv->next_offset = -1;
169   pad->priv->discont_time = GST_CLOCK_TIME_NONE;
170   gst_buffer_replace (&pad->priv->buffer, NULL);
171   gst_buffer_replace (&pad->priv->input_buffer, NULL);
172   GST_OBJECT_UNLOCK (aggpad);
173 
174   return GST_FLOW_OK;
175 }
176 
177 struct _GstAudioAggregatorConvertPadPrivate
178 {
179   /* All members are protected by the pad object lock */
180   GstAudioConverter *converter;
181   GstStructure *converter_config;
182   gboolean converter_config_changed;
183 };
184 
185 
186 G_DEFINE_TYPE_WITH_PRIVATE (GstAudioAggregatorConvertPad,
187     gst_audio_aggregator_convert_pad, GST_TYPE_AUDIO_AGGREGATOR_PAD);
188 
189 static void
gst_audio_aggregator_convert_pad_update_converter(GstAudioAggregatorConvertPad * aaggcpad,GstAudioInfo * in_info,GstAudioInfo * out_info)190 gst_audio_aggregator_convert_pad_update_converter (GstAudioAggregatorConvertPad
191     * aaggcpad, GstAudioInfo * in_info, GstAudioInfo * out_info)
192 {
193   if (!aaggcpad->priv->converter_config_changed)
194     return;
195 
196   if (aaggcpad->priv->converter) {
197     gst_audio_converter_free (aaggcpad->priv->converter);
198     aaggcpad->priv->converter = NULL;
199   }
200 
201   if (gst_audio_info_is_equal (in_info, out_info) ||
202       in_info->finfo->format == GST_AUDIO_FORMAT_UNKNOWN) {
203     if (aaggcpad->priv->converter) {
204       gst_audio_converter_free (aaggcpad->priv->converter);
205       aaggcpad->priv->converter = NULL;
206     }
207   } else {
208     /* If we haven't received caps yet, this pad should not have
209      * a buffer to convert anyway */
210     aaggcpad->priv->converter =
211         gst_audio_converter_new (GST_AUDIO_CONVERTER_FLAG_NONE,
212         in_info, out_info,
213         aaggcpad->priv->converter_config ? gst_structure_copy (aaggcpad->
214             priv->converter_config) : NULL);
215   }
216 
217   aaggcpad->priv->converter_config_changed = FALSE;
218 }
219 
220 static void
gst_audio_aggregator_pad_update_conversion_info(GstAudioAggregatorPad * aaggpad)221 gst_audio_aggregator_pad_update_conversion_info (GstAudioAggregatorPad *
222     aaggpad)
223 {
224   GST_AUDIO_AGGREGATOR_CONVERT_PAD (aaggpad)->priv->converter_config_changed =
225       TRUE;
226 }
227 
228 static GstBuffer *
gst_audio_aggregator_convert_pad_convert_buffer(GstAudioAggregatorPad * aaggpad,GstAudioInfo * in_info,GstAudioInfo * out_info,GstBuffer * input_buffer)229 gst_audio_aggregator_convert_pad_convert_buffer (GstAudioAggregatorPad *
230     aaggpad, GstAudioInfo * in_info, GstAudioInfo * out_info,
231     GstBuffer * input_buffer)
232 {
233   GstBuffer *res;
234   GstAudioAggregatorConvertPad *aaggcpad =
235       GST_AUDIO_AGGREGATOR_CONVERT_PAD (aaggpad);
236 
237   gst_audio_aggregator_convert_pad_update_converter (aaggcpad, in_info,
238       out_info);
239 
240   if (aaggcpad->priv->converter) {
241     gint insize = gst_buffer_get_size (input_buffer);
242     gsize insamples = insize / in_info->bpf;
243     gsize outsamples =
244         gst_audio_converter_get_out_frames (aaggcpad->priv->converter,
245         insamples);
246     gint outsize = outsamples * out_info->bpf;
247     GstMapInfo inmap, outmap;
248 
249     res = gst_buffer_new_allocate (NULL, outsize, NULL);
250 
251     /* We create a perfectly similar buffer, except obviously for
252      * its converted contents */
253     gst_buffer_copy_into (res, input_buffer,
254         GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS |
255         GST_BUFFER_COPY_META, 0, -1);
256 
257     gst_buffer_map (input_buffer, &inmap, GST_MAP_READ);
258     gst_buffer_map (res, &outmap, GST_MAP_WRITE);
259 
260     gst_audio_converter_samples (aaggcpad->priv->converter,
261         GST_AUDIO_CONVERTER_FLAG_NONE,
262         (gpointer *) & inmap.data, insamples,
263         (gpointer *) & outmap.data, outsamples);
264 
265     gst_buffer_unmap (input_buffer, &inmap);
266     gst_buffer_unmap (res, &outmap);
267   } else {
268     res = gst_buffer_ref (input_buffer);
269   }
270 
271   return res;
272 }
273 
274 static void
gst_audio_aggregator_convert_pad_finalize(GObject * object)275 gst_audio_aggregator_convert_pad_finalize (GObject * object)
276 {
277   GstAudioAggregatorConvertPad *pad = (GstAudioAggregatorConvertPad *) object;
278 
279   if (pad->priv->converter)
280     gst_audio_converter_free (pad->priv->converter);
281 
282   if (pad->priv->converter_config)
283     gst_structure_free (pad->priv->converter_config);
284 
285   G_OBJECT_CLASS (gst_audio_aggregator_convert_pad_parent_class)->finalize
286       (object);
287 }
288 
289 static void
gst_audio_aggregator_convert_pad_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)290 gst_audio_aggregator_convert_pad_get_property (GObject * object, guint prop_id,
291     GValue * value, GParamSpec * pspec)
292 {
293   GstAudioAggregatorConvertPad *pad = GST_AUDIO_AGGREGATOR_CONVERT_PAD (object);
294 
295   switch (prop_id) {
296     case PROP_PAD_CONVERTER_CONFIG:
297       GST_OBJECT_LOCK (pad);
298       if (pad->priv->converter_config)
299         g_value_set_boxed (value, pad->priv->converter_config);
300       GST_OBJECT_UNLOCK (pad);
301       break;
302     default:
303       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
304       break;
305   }
306 }
307 
308 static void
gst_audio_aggregator_convert_pad_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)309 gst_audio_aggregator_convert_pad_set_property (GObject * object, guint prop_id,
310     const GValue * value, GParamSpec * pspec)
311 {
312   GstAudioAggregatorConvertPad *pad = GST_AUDIO_AGGREGATOR_CONVERT_PAD (object);
313 
314   switch (prop_id) {
315     case PROP_PAD_CONVERTER_CONFIG:
316       GST_OBJECT_LOCK (pad);
317       if (pad->priv->converter_config)
318         gst_structure_free (pad->priv->converter_config);
319       pad->priv->converter_config = g_value_dup_boxed (value);
320       pad->priv->converter_config_changed = TRUE;
321       GST_OBJECT_UNLOCK (pad);
322       break;
323     default:
324       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
325       break;
326   }
327 }
328 
329 static void
gst_audio_aggregator_convert_pad_class_init(GstAudioAggregatorConvertPadClass * klass)330 gst_audio_aggregator_convert_pad_class_init (GstAudioAggregatorConvertPadClass *
331     klass)
332 {
333   GObjectClass *gobject_class = (GObjectClass *) klass;
334   GstAudioAggregatorPadClass *aaggpad_class =
335       (GstAudioAggregatorPadClass *) klass;
336 
337   gobject_class->set_property = gst_audio_aggregator_convert_pad_set_property;
338   gobject_class->get_property = gst_audio_aggregator_convert_pad_get_property;
339 
340   g_object_class_install_property (gobject_class, PROP_PAD_CONVERTER_CONFIG,
341       g_param_spec_boxed ("converter-config", "Converter configuration",
342           "A GstStructure describing the configuration that should be used "
343           "when converting this pad's audio buffers",
344           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
345 
346   aaggpad_class->convert_buffer =
347       gst_audio_aggregator_convert_pad_convert_buffer;
348 
349   aaggpad_class->update_conversion_info =
350       gst_audio_aggregator_pad_update_conversion_info;
351 
352   gobject_class->finalize = gst_audio_aggregator_convert_pad_finalize;
353 }
354 
355 static void
gst_audio_aggregator_convert_pad_init(GstAudioAggregatorConvertPad * pad)356 gst_audio_aggregator_convert_pad_init (GstAudioAggregatorConvertPad * pad)
357 {
358   pad->priv = gst_audio_aggregator_convert_pad_get_instance_private (pad);
359 }
360 
361 /**************************************
362  * GstAudioAggregator implementation  *
363  **************************************/
364 
365 struct _GstAudioAggregatorPrivate
366 {
367   GMutex mutex;
368 
369   /* All three properties are unprotected, can't be modified while streaming */
370   /* Size in frames that is output per buffer */
371   GstClockTime output_buffer_duration;
372   GstClockTime alignment_threshold;
373   GstClockTime discont_wait;
374 
375   /* Protected by srcpad stream clock */
376   /* Output buffer starting at offset containing blocksize frames (calculated
377    * from output_buffer_duration) */
378   GstBuffer *current_buffer;
379 
380   /* counters to keep track of timestamps */
381   /* Readable with object lock, writable with both aag lock and object lock */
382 
383   /* Sample offset starting from 0 at aggregator.segment.start */
384   gint64 offset;
385 };
386 
387 #define GST_AUDIO_AGGREGATOR_LOCK(self)   g_mutex_lock (&(self)->priv->mutex);
388 #define GST_AUDIO_AGGREGATOR_UNLOCK(self) g_mutex_unlock (&(self)->priv->mutex);
389 
390 static void gst_audio_aggregator_set_property (GObject * object, guint prop_id,
391     const GValue * value, GParamSpec * pspec);
392 static void gst_audio_aggregator_get_property (GObject * object, guint prop_id,
393     GValue * value, GParamSpec * pspec);
394 static void gst_audio_aggregator_dispose (GObject * object);
395 
396 static gboolean gst_audio_aggregator_src_event (GstAggregator * agg,
397     GstEvent * event);
398 static gboolean gst_audio_aggregator_sink_event (GstAggregator * agg,
399     GstAggregatorPad * aggpad, GstEvent * event);
400 static gboolean gst_audio_aggregator_src_query (GstAggregator * agg,
401     GstQuery * query);
402 static gboolean
403 gst_audio_aggregator_sink_query (GstAggregator * agg, GstAggregatorPad * aggpad,
404     GstQuery * query);
405 static gboolean gst_audio_aggregator_start (GstAggregator * agg);
406 static gboolean gst_audio_aggregator_stop (GstAggregator * agg);
407 static GstFlowReturn gst_audio_aggregator_flush (GstAggregator * agg);
408 
409 static GstBuffer *gst_audio_aggregator_create_output_buffer (GstAudioAggregator
410     * aagg, guint num_frames);
411 static GstBuffer *gst_audio_aggregator_do_clip (GstAggregator * agg,
412     GstAggregatorPad * bpad, GstBuffer * buffer);
413 static GstFlowReturn gst_audio_aggregator_aggregate (GstAggregator * agg,
414     gboolean timeout);
415 static gboolean sync_pad_values (GstElement * aagg, GstPad * pad, gpointer ud);
416 static gboolean gst_audio_aggregator_negotiated_src_caps (GstAggregator * agg,
417     GstCaps * caps);
418 static GstFlowReturn
419 gst_audio_aggregator_update_src_caps (GstAggregator * agg,
420     GstCaps * caps, GstCaps ** ret);
421 static GstCaps *gst_audio_aggregator_fixate_src_caps (GstAggregator * agg,
422     GstCaps * caps);
423 
424 #define DEFAULT_OUTPUT_BUFFER_DURATION (10 * GST_MSECOND)
425 #define DEFAULT_ALIGNMENT_THRESHOLD   (40 * GST_MSECOND)
426 #define DEFAULT_DISCONT_WAIT (1 * GST_SECOND)
427 
428 enum
429 {
430   PROP_0,
431   PROP_OUTPUT_BUFFER_DURATION,
432   PROP_ALIGNMENT_THRESHOLD,
433   PROP_DISCONT_WAIT,
434 };
435 
436 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GstAudioAggregator, gst_audio_aggregator,
437     GST_TYPE_AGGREGATOR);
438 
439 static GstBuffer *
gst_audio_aggregator_convert_buffer(GstAudioAggregator * aagg,GstPad * pad,GstAudioInfo * in_info,GstAudioInfo * out_info,GstBuffer * buffer)440 gst_audio_aggregator_convert_buffer (GstAudioAggregator * aagg, GstPad * pad,
441     GstAudioInfo * in_info, GstAudioInfo * out_info, GstBuffer * buffer)
442 {
443   GstAudioAggregatorPadClass *klass = GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad);
444   GstAudioAggregatorPad *aaggpad = GST_AUDIO_AGGREGATOR_PAD (pad);
445 
446   g_assert (klass->convert_buffer);
447 
448   return klass->convert_buffer (aaggpad, in_info, out_info, buffer);
449 }
450 
451 static void
gst_audio_aggregator_class_init(GstAudioAggregatorClass * klass)452 gst_audio_aggregator_class_init (GstAudioAggregatorClass * klass)
453 {
454   GObjectClass *gobject_class = (GObjectClass *) klass;
455   GstAggregatorClass *gstaggregator_class = (GstAggregatorClass *) klass;
456 
457   gobject_class->set_property = gst_audio_aggregator_set_property;
458   gobject_class->get_property = gst_audio_aggregator_get_property;
459   gobject_class->dispose = gst_audio_aggregator_dispose;
460 
461   gstaggregator_class->src_event =
462       GST_DEBUG_FUNCPTR (gst_audio_aggregator_src_event);
463   gstaggregator_class->sink_event =
464       GST_DEBUG_FUNCPTR (gst_audio_aggregator_sink_event);
465   gstaggregator_class->src_query =
466       GST_DEBUG_FUNCPTR (gst_audio_aggregator_src_query);
467   gstaggregator_class->sink_query = gst_audio_aggregator_sink_query;
468   gstaggregator_class->start = gst_audio_aggregator_start;
469   gstaggregator_class->stop = gst_audio_aggregator_stop;
470   gstaggregator_class->flush = gst_audio_aggregator_flush;
471   gstaggregator_class->aggregate =
472       GST_DEBUG_FUNCPTR (gst_audio_aggregator_aggregate);
473   gstaggregator_class->clip = GST_DEBUG_FUNCPTR (gst_audio_aggregator_do_clip);
474   gstaggregator_class->get_next_time = gst_aggregator_simple_get_next_time;
475   gstaggregator_class->update_src_caps =
476       GST_DEBUG_FUNCPTR (gst_audio_aggregator_update_src_caps);
477   gstaggregator_class->fixate_src_caps = gst_audio_aggregator_fixate_src_caps;
478   gstaggregator_class->negotiated_src_caps =
479       gst_audio_aggregator_negotiated_src_caps;
480 
481   klass->create_output_buffer = gst_audio_aggregator_create_output_buffer;
482 
483   GST_DEBUG_CATEGORY_INIT (audio_aggregator_debug, "audioaggregator",
484       GST_DEBUG_FG_MAGENTA, "GstAudioAggregator");
485 
486   g_object_class_install_property (gobject_class, PROP_OUTPUT_BUFFER_DURATION,
487       g_param_spec_uint64 ("output-buffer-duration", "Output Buffer Duration",
488           "Output block size in nanoseconds", 1,
489           G_MAXUINT64, DEFAULT_OUTPUT_BUFFER_DURATION,
490           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
491 
492   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
493       g_param_spec_uint64 ("alignment-threshold", "Alignment Threshold",
494           "Timestamp alignment threshold in nanoseconds", 0,
495           G_MAXUINT64 - 1, DEFAULT_ALIGNMENT_THRESHOLD,
496           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
497 
498   g_object_class_install_property (gobject_class, PROP_DISCONT_WAIT,
499       g_param_spec_uint64 ("discont-wait", "Discont Wait",
500           "Window of time in nanoseconds to wait before "
501           "creating a discontinuity", 0,
502           G_MAXUINT64 - 1, DEFAULT_DISCONT_WAIT,
503           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
504 }
505 
506 static void
gst_audio_aggregator_init(GstAudioAggregator * aagg)507 gst_audio_aggregator_init (GstAudioAggregator * aagg)
508 {
509   aagg->priv = gst_audio_aggregator_get_instance_private (aagg);
510 
511   g_mutex_init (&aagg->priv->mutex);
512 
513   aagg->priv->output_buffer_duration = DEFAULT_OUTPUT_BUFFER_DURATION;
514   aagg->priv->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
515   aagg->priv->discont_wait = DEFAULT_DISCONT_WAIT;
516 
517   aagg->current_caps = NULL;
518 
519   gst_aggregator_set_latency (GST_AGGREGATOR (aagg),
520       aagg->priv->output_buffer_duration, aagg->priv->output_buffer_duration);
521 }
522 
523 static void
gst_audio_aggregator_dispose(GObject * object)524 gst_audio_aggregator_dispose (GObject * object)
525 {
526   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
527 
528   gst_caps_replace (&aagg->current_caps, NULL);
529 
530   g_mutex_clear (&aagg->priv->mutex);
531 
532   G_OBJECT_CLASS (gst_audio_aggregator_parent_class)->dispose (object);
533 }
534 
535 static void
gst_audio_aggregator_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)536 gst_audio_aggregator_set_property (GObject * object, guint prop_id,
537     const GValue * value, GParamSpec * pspec)
538 {
539   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
540 
541   switch (prop_id) {
542     case PROP_OUTPUT_BUFFER_DURATION:
543       aagg->priv->output_buffer_duration = g_value_get_uint64 (value);
544       gst_aggregator_set_latency (GST_AGGREGATOR (aagg),
545           aagg->priv->output_buffer_duration,
546           aagg->priv->output_buffer_duration);
547       break;
548     case PROP_ALIGNMENT_THRESHOLD:
549       aagg->priv->alignment_threshold = g_value_get_uint64 (value);
550       break;
551     case PROP_DISCONT_WAIT:
552       aagg->priv->discont_wait = g_value_get_uint64 (value);
553       break;
554     default:
555       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
556       break;
557   }
558 }
559 
560 static void
gst_audio_aggregator_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)561 gst_audio_aggregator_get_property (GObject * object, guint prop_id,
562     GValue * value, GParamSpec * pspec)
563 {
564   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (object);
565 
566   switch (prop_id) {
567     case PROP_OUTPUT_BUFFER_DURATION:
568       g_value_set_uint64 (value, aagg->priv->output_buffer_duration);
569       break;
570     case PROP_ALIGNMENT_THRESHOLD:
571       g_value_set_uint64 (value, aagg->priv->alignment_threshold);
572       break;
573     case PROP_DISCONT_WAIT:
574       g_value_set_uint64 (value, aagg->priv->discont_wait);
575       break;
576     default:
577       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
578       break;
579   }
580 }
581 
582 /* Caps negotiation */
583 
584 /* Unref after usage */
585 static GstAudioAggregatorPad *
gst_audio_aggregator_get_first_configured_pad(GstAggregator * agg)586 gst_audio_aggregator_get_first_configured_pad (GstAggregator * agg)
587 {
588   GstAudioAggregatorPad *res = NULL;
589   GList *l;
590 
591   GST_OBJECT_LOCK (agg);
592   for (l = GST_ELEMENT (agg)->sinkpads; l; l = l->next) {
593     GstAudioAggregatorPad *aaggpad = l->data;
594 
595     if (GST_AUDIO_INFO_FORMAT (&aaggpad->info) != GST_AUDIO_FORMAT_UNKNOWN) {
596       res = gst_object_ref (aaggpad);
597       break;
598     }
599   }
600   GST_OBJECT_UNLOCK (agg);
601 
602   return res;
603 }
604 
605 static GstCaps *
gst_audio_aggregator_sink_getcaps(GstPad * pad,GstAggregator * agg,GstCaps * filter)606 gst_audio_aggregator_sink_getcaps (GstPad * pad, GstAggregator * agg,
607     GstCaps * filter)
608 {
609   GstAudioAggregatorPad *first_configured_pad =
610       gst_audio_aggregator_get_first_configured_pad (agg);
611   GstCaps *sink_template_caps = gst_pad_get_pad_template_caps (pad);
612   GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
613   GstCaps *sink_caps;
614 
615   GST_INFO_OBJECT (pad, "Getting caps with filter %" GST_PTR_FORMAT, filter);
616   GST_DEBUG_OBJECT (pad, "sink template caps : %" GST_PTR_FORMAT,
617       sink_template_caps);
618   GST_DEBUG_OBJECT (pad, "downstream caps %" GST_PTR_FORMAT, downstream_caps);
619 
620   /* If we already have a configured pad, assume that we can only configure
621    * to the very same format filtered with the template caps and continue
622    * with the result of that as the template caps */
623 
624   if (first_configured_pad) {
625     GstCaps *first_configured_caps =
626         gst_audio_info_to_caps (&first_configured_pad->info);
627     GstCaps *tmp;
628 
629     tmp =
630         gst_caps_intersect_full (sink_template_caps, first_configured_caps,
631         GST_CAPS_INTERSECT_FIRST);
632     gst_caps_unref (first_configured_caps);
633     gst_caps_unref (sink_template_caps);
634     sink_template_caps = tmp;
635 
636     gst_object_unref (first_configured_pad);
637   }
638 
639   /* If we have downstream caps, filter them against our template caps or
640    * the filtered first configured pad caps from above */
641   if (downstream_caps) {
642     sink_caps =
643         gst_caps_intersect_full (sink_template_caps, downstream_caps,
644         GST_CAPS_INTERSECT_FIRST);
645   } else {
646     sink_caps = gst_caps_ref (sink_template_caps);
647   }
648 
649   if (filter) {
650     GstCaps *tmp = gst_caps_intersect_full (sink_caps, filter,
651         GST_CAPS_INTERSECT_FIRST);
652 
653     gst_caps_unref (sink_caps);
654     sink_caps = tmp;
655   }
656 
657   gst_caps_unref (sink_template_caps);
658 
659   if (downstream_caps)
660     gst_caps_unref (downstream_caps);
661 
662   GST_INFO_OBJECT (pad, "returned sink caps : %" GST_PTR_FORMAT, sink_caps);
663 
664   return sink_caps;
665 }
666 
667 static GstCaps *
gst_audio_aggregator_convert_sink_getcaps(GstPad * pad,GstAggregator * agg,GstCaps * filter)668 gst_audio_aggregator_convert_sink_getcaps (GstPad * pad, GstAggregator * agg,
669     GstCaps * filter)
670 {
671   GstAudioAggregatorPad *first_configured_pad =
672       gst_audio_aggregator_get_first_configured_pad (agg);
673   GstCaps *sink_template_caps = gst_pad_get_pad_template_caps (pad);
674   GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
675   GstCaps *sink_caps;
676 
677   GST_INFO_OBJECT (pad, "Getting caps with filter %" GST_PTR_FORMAT, filter);
678   GST_DEBUG_OBJECT (pad, "sink template caps : %" GST_PTR_FORMAT,
679       sink_template_caps);
680   GST_DEBUG_OBJECT (pad, "downstream caps %" GST_PTR_FORMAT, downstream_caps);
681 
682   /* We can convert between all formats except for the sample rate, which has
683    * to match.  */
684 
685   /* If we have a first configured pad, we can only convert everything except
686    * for the sample rate, so modify our template caps to have exactly that
687    * sample rate in all structures */
688   if (first_configured_pad) {
689     GST_INFO_OBJECT (pad, "first configured pad has sample rate %d",
690         first_configured_pad->info.rate);
691     sink_template_caps = gst_caps_make_writable (sink_template_caps);
692     gst_caps_set_simple (sink_template_caps, "rate", G_TYPE_INT,
693         first_configured_pad->info.rate, NULL);
694     gst_object_unref (first_configured_pad);
695   }
696 
697   /* Now if we have downstream caps, filter against the template caps from
698    * above, i.e. with potentially fixated sample rate field already. This
699    * filters out any structures with unsupported rates.
700    *
701    * Afterwards we create new caps that only take over the rate fields of the
702    * remaining downstream caps, and filter that against the plain template
703    * caps to get the resulting allowed caps with conversion for everything but
704    * the rate */
705   if (downstream_caps) {
706     GstCaps *tmp;
707     guint i, n;
708 
709     tmp =
710         gst_caps_intersect_full (sink_template_caps, downstream_caps,
711         GST_CAPS_INTERSECT_FIRST);
712 
713     n = gst_caps_get_size (tmp);
714     sink_caps = gst_caps_new_empty ();
715     for (i = 0; i < n; i++) {
716       GstStructure *s = gst_caps_get_structure (tmp, i);
717       GstStructure *new_s =
718           gst_structure_new_empty (gst_structure_get_name (s));
719       gst_structure_set_value (new_s, "rate", gst_structure_get_value (s,
720               "rate"));
721       sink_caps = gst_caps_merge_structure (sink_caps, new_s);
722     }
723     gst_caps_unref (tmp);
724     tmp = sink_caps;
725 
726     sink_caps =
727         gst_caps_intersect_full (sink_template_caps, tmp,
728         GST_CAPS_INTERSECT_FIRST);
729     gst_caps_unref (tmp);
730   } else {
731     sink_caps = gst_caps_ref (sink_template_caps);
732   }
733 
734   /* And finally filter anything that remains against the filter caps */
735   if (filter) {
736     GstCaps *tmp =
737         gst_caps_intersect_full (filter, sink_caps, GST_CAPS_INTERSECT_FIRST);
738     gst_caps_unref (sink_caps);
739     sink_caps = tmp;
740   }
741 
742   GST_INFO_OBJECT (pad, "returned sink caps : %" GST_PTR_FORMAT, sink_caps);
743 
744   gst_caps_unref (sink_template_caps);
745 
746   if (downstream_caps)
747     gst_caps_unref (downstream_caps);
748 
749   return sink_caps;
750 }
751 
752 static gboolean
gst_audio_aggregator_sink_setcaps(GstAudioAggregatorPad * aaggpad,GstAggregator * agg,GstCaps * caps)753 gst_audio_aggregator_sink_setcaps (GstAudioAggregatorPad * aaggpad,
754     GstAggregator * agg, GstCaps * caps)
755 {
756   GstAudioAggregatorPad *first_configured_pad =
757       gst_audio_aggregator_get_first_configured_pad (agg);
758   GstCaps *downstream_caps = gst_pad_get_allowed_caps (agg->srcpad);
759   GstAudioInfo info;
760   gboolean ret = TRUE;
761   gint downstream_rate;
762   GstStructure *s;
763 
764   if (!downstream_caps || gst_caps_is_empty (downstream_caps)) {
765     ret = FALSE;
766     goto done;
767   }
768 
769   if (!gst_audio_info_from_caps (&info, caps)) {
770     GST_WARNING_OBJECT (agg, "Rejecting invalid caps: %" GST_PTR_FORMAT, caps);
771     return FALSE;
772   }
773   s = gst_caps_get_structure (downstream_caps, 0);
774 
775   /* TODO: handle different rates on sinkpads, a bit complex
776    * because offsets will have to be updated, and audio resampling
777    * has a latency to take into account
778    */
779   if ((gst_structure_get_int (s, "rate", &downstream_rate)
780           && info.rate != downstream_rate) || (first_configured_pad
781           && info.rate != first_configured_pad->info.rate)) {
782     gst_pad_push_event (GST_PAD (aaggpad), gst_event_new_reconfigure ());
783     ret = FALSE;
784   } else {
785     GstAudioAggregatorPadClass *klass =
786         GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (aaggpad);
787     GST_OBJECT_LOCK (aaggpad);
788     aaggpad->info = info;
789     if (klass->update_conversion_info)
790       klass->update_conversion_info (aaggpad);
791     GST_OBJECT_UNLOCK (aaggpad);
792   }
793 
794 done:
795   if (first_configured_pad)
796     gst_object_unref (first_configured_pad);
797 
798   if (downstream_caps)
799     gst_caps_unref (downstream_caps);
800 
801   return ret;
802 }
803 
804 static GstFlowReturn
gst_audio_aggregator_update_src_caps(GstAggregator * agg,GstCaps * caps,GstCaps ** ret)805 gst_audio_aggregator_update_src_caps (GstAggregator * agg,
806     GstCaps * caps, GstCaps ** ret)
807 {
808   GstCaps *src_template_caps = gst_pad_get_pad_template_caps (agg->srcpad);
809   GstCaps *downstream_caps =
810       gst_pad_peer_query_caps (agg->srcpad, src_template_caps);
811 
812   gst_caps_unref (src_template_caps);
813 
814   *ret = gst_caps_intersect (caps, downstream_caps);
815 
816   GST_INFO ("Updated src caps to %" GST_PTR_FORMAT, *ret);
817 
818   if (downstream_caps)
819     gst_caps_unref (downstream_caps);
820 
821   return GST_FLOW_OK;
822 }
823 
824 /* At that point if the caps are not fixed, this means downstream
825  * didn't have fully specified requirements, we'll just go ahead
826  * and fixate raw audio fields using our first configured pad, we don't for
827  * now need a more complicated heuristic
828  */
829 static GstCaps *
gst_audio_aggregator_fixate_src_caps(GstAggregator * agg,GstCaps * caps)830 gst_audio_aggregator_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
831 {
832   GstAudioAggregatorPad *first_configured_pad = NULL;
833 
834   if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (agg->srcpad)->convert_buffer)
835     first_configured_pad = gst_audio_aggregator_get_first_configured_pad (agg);
836 
837   caps = gst_caps_make_writable (caps);
838 
839   if (first_configured_pad) {
840     GstStructure *s, *s2;
841     GstCaps *first_configured_caps =
842         gst_audio_info_to_caps (&first_configured_pad->info);
843     gint first_configured_rate, first_configured_channels;
844     gint channels;
845 
846     s = gst_caps_get_structure (caps, 0);
847     s2 = gst_caps_get_structure (first_configured_caps, 0);
848 
849     gst_structure_get_int (s2, "rate", &first_configured_rate);
850     gst_structure_get_int (s2, "channels", &first_configured_channels);
851 
852     gst_structure_fixate_field_string (s, "format",
853         gst_structure_get_string (s2, "format"));
854     gst_structure_fixate_field_string (s, "layout",
855         gst_structure_get_string (s2, "layout"));
856     gst_structure_fixate_field_nearest_int (s, "rate", first_configured_rate);
857     gst_structure_fixate_field_nearest_int (s, "channels",
858         first_configured_channels);
859 
860     gst_structure_get_int (s, "channels", &channels);
861 
862     if (!gst_structure_has_field (s, "channel-mask") && channels > 2) {
863       guint64 mask;
864 
865       if (!gst_structure_get (s2, "channel-mask", GST_TYPE_BITMASK, &mask,
866               NULL)) {
867         mask = gst_audio_channel_get_fallback_mask (channels);
868       }
869       gst_structure_set (s, "channel-mask", GST_TYPE_BITMASK, mask, NULL);
870     }
871 
872     gst_caps_unref (first_configured_caps);
873     gst_object_unref (first_configured_pad);
874   } else {
875     GstStructure *s;
876     gint channels;
877 
878     s = gst_caps_get_structure (caps, 0);
879 
880     gst_structure_fixate_field_nearest_int (s, "rate", GST_AUDIO_DEF_RATE);
881     gst_structure_fixate_field_string (s, "format", GST_AUDIO_NE ("S16"));
882     gst_structure_fixate_field_string (s, "layout", "interleaved");
883     gst_structure_fixate_field_nearest_int (s, "channels", 2);
884 
885     if (gst_structure_get_int (s, "channels", &channels) && channels > 2) {
886       if (!gst_structure_has_field_typed (s, "channel-mask", GST_TYPE_BITMASK))
887         gst_structure_set (s, "channel-mask", GST_TYPE_BITMASK, 0ULL, NULL);
888     }
889   }
890 
891   if (!gst_caps_is_fixed (caps))
892     caps = gst_caps_fixate (caps);
893 
894   GST_INFO_OBJECT (agg, "Fixated src caps to %" GST_PTR_FORMAT, caps);
895 
896   return caps;
897 }
898 
899 /* Must be called with OBJECT_LOCK taken */
900 static void
gst_audio_aggregator_update_converters(GstAudioAggregator * aagg,GstAudioInfo * new_info,GstAudioInfo * old_info)901 gst_audio_aggregator_update_converters (GstAudioAggregator * aagg,
902     GstAudioInfo * new_info, GstAudioInfo * old_info)
903 {
904   GList *l;
905 
906   for (l = GST_ELEMENT (aagg)->sinkpads; l; l = l->next) {
907     GstAudioAggregatorPad *aaggpad = l->data;
908     GstAudioAggregatorPadClass *klass =
909         GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (aaggpad);
910 
911     if (klass->update_conversion_info)
912       klass->update_conversion_info (aaggpad);
913 
914     /* If we currently were mixing a buffer, we need to convert it to the new
915      * format */
916     if (aaggpad->priv->buffer) {
917       GstBuffer *new_converted_buffer =
918           gst_audio_aggregator_convert_buffer (aagg, GST_PAD (aaggpad),
919           old_info, new_info, aaggpad->priv->input_buffer);
920       gst_buffer_replace (&aaggpad->priv->buffer, new_converted_buffer);
921       gst_buffer_unref (new_converted_buffer);
922     }
923   }
924 }
925 
926 /* We now have our final output caps, we can create the required converters */
927 static gboolean
gst_audio_aggregator_negotiated_src_caps(GstAggregator * agg,GstCaps * caps)928 gst_audio_aggregator_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
929 {
930   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
931   GstAudioInfo info;
932   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
933 
934   GST_INFO_OBJECT (agg, "src caps negotiated %" GST_PTR_FORMAT, caps);
935 
936   if (!gst_audio_info_from_caps (&info, caps)) {
937     GST_WARNING_OBJECT (aagg, "Rejecting invalid caps: %" GST_PTR_FORMAT, caps);
938     return FALSE;
939   }
940 
941   GST_AUDIO_AGGREGATOR_LOCK (aagg);
942   GST_OBJECT_LOCK (aagg);
943 
944   if (!gst_audio_info_is_equal (&info, &srcpad->info)) {
945     GstAudioInfo old_info = srcpad->info;
946     GstAudioAggregatorPadClass *srcpad_klass =
947         GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (agg->srcpad);
948 
949     GST_INFO_OBJECT (aagg, "setting caps to %" GST_PTR_FORMAT, caps);
950     gst_caps_replace (&aagg->current_caps, caps);
951 
952     memcpy (&srcpad->info, &info, sizeof (info));
953 
954     gst_audio_aggregator_update_converters (aagg, &info, &old_info);
955 
956     if (srcpad_klass->update_conversion_info)
957       srcpad_klass->
958           update_conversion_info (GST_AUDIO_AGGREGATOR_PAD (agg->srcpad));
959 
960     if (aagg->priv->current_buffer) {
961       GstBuffer *converted;
962 
963       converted =
964           gst_audio_aggregator_convert_buffer (aagg, agg->srcpad, &old_info,
965           &info, aagg->priv->current_buffer);
966       gst_buffer_unref (aagg->priv->current_buffer);
967       aagg->priv->current_buffer = converted;
968     }
969   }
970 
971   GST_OBJECT_UNLOCK (aagg);
972   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
973 
974   return
975       GST_AGGREGATOR_CLASS
976       (gst_audio_aggregator_parent_class)->negotiated_src_caps (agg, caps);
977 }
978 
979 /* event handling */
980 
981 static gboolean
gst_audio_aggregator_src_event(GstAggregator * agg,GstEvent * event)982 gst_audio_aggregator_src_event (GstAggregator * agg, GstEvent * event)
983 {
984   gboolean result;
985 
986   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
987   GST_DEBUG_OBJECT (agg->srcpad, "Got %s event on src pad",
988       GST_EVENT_TYPE_NAME (event));
989 
990   switch (GST_EVENT_TYPE (event)) {
991     case GST_EVENT_QOS:
992       /* QoS might be tricky */
993       gst_event_unref (event);
994       return FALSE;
995     case GST_EVENT_NAVIGATION:
996       /* navigation is rather pointless. */
997       gst_event_unref (event);
998       return FALSE;
999       break;
1000     case GST_EVENT_SEEK:
1001     {
1002       GstSeekFlags flags;
1003       gdouble rate;
1004       GstSeekType start_type, stop_type;
1005       gint64 start, stop;
1006       GstFormat seek_format, dest_format;
1007 
1008       /* parse the seek parameters */
1009       gst_event_parse_seek (event, &rate, &seek_format, &flags, &start_type,
1010           &start, &stop_type, &stop);
1011 
1012       /* Check the seeking parameters before linking up */
1013       if ((start_type != GST_SEEK_TYPE_NONE)
1014           && (start_type != GST_SEEK_TYPE_SET)) {
1015         result = FALSE;
1016         GST_DEBUG_OBJECT (aagg,
1017             "seeking failed, unhandled seek type for start: %d", start_type);
1018         goto done;
1019       }
1020       if ((stop_type != GST_SEEK_TYPE_NONE) && (stop_type != GST_SEEK_TYPE_SET)) {
1021         result = FALSE;
1022         GST_DEBUG_OBJECT (aagg,
1023             "seeking failed, unhandled seek type for end: %d", stop_type);
1024         goto done;
1025       }
1026 
1027       GST_OBJECT_LOCK (agg);
1028       dest_format = GST_AGGREGATOR_PAD (agg->srcpad)->segment.format;
1029       GST_OBJECT_UNLOCK (agg);
1030       if (seek_format != dest_format) {
1031         result = FALSE;
1032         GST_DEBUG_OBJECT (aagg,
1033             "seeking failed, unhandled seek format: %s",
1034             gst_format_get_name (seek_format));
1035         goto done;
1036       }
1037     }
1038       break;
1039     default:
1040       break;
1041   }
1042 
1043   return
1044       GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->src_event (agg,
1045       event);
1046 
1047 done:
1048   return result;
1049 }
1050 
1051 
1052 static gboolean
gst_audio_aggregator_sink_event(GstAggregator * agg,GstAggregatorPad * aggpad,GstEvent * event)1053 gst_audio_aggregator_sink_event (GstAggregator * agg,
1054     GstAggregatorPad * aggpad, GstEvent * event)
1055 {
1056   GstAudioAggregatorPad *aaggpad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
1057   gboolean res = TRUE;
1058 
1059   GST_DEBUG_OBJECT (aggpad, "Got %s event on sink pad",
1060       GST_EVENT_TYPE_NAME (event));
1061 
1062   switch (GST_EVENT_TYPE (event)) {
1063     case GST_EVENT_SEGMENT:
1064     {
1065       const GstSegment *segment;
1066       gst_event_parse_segment (event, &segment);
1067 
1068       if (segment->format != GST_FORMAT_TIME) {
1069         GST_ERROR_OBJECT (agg, "Segment of type %s are not supported,"
1070             " only TIME segments are supported",
1071             gst_format_get_name (segment->format));
1072         gst_event_unref (event);
1073         event = NULL;
1074         res = FALSE;
1075         break;
1076       }
1077 
1078       GST_OBJECT_LOCK (agg);
1079       if (segment->rate != GST_AGGREGATOR_PAD (agg->srcpad)->segment.rate) {
1080         GST_ERROR_OBJECT (aggpad,
1081             "Got segment event with wrong rate %lf, expected %lf",
1082             segment->rate, GST_AGGREGATOR_PAD (agg->srcpad)->segment.rate);
1083         res = FALSE;
1084         gst_event_unref (event);
1085         event = NULL;
1086       } else if (segment->rate < 0.0) {
1087         GST_ERROR_OBJECT (aggpad, "Negative rates not supported yet");
1088         res = FALSE;
1089         gst_event_unref (event);
1090         event = NULL;
1091       } else {
1092         GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (aggpad);
1093 
1094         GST_OBJECT_LOCK (pad);
1095         pad->priv->new_segment = TRUE;
1096         GST_OBJECT_UNLOCK (pad);
1097       }
1098       GST_OBJECT_UNLOCK (agg);
1099 
1100       break;
1101     }
1102     case GST_EVENT_CAPS:
1103     {
1104       GstCaps *caps;
1105 
1106       gst_event_parse_caps (event, &caps);
1107       GST_INFO_OBJECT (aggpad, "Got caps %" GST_PTR_FORMAT, caps);
1108       res = gst_audio_aggregator_sink_setcaps (aaggpad, agg, caps);
1109       gst_event_unref (event);
1110       event = NULL;
1111       break;
1112     }
1113     default:
1114       break;
1115   }
1116 
1117   if (!res) {
1118     if (event)
1119       gst_event_unref (event);
1120     return res;
1121   }
1122 
1123   if (event != NULL)
1124     return
1125         GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->sink_event
1126         (agg, aggpad, event);
1127 
1128   return res;
1129 }
1130 
1131 static gboolean
gst_audio_aggregator_sink_query(GstAggregator * agg,GstAggregatorPad * aggpad,GstQuery * query)1132 gst_audio_aggregator_sink_query (GstAggregator * agg, GstAggregatorPad * aggpad,
1133     GstQuery * query)
1134 {
1135   gboolean res = FALSE;
1136 
1137   switch (GST_QUERY_TYPE (query)) {
1138     case GST_QUERY_CAPS:
1139     {
1140       GstCaps *filter, *caps;
1141 
1142       gst_query_parse_caps (query, &filter);
1143       if (GST_IS_AUDIO_AGGREGATOR_CONVERT_PAD (aggpad)) {
1144         caps =
1145             gst_audio_aggregator_convert_sink_getcaps (GST_PAD (aggpad), agg,
1146             filter);
1147       } else {
1148         caps =
1149             gst_audio_aggregator_sink_getcaps (GST_PAD (aggpad), agg, filter);
1150       }
1151       gst_query_set_caps_result (query, caps);
1152       gst_caps_unref (caps);
1153       res = TRUE;
1154       break;
1155     }
1156     default:
1157       res =
1158           GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->sink_query
1159           (agg, aggpad, query);
1160       break;
1161   }
1162 
1163   return res;
1164 }
1165 
1166 
1167 /* FIXME, the duration query should reflect how long you will produce
1168  * data, that is the amount of stream time until you will emit EOS.
1169  *
1170  * For synchronized mixing this is always the max of all the durations
1171  * of upstream since we emit EOS when all of them finished.
1172  *
1173  * We don't do synchronized mixing so this really depends on where the
1174  * streams where punched in and what their relative offsets are against
1175  * eachother which we can get from the first timestamps we see.
1176  *
1177  * When we add a new stream (or remove a stream) the duration might
1178  * also become invalid again and we need to post a new DURATION
1179  * message to notify this fact to the parent.
1180  * For now we take the max of all the upstream elements so the simple
1181  * cases work at least somewhat.
1182  */
1183 static gboolean
gst_audio_aggregator_query_duration(GstAudioAggregator * aagg,GstQuery * query)1184 gst_audio_aggregator_query_duration (GstAudioAggregator * aagg,
1185     GstQuery * query)
1186 {
1187   gint64 max;
1188   gboolean res;
1189   GstFormat format;
1190   GstIterator *it;
1191   gboolean done;
1192   GValue item = { 0, };
1193 
1194   /* parse format */
1195   gst_query_parse_duration (query, &format, NULL);
1196 
1197   max = -1;
1198   res = TRUE;
1199   done = FALSE;
1200 
1201   it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (aagg));
1202   while (!done) {
1203     GstIteratorResult ires;
1204 
1205     ires = gst_iterator_next (it, &item);
1206     switch (ires) {
1207       case GST_ITERATOR_DONE:
1208         done = TRUE;
1209         break;
1210       case GST_ITERATOR_OK:
1211       {
1212         GstPad *pad = g_value_get_object (&item);
1213         gint64 duration;
1214 
1215         /* ask sink peer for duration */
1216         res &= gst_pad_peer_query_duration (pad, format, &duration);
1217         /* take max from all valid return values */
1218         if (res) {
1219           /* valid unknown length, stop searching */
1220           if (duration == -1) {
1221             max = duration;
1222             done = TRUE;
1223           }
1224           /* else see if bigger than current max */
1225           else if (duration > max)
1226             max = duration;
1227         }
1228         g_value_reset (&item);
1229         break;
1230       }
1231       case GST_ITERATOR_RESYNC:
1232         max = -1;
1233         res = TRUE;
1234         gst_iterator_resync (it);
1235         break;
1236       default:
1237         res = FALSE;
1238         done = TRUE;
1239         break;
1240     }
1241   }
1242   g_value_unset (&item);
1243   gst_iterator_free (it);
1244 
1245   if (res) {
1246     /* and store the max */
1247     GST_DEBUG_OBJECT (aagg, "Total duration in format %s: %"
1248         GST_TIME_FORMAT, gst_format_get_name (format), GST_TIME_ARGS (max));
1249     gst_query_set_duration (query, format, max);
1250   }
1251 
1252   return res;
1253 }
1254 
1255 
1256 static gboolean
gst_audio_aggregator_src_query(GstAggregator * agg,GstQuery * query)1257 gst_audio_aggregator_src_query (GstAggregator * agg, GstQuery * query)
1258 {
1259   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1260   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1261   gboolean res = FALSE;
1262 
1263   switch (GST_QUERY_TYPE (query)) {
1264     case GST_QUERY_DURATION:
1265       res = gst_audio_aggregator_query_duration (aagg, query);
1266       break;
1267     case GST_QUERY_POSITION:
1268     {
1269       GstFormat format;
1270 
1271       gst_query_parse_position (query, &format, NULL);
1272 
1273       GST_OBJECT_LOCK (aagg);
1274 
1275       switch (format) {
1276         case GST_FORMAT_TIME:
1277           gst_query_set_position (query, format,
1278               gst_segment_to_stream_time (&GST_AGGREGATOR_PAD (agg->
1279                       srcpad)->segment, GST_FORMAT_TIME,
1280                   GST_AGGREGATOR_PAD (agg->srcpad)->segment.position));
1281           res = TRUE;
1282           break;
1283         case GST_FORMAT_BYTES:
1284           if (GST_AUDIO_INFO_BPF (&srcpad->info)) {
1285             gst_query_set_position (query, format, aagg->priv->offset *
1286                 GST_AUDIO_INFO_BPF (&srcpad->info));
1287             res = TRUE;
1288           }
1289           break;
1290         case GST_FORMAT_DEFAULT:
1291           gst_query_set_position (query, format, aagg->priv->offset);
1292           res = TRUE;
1293           break;
1294         default:
1295           break;
1296       }
1297 
1298       GST_OBJECT_UNLOCK (aagg);
1299 
1300       break;
1301     }
1302     default:
1303       res =
1304           GST_AGGREGATOR_CLASS (gst_audio_aggregator_parent_class)->src_query
1305           (agg, query);
1306       break;
1307   }
1308 
1309   return res;
1310 }
1311 
1312 
1313 void
gst_audio_aggregator_set_sink_caps(GstAudioAggregator * aagg,GstAudioAggregatorPad * pad,GstCaps * caps)1314 gst_audio_aggregator_set_sink_caps (GstAudioAggregator * aagg,
1315     GstAudioAggregatorPad * pad, GstCaps * caps)
1316 {
1317 #ifndef G_DISABLE_ASSERT
1318   gboolean valid;
1319 
1320   GST_OBJECT_LOCK (pad);
1321   valid = gst_audio_info_from_caps (&pad->info, caps);
1322   g_assert (valid);
1323   GST_OBJECT_UNLOCK (pad);
1324 #else
1325   GST_OBJECT_LOCK (pad);
1326   (void) gst_audio_info_from_caps (&pad->info, caps);
1327   GST_OBJECT_UNLOCK (pad);
1328 #endif
1329 }
1330 
1331 /* Must hold object lock and aagg lock to call */
1332 
1333 static void
gst_audio_aggregator_reset(GstAudioAggregator * aagg)1334 gst_audio_aggregator_reset (GstAudioAggregator * aagg)
1335 {
1336   GstAggregator *agg = GST_AGGREGATOR (aagg);
1337 
1338   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1339   GST_OBJECT_LOCK (aagg);
1340   GST_AGGREGATOR_PAD (agg->srcpad)->segment.position = -1;
1341   aagg->priv->offset = -1;
1342   gst_audio_info_init (&GST_AUDIO_AGGREGATOR_PAD (agg->srcpad)->info);
1343   gst_caps_replace (&aagg->current_caps, NULL);
1344   gst_buffer_replace (&aagg->priv->current_buffer, NULL);
1345   GST_OBJECT_UNLOCK (aagg);
1346   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1347 }
1348 
1349 static gboolean
gst_audio_aggregator_start(GstAggregator * agg)1350 gst_audio_aggregator_start (GstAggregator * agg)
1351 {
1352   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1353 
1354   gst_audio_aggregator_reset (aagg);
1355 
1356   return TRUE;
1357 }
1358 
1359 static gboolean
gst_audio_aggregator_stop(GstAggregator * agg)1360 gst_audio_aggregator_stop (GstAggregator * agg)
1361 {
1362   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1363 
1364   gst_audio_aggregator_reset (aagg);
1365 
1366   return TRUE;
1367 }
1368 
1369 static GstFlowReturn
gst_audio_aggregator_flush(GstAggregator * agg)1370 gst_audio_aggregator_flush (GstAggregator * agg)
1371 {
1372   GstAudioAggregator *aagg = GST_AUDIO_AGGREGATOR (agg);
1373 
1374   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1375   GST_OBJECT_LOCK (aagg);
1376   GST_AGGREGATOR_PAD (agg->srcpad)->segment.position = -1;
1377   aagg->priv->offset = -1;
1378   gst_buffer_replace (&aagg->priv->current_buffer, NULL);
1379   GST_OBJECT_UNLOCK (aagg);
1380   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1381 
1382   return GST_FLOW_OK;
1383 }
1384 
1385 static GstBuffer *
gst_audio_aggregator_do_clip(GstAggregator * agg,GstAggregatorPad * bpad,GstBuffer * buffer)1386 gst_audio_aggregator_do_clip (GstAggregator * agg,
1387     GstAggregatorPad * bpad, GstBuffer * buffer)
1388 {
1389   GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (bpad);
1390   gint rate, bpf;
1391 
1392   /* Guard against invalid audio info, we just don't clip here then */
1393   if (!GST_AUDIO_INFO_IS_VALID (&pad->info))
1394     return buffer;
1395 
1396   GST_OBJECT_LOCK (bpad);
1397   rate = GST_AUDIO_INFO_RATE (&pad->info);
1398   bpf = GST_AUDIO_INFO_BPF (&pad->info);
1399   buffer = gst_audio_buffer_clip (buffer, &bpad->segment, rate, bpf);
1400   GST_OBJECT_UNLOCK (bpad);
1401 
1402   return buffer;
1403 }
1404 
1405 /* Called with the object lock for both the element and pad held,
1406  * as well as the aagg lock
1407  *
1408  * Replace the current buffer with input and update GstAudioAggregatorPadPrivate
1409  * values.
1410  */
1411 static gboolean
gst_audio_aggregator_fill_buffer(GstAudioAggregator * aagg,GstAudioAggregatorPad * pad)1412 gst_audio_aggregator_fill_buffer (GstAudioAggregator * aagg,
1413     GstAudioAggregatorPad * pad)
1414 {
1415   GstClockTime start_time, end_time;
1416   gboolean discont = FALSE;
1417   guint64 start_offset, end_offset;
1418   gint rate, bpf;
1419 
1420   GstAggregator *agg = GST_AGGREGATOR (aagg);
1421   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1422   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1423 
1424   if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad)->convert_buffer) {
1425     rate = GST_AUDIO_INFO_RATE (&srcpad->info);
1426     bpf = GST_AUDIO_INFO_BPF (&srcpad->info);
1427   } else {
1428     rate = GST_AUDIO_INFO_RATE (&pad->info);
1429     bpf = GST_AUDIO_INFO_BPF (&pad->info);
1430   }
1431 
1432   pad->priv->position = 0;
1433   pad->priv->size = gst_buffer_get_size (pad->priv->buffer) / bpf;
1434 
1435   if (pad->priv->size == 0) {
1436     if (!GST_BUFFER_DURATION_IS_VALID (pad->priv->buffer) ||
1437         !GST_BUFFER_FLAG_IS_SET (pad->priv->buffer, GST_BUFFER_FLAG_GAP)) {
1438       GST_WARNING_OBJECT (pad, "Dropping 0-sized buffer missing either a"
1439           " duration or a GAP flag: %" GST_PTR_FORMAT, pad->priv->buffer);
1440       return FALSE;
1441     }
1442 
1443     pad->priv->size =
1444         gst_util_uint64_scale (GST_BUFFER_DURATION (pad->priv->buffer), rate,
1445         GST_SECOND);
1446   }
1447 
1448   if (!GST_BUFFER_PTS_IS_VALID (pad->priv->buffer)) {
1449     if (pad->priv->output_offset == -1)
1450       pad->priv->output_offset = aagg->priv->offset;
1451     if (pad->priv->next_offset == -1)
1452       pad->priv->next_offset = pad->priv->size;
1453     else
1454       pad->priv->next_offset += pad->priv->size;
1455     goto done;
1456   }
1457 
1458   start_time = GST_BUFFER_PTS (pad->priv->buffer);
1459   end_time =
1460       start_time + gst_util_uint64_scale_ceil (pad->priv->size, GST_SECOND,
1461       rate);
1462 
1463   /* Clipping should've ensured this */
1464   g_assert (start_time >= aggpad->segment.start);
1465 
1466   start_offset =
1467       gst_util_uint64_scale (start_time - aggpad->segment.start, rate,
1468       GST_SECOND);
1469   end_offset = start_offset + pad->priv->size;
1470 
1471   if (GST_BUFFER_IS_DISCONT (pad->priv->buffer)
1472       || GST_BUFFER_FLAG_IS_SET (pad->priv->buffer, GST_BUFFER_FLAG_RESYNC)
1473       || pad->priv->new_segment || pad->priv->next_offset == -1) {
1474     discont = TRUE;
1475     pad->priv->new_segment = FALSE;
1476   } else {
1477     guint64 diff, max_sample_diff;
1478 
1479     /* Check discont, based on audiobasesink */
1480     if (start_offset <= pad->priv->next_offset)
1481       diff = pad->priv->next_offset - start_offset;
1482     else
1483       diff = start_offset - pad->priv->next_offset;
1484 
1485     max_sample_diff =
1486         gst_util_uint64_scale_int (aagg->priv->alignment_threshold, rate,
1487         GST_SECOND);
1488 
1489     /* Discont! */
1490     if (G_UNLIKELY (diff >= max_sample_diff)) {
1491       if (aagg->priv->discont_wait > 0) {
1492         if (pad->priv->discont_time == GST_CLOCK_TIME_NONE) {
1493           pad->priv->discont_time = start_time;
1494         } else if (start_time - pad->priv->discont_time >=
1495             aagg->priv->discont_wait) {
1496           discont = TRUE;
1497           pad->priv->discont_time = GST_CLOCK_TIME_NONE;
1498         }
1499       } else {
1500         discont = TRUE;
1501       }
1502     } else if (G_UNLIKELY (pad->priv->discont_time != GST_CLOCK_TIME_NONE)) {
1503       /* we have had a discont, but are now back on track! */
1504       pad->priv->discont_time = GST_CLOCK_TIME_NONE;
1505     }
1506   }
1507 
1508   if (discont) {
1509     /* Have discont, need resync */
1510     if (pad->priv->next_offset != -1)
1511       GST_DEBUG_OBJECT (pad, "Have discont. Expected %"
1512           G_GUINT64_FORMAT ", got %" G_GUINT64_FORMAT,
1513           pad->priv->next_offset, start_offset);
1514     pad->priv->output_offset = -1;
1515     pad->priv->next_offset = end_offset;
1516   } else {
1517     pad->priv->next_offset += pad->priv->size;
1518   }
1519 
1520   if (pad->priv->output_offset == -1) {
1521     GstClockTime start_running_time;
1522     GstClockTime end_running_time;
1523     GstClockTime segment_pos;
1524     guint64 start_output_offset = -1;
1525     guint64 end_output_offset = -1;
1526     GstSegment *agg_segment = &GST_AGGREGATOR_PAD (agg->srcpad)->segment;
1527 
1528     start_running_time =
1529         gst_segment_to_running_time (&aggpad->segment,
1530         GST_FORMAT_TIME, start_time);
1531     end_running_time =
1532         gst_segment_to_running_time (&aggpad->segment,
1533         GST_FORMAT_TIME, end_time);
1534 
1535     /* Convert to position in the output segment */
1536     segment_pos =
1537         gst_segment_position_from_running_time (agg_segment, GST_FORMAT_TIME,
1538         start_running_time);
1539     if (GST_CLOCK_TIME_IS_VALID (segment_pos))
1540       start_output_offset =
1541           gst_util_uint64_scale (segment_pos - agg_segment->start, rate,
1542           GST_SECOND);
1543 
1544     segment_pos =
1545         gst_segment_position_from_running_time (agg_segment, GST_FORMAT_TIME,
1546         end_running_time);
1547     if (GST_CLOCK_TIME_IS_VALID (segment_pos))
1548       end_output_offset =
1549           gst_util_uint64_scale (segment_pos - agg_segment->start, rate,
1550           GST_SECOND);
1551 
1552     if (start_output_offset == -1 && end_output_offset == -1) {
1553       /* Outside output segment, drop */
1554       pad->priv->position = 0;
1555       pad->priv->size = 0;
1556       pad->priv->output_offset = -1;
1557       GST_DEBUG_OBJECT (pad, "Buffer outside output segment");
1558       return FALSE;
1559     }
1560 
1561     /* Calculate end_output_offset if it was outside the output segment */
1562     if (end_output_offset == -1)
1563       end_output_offset = start_output_offset + pad->priv->size;
1564 
1565     if (end_output_offset < aagg->priv->offset) {
1566       pad->priv->position = 0;
1567       pad->priv->size = 0;
1568       pad->priv->output_offset = -1;
1569       GST_DEBUG_OBJECT (pad,
1570           "Buffer before segment or current position: %" G_GUINT64_FORMAT " < %"
1571           G_GINT64_FORMAT, end_output_offset, aagg->priv->offset);
1572       return FALSE;
1573     }
1574 
1575     if (start_output_offset == -1 || start_output_offset < aagg->priv->offset) {
1576       guint diff;
1577 
1578       if (start_output_offset == -1 && end_output_offset < pad->priv->size) {
1579         diff = pad->priv->size - end_output_offset + aagg->priv->offset;
1580       } else if (start_output_offset == -1) {
1581         start_output_offset = end_output_offset - pad->priv->size;
1582 
1583         if (start_output_offset < aagg->priv->offset)
1584           diff = aagg->priv->offset - start_output_offset;
1585         else
1586           diff = 0;
1587       } else {
1588         diff = aagg->priv->offset - start_output_offset;
1589       }
1590 
1591       pad->priv->position += diff;
1592       if (pad->priv->position >= pad->priv->size) {
1593         /* Empty buffer, drop */
1594         pad->priv->position = 0;
1595         pad->priv->size = 0;
1596         pad->priv->output_offset = -1;
1597         GST_DEBUG_OBJECT (pad,
1598             "Buffer before segment or current position: %" G_GUINT64_FORMAT
1599             " < %" G_GINT64_FORMAT, end_output_offset, aagg->priv->offset);
1600         return FALSE;
1601       }
1602     }
1603 
1604     if (start_output_offset == -1 || start_output_offset < aagg->priv->offset)
1605       pad->priv->output_offset = aagg->priv->offset;
1606     else
1607       pad->priv->output_offset = start_output_offset;
1608 
1609     GST_DEBUG_OBJECT (pad,
1610         "Buffer resynced: Pad offset %" G_GUINT64_FORMAT
1611         ", current audio aggregator offset %" G_GINT64_FORMAT,
1612         pad->priv->output_offset, aagg->priv->offset);
1613   }
1614 
1615 done:
1616 
1617   GST_LOG_OBJECT (pad,
1618       "Queued new buffer at offset %" G_GUINT64_FORMAT,
1619       pad->priv->output_offset);
1620 
1621   return TRUE;
1622 }
1623 
1624 /* Called with pad object lock held */
1625 
1626 static gboolean
gst_audio_aggregator_mix_buffer(GstAudioAggregator * aagg,GstAudioAggregatorPad * pad,GstBuffer * inbuf,GstBuffer * outbuf,guint blocksize)1627 gst_audio_aggregator_mix_buffer (GstAudioAggregator * aagg,
1628     GstAudioAggregatorPad * pad, GstBuffer * inbuf, GstBuffer * outbuf,
1629     guint blocksize)
1630 {
1631   guint overlap;
1632   guint out_start;
1633   gboolean filled;
1634   guint in_offset;
1635   gboolean pad_changed = FALSE;
1636 
1637   /* Overlap => mix */
1638   if (aagg->priv->offset < pad->priv->output_offset)
1639     out_start = pad->priv->output_offset - aagg->priv->offset;
1640   else
1641     out_start = 0;
1642 
1643   overlap = pad->priv->size - pad->priv->position;
1644   if (overlap > blocksize - out_start)
1645     overlap = blocksize - out_start;
1646 
1647   if (GST_BUFFER_FLAG_IS_SET (inbuf, GST_BUFFER_FLAG_GAP)) {
1648     /* skip gap buffer */
1649     GST_LOG_OBJECT (pad, "skipping GAP buffer");
1650     pad->priv->output_offset += pad->priv->size - pad->priv->position;
1651     pad->priv->position = pad->priv->size;
1652 
1653     gst_buffer_replace (&pad->priv->buffer, NULL);
1654     gst_buffer_replace (&pad->priv->input_buffer, NULL);
1655     return FALSE;
1656   }
1657 
1658   gst_buffer_ref (inbuf);
1659   in_offset = pad->priv->position;
1660   GST_OBJECT_UNLOCK (pad);
1661   GST_OBJECT_UNLOCK (aagg);
1662 
1663   filled = GST_AUDIO_AGGREGATOR_GET_CLASS (aagg)->aggregate_one_buffer (aagg,
1664       pad, inbuf, in_offset, outbuf, out_start, overlap);
1665 
1666   GST_OBJECT_LOCK (aagg);
1667   GST_OBJECT_LOCK (pad);
1668 
1669   pad_changed = (inbuf != pad->priv->buffer);
1670   gst_buffer_unref (inbuf);
1671 
1672   if (filled)
1673     GST_BUFFER_FLAG_UNSET (outbuf, GST_BUFFER_FLAG_GAP);
1674 
1675   if (pad_changed)
1676     return FALSE;
1677 
1678   pad->priv->position += overlap;
1679   pad->priv->output_offset += overlap;
1680 
1681   if (pad->priv->position == pad->priv->size) {
1682     /* Buffer done, drop it */
1683     gst_buffer_replace (&pad->priv->buffer, NULL);
1684     gst_buffer_replace (&pad->priv->input_buffer, NULL);
1685     GST_LOG_OBJECT (pad, "Finished mixing buffer, waiting for next");
1686     return FALSE;
1687   }
1688 
1689   return TRUE;
1690 }
1691 
1692 static GstBuffer *
gst_audio_aggregator_create_output_buffer(GstAudioAggregator * aagg,guint num_frames)1693 gst_audio_aggregator_create_output_buffer (GstAudioAggregator * aagg,
1694     guint num_frames)
1695 {
1696   GstAllocator *allocator;
1697   GstAllocationParams params;
1698   GstBuffer *outbuf;
1699   GstMapInfo outmap;
1700   GstAggregator *agg = GST_AGGREGATOR (aagg);
1701   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1702 
1703   gst_aggregator_get_allocator (GST_AGGREGATOR (aagg), &allocator, &params);
1704 
1705   GST_DEBUG ("Creating output buffer with size %d",
1706       num_frames * GST_AUDIO_INFO_BPF (&srcpad->info));
1707 
1708   outbuf = gst_buffer_new_allocate (allocator, num_frames *
1709       GST_AUDIO_INFO_BPF (&srcpad->info), &params);
1710 
1711   if (allocator)
1712     gst_object_unref (allocator);
1713 
1714   gst_buffer_map (outbuf, &outmap, GST_MAP_WRITE);
1715   gst_audio_format_fill_silence (srcpad->info.finfo, outmap.data, outmap.size);
1716   gst_buffer_unmap (outbuf, &outmap);
1717 
1718   return outbuf;
1719 }
1720 
1721 static gboolean
sync_pad_values(GstElement * aagg,GstPad * pad,gpointer user_data)1722 sync_pad_values (GstElement * aagg, GstPad * pad, gpointer user_data)
1723 {
1724   GstAudioAggregatorPad *aapad = GST_AUDIO_AGGREGATOR_PAD (pad);
1725   GstAggregatorPad *bpad = GST_AGGREGATOR_PAD_CAST (pad);
1726   GstClockTime timestamp, stream_time;
1727 
1728   if (aapad->priv->buffer == NULL)
1729     return TRUE;
1730 
1731   timestamp = GST_BUFFER_PTS (aapad->priv->buffer);
1732   GST_OBJECT_LOCK (bpad);
1733   stream_time = gst_segment_to_stream_time (&bpad->segment, GST_FORMAT_TIME,
1734       timestamp);
1735   GST_OBJECT_UNLOCK (bpad);
1736 
1737   /* sync object properties on stream time */
1738   /* TODO: Ideally we would want to do that on every sample */
1739   if (GST_CLOCK_TIME_IS_VALID (stream_time))
1740     gst_object_sync_values (GST_OBJECT_CAST (pad), stream_time);
1741 
1742   return TRUE;
1743 }
1744 
1745 static GstFlowReturn
gst_audio_aggregator_aggregate(GstAggregator * agg,gboolean timeout)1746 gst_audio_aggregator_aggregate (GstAggregator * agg, gboolean timeout)
1747 {
1748   /* Calculate the current output offset/timestamp and offset_end/timestamp_end.
1749    * Allocate a silence buffer for this and store it.
1750    *
1751    * For all pads:
1752    * 1) Once per input buffer (cached)
1753    *   1) Check discont (flag and timestamp with tolerance)
1754    *   2) If discont or new, resync. That means:
1755    *     1) Drop all start data of the buffer that comes before
1756    *        the current position/offset.
1757    *     2) Calculate the offset (output segment!) that the first
1758    *        frame of the input buffer corresponds to. Base this on
1759    *        the running time.
1760    *
1761    * 2) If the current pad's offset/offset_end overlaps with the output
1762    *    offset/offset_end, mix it at the appropiate position in the output
1763    *    buffer and advance the pad's position. Remember if this pad needs
1764    *    a new buffer to advance behind the output offset_end.
1765    *
1766    * If we had no pad with a buffer, go EOS.
1767    *
1768    * If we had at least one pad that did not advance behind output
1769    * offset_end, let aggregate be called again for the current
1770    * output offset/offset_end.
1771    */
1772   GstElement *element;
1773   GstAudioAggregator *aagg;
1774   GList *iter;
1775   GstFlowReturn ret;
1776   GstBuffer *outbuf = NULL;
1777   gint64 next_offset;
1778   gint64 next_timestamp;
1779   gint rate, bpf;
1780   gboolean dropped = FALSE;
1781   gboolean is_eos = TRUE;
1782   gboolean is_done = TRUE;
1783   guint blocksize;
1784   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
1785   GstSegment *agg_segment = &GST_AGGREGATOR_PAD (agg->srcpad)->segment;
1786 
1787   element = GST_ELEMENT (agg);
1788   aagg = GST_AUDIO_AGGREGATOR (agg);
1789 
1790   /* Sync pad properties to the stream time */
1791   gst_element_foreach_sink_pad (element, sync_pad_values, NULL);
1792 
1793   GST_AUDIO_AGGREGATOR_LOCK (aagg);
1794   GST_OBJECT_LOCK (agg);
1795 
1796   /* Update position from the segment start/stop if needed */
1797   if (agg_segment->position == -1) {
1798     if (agg_segment->rate > 0.0)
1799       agg_segment->position = agg_segment->start;
1800     else
1801       agg_segment->position = agg_segment->stop;
1802   }
1803 
1804   if (G_UNLIKELY (srcpad->info.finfo->format == GST_AUDIO_FORMAT_UNKNOWN)) {
1805     if (timeout) {
1806       GST_DEBUG_OBJECT (aagg,
1807           "Got timeout before receiving any caps, don't output anything");
1808 
1809       /* Advance position */
1810       if (agg_segment->rate > 0.0)
1811         agg_segment->position += aagg->priv->output_buffer_duration;
1812       else if (agg_segment->position > aagg->priv->output_buffer_duration)
1813         agg_segment->position -= aagg->priv->output_buffer_duration;
1814       else
1815         agg_segment->position = 0;
1816 
1817       GST_OBJECT_UNLOCK (agg);
1818       GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1819       return GST_AGGREGATOR_FLOW_NEED_DATA;
1820     } else {
1821       GST_OBJECT_UNLOCK (agg);
1822       goto not_negotiated;
1823     }
1824   }
1825 
1826   rate = GST_AUDIO_INFO_RATE (&srcpad->info);
1827   bpf = GST_AUDIO_INFO_BPF (&srcpad->info);
1828 
1829   if (aagg->priv->offset == -1) {
1830     aagg->priv->offset =
1831         gst_util_uint64_scale (agg_segment->position - agg_segment->start, rate,
1832         GST_SECOND);
1833     GST_DEBUG_OBJECT (aagg, "Starting at offset %" G_GINT64_FORMAT,
1834         aagg->priv->offset);
1835   }
1836 
1837   blocksize = gst_util_uint64_scale (aagg->priv->output_buffer_duration,
1838       rate, GST_SECOND);
1839   blocksize = MAX (1, blocksize);
1840 
1841   /* FIXME: Reverse mixing does not work at all yet */
1842   if (agg_segment->rate > 0.0) {
1843     next_offset = aagg->priv->offset + blocksize;
1844   } else {
1845     next_offset = aagg->priv->offset - blocksize;
1846   }
1847 
1848   /* Use the sample counter, which will never accumulate rounding errors */
1849   next_timestamp =
1850       agg_segment->start + gst_util_uint64_scale (next_offset, GST_SECOND,
1851       rate);
1852 
1853   if (aagg->priv->current_buffer == NULL) {
1854     GST_OBJECT_UNLOCK (agg);
1855     aagg->priv->current_buffer =
1856         GST_AUDIO_AGGREGATOR_GET_CLASS (aagg)->create_output_buffer (aagg,
1857         blocksize);
1858     /* Be careful, some things could have changed ? */
1859     GST_OBJECT_LOCK (agg);
1860     GST_BUFFER_FLAG_SET (aagg->priv->current_buffer, GST_BUFFER_FLAG_GAP);
1861   }
1862   outbuf = aagg->priv->current_buffer;
1863 
1864   GST_LOG_OBJECT (agg,
1865       "Starting to mix %u samples for offset %" G_GINT64_FORMAT
1866       " with timestamp %" GST_TIME_FORMAT, blocksize,
1867       aagg->priv->offset, GST_TIME_ARGS (agg_segment->position));
1868 
1869   for (iter = element->sinkpads; iter; iter = iter->next) {
1870     GstAudioAggregatorPad *pad = (GstAudioAggregatorPad *) iter->data;
1871     GstAggregatorPad *aggpad = (GstAggregatorPad *) iter->data;
1872     gboolean pad_eos = gst_aggregator_pad_is_eos (aggpad);
1873 
1874     if (!pad_eos)
1875       is_eos = FALSE;
1876 
1877     pad->priv->input_buffer = gst_aggregator_pad_peek_buffer (aggpad);
1878 
1879     GST_OBJECT_LOCK (pad);
1880     if (!pad->priv->input_buffer) {
1881       if (timeout) {
1882         if (pad->priv->output_offset < next_offset) {
1883           gint64 diff = next_offset - pad->priv->output_offset;
1884           GST_DEBUG_OBJECT (pad, "Timeout, missing %" G_GINT64_FORMAT
1885               " frames (%" GST_TIME_FORMAT ")", diff,
1886               GST_TIME_ARGS (gst_util_uint64_scale (diff, GST_SECOND,
1887                       GST_AUDIO_INFO_RATE (&srcpad->info))));
1888         }
1889       } else if (!pad_eos) {
1890         is_done = FALSE;
1891       }
1892       GST_OBJECT_UNLOCK (pad);
1893       continue;
1894     } else if (!GST_AUDIO_INFO_IS_VALID (&pad->info)) {
1895       GST_OBJECT_UNLOCK (pad);
1896       GST_OBJECT_UNLOCK (agg);
1897       goto not_negotiated;
1898     }
1899 
1900     /* New buffer? */
1901     if (!pad->priv->buffer) {
1902       if (GST_AUDIO_AGGREGATOR_PAD_GET_CLASS (pad)->convert_buffer)
1903         pad->priv->buffer =
1904             gst_audio_aggregator_convert_buffer
1905             (aagg, GST_PAD (pad), &pad->info, &srcpad->info,
1906             pad->priv->input_buffer);
1907       else
1908         pad->priv->buffer = gst_buffer_ref (pad->priv->input_buffer);
1909 
1910       if (!gst_audio_aggregator_fill_buffer (aagg, pad)) {
1911         gst_buffer_replace (&pad->priv->buffer, NULL);
1912         gst_buffer_replace (&pad->priv->input_buffer, NULL);
1913         pad->priv->buffer = NULL;
1914         dropped = TRUE;
1915         GST_OBJECT_UNLOCK (pad);
1916 
1917         gst_aggregator_pad_drop_buffer (aggpad);
1918         continue;
1919       }
1920     } else {
1921       gst_buffer_unref (pad->priv->input_buffer);
1922     }
1923 
1924     if (!pad->priv->buffer && !dropped && pad_eos) {
1925       GST_DEBUG_OBJECT (aggpad, "Pad is in EOS state");
1926       GST_OBJECT_UNLOCK (pad);
1927       continue;
1928     }
1929 
1930     g_assert (pad->priv->buffer);
1931 
1932     /* This pad is lagging behind, we need to update the offset
1933      * and maybe drop the current buffer */
1934     if (pad->priv->output_offset < aagg->priv->offset) {
1935       gint64 diff = aagg->priv->offset - pad->priv->output_offset;
1936       gint64 odiff = diff;
1937 
1938       if (pad->priv->position + diff > pad->priv->size)
1939         diff = pad->priv->size - pad->priv->position;
1940       pad->priv->position += diff;
1941       pad->priv->output_offset += diff;
1942 
1943       if (pad->priv->position == pad->priv->size) {
1944         GST_DEBUG_OBJECT (pad, "Buffer was late by %" GST_TIME_FORMAT
1945             ", dropping %" GST_PTR_FORMAT,
1946             GST_TIME_ARGS (gst_util_uint64_scale (odiff, GST_SECOND,
1947                     GST_AUDIO_INFO_RATE (&srcpad->info))), pad->priv->buffer);
1948         /* Buffer done, drop it */
1949         gst_buffer_replace (&pad->priv->buffer, NULL);
1950         gst_buffer_replace (&pad->priv->input_buffer, NULL);
1951         dropped = TRUE;
1952         GST_OBJECT_UNLOCK (pad);
1953         gst_aggregator_pad_drop_buffer (aggpad);
1954         continue;
1955       }
1956     }
1957 
1958     g_assert (pad->priv->buffer);
1959 
1960     if (pad->priv->output_offset >= aagg->priv->offset
1961         && pad->priv->output_offset < aagg->priv->offset + blocksize) {
1962       gboolean drop_buf;
1963 
1964       GST_LOG_OBJECT (aggpad, "Mixing buffer for current offset");
1965       drop_buf = !gst_audio_aggregator_mix_buffer (aagg, pad, pad->priv->buffer,
1966           outbuf, blocksize);
1967       if (pad->priv->output_offset >= next_offset) {
1968         GST_LOG_OBJECT (pad,
1969             "Pad is at or after current offset: %" G_GUINT64_FORMAT " >= %"
1970             G_GINT64_FORMAT, pad->priv->output_offset, next_offset);
1971       } else {
1972         is_done = FALSE;
1973       }
1974       if (drop_buf) {
1975         GST_OBJECT_UNLOCK (pad);
1976         gst_aggregator_pad_drop_buffer (aggpad);
1977         continue;
1978       }
1979     }
1980 
1981     GST_OBJECT_UNLOCK (pad);
1982   }
1983   GST_OBJECT_UNLOCK (agg);
1984 
1985   if (dropped) {
1986     /* We dropped a buffer, retry */
1987     GST_LOG_OBJECT (aagg, "A pad dropped a buffer, wait for the next one");
1988     GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1989     return GST_AGGREGATOR_FLOW_NEED_DATA;
1990   }
1991 
1992   if (!is_done && !is_eos) {
1993     /* Get more buffers */
1994     GST_LOG_OBJECT (aagg,
1995         "We're not done yet for the current offset, waiting for more data");
1996     GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
1997     return GST_AGGREGATOR_FLOW_NEED_DATA;
1998   }
1999 
2000   if (is_eos) {
2001     gint64 max_offset = 0;
2002 
2003     GST_DEBUG_OBJECT (aagg, "We're EOS");
2004 
2005     GST_OBJECT_LOCK (agg);
2006     for (iter = GST_ELEMENT (agg)->sinkpads; iter; iter = iter->next) {
2007       GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (iter->data);
2008 
2009       max_offset = MAX ((gint64) max_offset, (gint64) pad->priv->output_offset);
2010     }
2011     GST_OBJECT_UNLOCK (agg);
2012 
2013     /* This means EOS or nothing mixed in at all */
2014     if (aagg->priv->offset == max_offset) {
2015       gst_buffer_replace (&aagg->priv->current_buffer, NULL);
2016       GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2017       return GST_FLOW_EOS;
2018     }
2019 
2020     if (max_offset <= next_offset) {
2021       GST_DEBUG_OBJECT (aagg,
2022           "Last buffer is incomplete: %" G_GUINT64_FORMAT " <= %"
2023           G_GINT64_FORMAT, max_offset, next_offset);
2024       next_offset = max_offset;
2025       next_timestamp =
2026           agg_segment->start + gst_util_uint64_scale (next_offset, GST_SECOND,
2027           rate);
2028 
2029       if (next_offset > aagg->priv->offset)
2030         gst_buffer_resize (outbuf, 0, (next_offset - aagg->priv->offset) * bpf);
2031     }
2032   }
2033 
2034   /* set timestamps on the output buffer */
2035   GST_OBJECT_LOCK (agg);
2036   if (agg_segment->rate > 0.0) {
2037     GST_BUFFER_PTS (outbuf) = agg_segment->position;
2038     GST_BUFFER_OFFSET (outbuf) = aagg->priv->offset;
2039     GST_BUFFER_OFFSET_END (outbuf) = next_offset;
2040     GST_BUFFER_DURATION (outbuf) = next_timestamp - agg_segment->position;
2041   } else {
2042     GST_BUFFER_PTS (outbuf) = next_timestamp;
2043     GST_BUFFER_OFFSET (outbuf) = next_offset;
2044     GST_BUFFER_OFFSET_END (outbuf) = aagg->priv->offset;
2045     GST_BUFFER_DURATION (outbuf) = agg_segment->position - next_timestamp;
2046   }
2047 
2048   GST_OBJECT_UNLOCK (agg);
2049 
2050   /* send it out */
2051   GST_LOG_OBJECT (aagg,
2052       "pushing outbuf %p, timestamp %" GST_TIME_FORMAT " offset %"
2053       G_GINT64_FORMAT, outbuf, GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)),
2054       GST_BUFFER_OFFSET (outbuf));
2055 
2056   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2057 
2058   ret = gst_aggregator_finish_buffer (agg, outbuf);
2059   aagg->priv->current_buffer = NULL;
2060 
2061   GST_LOG_OBJECT (aagg, "pushed outbuf, result = %s", gst_flow_get_name (ret));
2062 
2063   GST_AUDIO_AGGREGATOR_LOCK (aagg);
2064   GST_OBJECT_LOCK (agg);
2065   aagg->priv->offset = next_offset;
2066   agg_segment->position = next_timestamp;
2067 
2068   /* If there was a timeout and there was a gap in data in out of the streams,
2069    * then it's a very good time to for a resync with the timestamps.
2070    */
2071   if (timeout) {
2072     for (iter = element->sinkpads; iter; iter = iter->next) {
2073       GstAudioAggregatorPad *pad = GST_AUDIO_AGGREGATOR_PAD (iter->data);
2074 
2075       GST_OBJECT_LOCK (pad);
2076       if (pad->priv->output_offset < aagg->priv->offset)
2077         pad->priv->output_offset = -1;
2078       GST_OBJECT_UNLOCK (pad);
2079     }
2080   }
2081   GST_OBJECT_UNLOCK (agg);
2082   GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2083 
2084   return ret;
2085   /* ERRORS */
2086 not_negotiated:
2087   {
2088     GST_AUDIO_AGGREGATOR_UNLOCK (aagg);
2089     GST_ELEMENT_ERROR (aagg, STREAM, FORMAT, (NULL),
2090         ("Unknown data received, not negotiated"));
2091     return GST_FLOW_NOT_NEGOTIATED;
2092   }
2093 }
2094