1 /* GStreamer aggregator base class
2 * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3 * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4 *
5 * gstaggregator.c:
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22 /**
23 * SECTION: gstaggregator
24 * @title: GstAggregator
25 * @short_description: Base class for mixers and muxers, manages a set of input
26 * pads and aggregates their streams
27 * @see_also: gstcollectpads for historical reasons.
28 *
29 * Manages a set of pads with the purpose of aggregating their buffers.
30 * Control is given to the subclass when all pads have data.
31 *
32 * * Base class for mixers and muxers. Subclasses should at least implement
33 * the #GstAggregatorClass.aggregate() virtual method.
34 *
35 * * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
36 * #GstPadQueryFunction to queue all serialized data packets per sink pad.
37 * Subclasses should not overwrite those, but instead implement
38 * #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as
39 * needed.
40 *
41 * * When data is queued on all pads, the aggregate vmethod is called.
42 *
43 * * One can peek at the data on any given GstAggregatorPad with the
44 * gst_aggregator_pad_peek_buffer () method, and remove it from the pad
45 * with the gst_aggregator_pad_pop_buffer () method. When a buffer
46 * has been taken with pop_buffer (), a new buffer can be queued
47 * on that pad.
48 *
49 * * If the subclass wishes to push a buffer downstream in its aggregate
50 * implementation, it should do so through the
51 * gst_aggregator_finish_buffer () method. This method will take care
52 * of sending and ordering mandatory events such as stream start, caps
53 * and segment.
54 *
55 * * Same goes for EOS events: they should not be pushed directly by the
56 * subclass, which should instead return GST_FLOW_EOS from its aggregate
57 * implementation.
58 *
59 * * Note that the aggregator logic regarding gap event handling is to turn
60 * these into gap buffers with matching PTS and duration. It will also
61 * flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
62 * to ease their identification and subsequent processing.
63 *
64 * * Subclasses must use (a subclass of) #GstAggregatorPad for both their
65 * sink and source pads.
66 * See gst_element_class_add_static_pad_template_with_gtype().
67 *
68 * This class used to live in gst-plugins-bad and was moved to core.
69 *
70 * Since: 1.14
71 */
72
73 /**
74 * SECTION: gstaggregatorpad
75 * @title: GstAggregatorPad
76 * @short_description: #GstPad subclass for pads managed by #GstAggregator
77 * @see_also: gstcollectpads for historical reasons.
78 *
79 * Pads managed by a #GstAggregator subclass.
80 *
81 * This class used to live in gst-plugins-bad and was moved to core.
82 *
83 * Since: 1.14
84 */
85
86 #ifdef HAVE_CONFIG_H
87 # include "config.h"
88 #endif
89
90 #include <string.h> /* strlen */
91
92 #include "gstaggregator.h"
93
/* Policy used to pick the running time at which the aggregator starts
 * producing output. The values are registered with GObject under the nicks
 * "zero", "first" and "set" by
 * gst_aggregator_start_time_selection_get_type(). */
typedef enum
{
  /* start at 0 running time (the default) */
  GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
  /* start at the first observed input running time */
  GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
  /* start at the value of the start-time property */
  GST_AGGREGATOR_START_TIME_SELECTION_SET
} GstAggregatorStartTimeSelection;
100
101 static GType
gst_aggregator_start_time_selection_get_type(void)102 gst_aggregator_start_time_selection_get_type (void)
103 {
104 static GType gtype = 0;
105
106 if (gtype == 0) {
107 static const GEnumValue values[] = {
108 {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
109 "Start at 0 running time (default)", "zero"},
110 {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
111 "Start at first observed input running time", "first"},
112 {GST_AGGREGATOR_START_TIME_SELECTION_SET,
113 "Set start time with start-time property", "set"},
114 {0, NULL, NULL}
115 };
116
117 gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values);
118 }
119 return gtype;
120 }
121
122 /* Might become API */
123 #if 0
124 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
125 const GstTagList * tags, GstTagMergeMode mode);
126 #endif
127 static void gst_aggregator_set_latency_property (GstAggregator * agg,
128 GstClockTime latency);
129 static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
130
131 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
132
133 static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad,
134 GstBuffer * buffer);
135
GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
#define GST_CAT_DEFAULT aggregator_debug

/* Locking order, locks in this element must always be taken in this order
 *
 * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
 * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
 * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
 * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
 * standard element object lock -> GST_OBJECT_LOCK(agg)
 * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
 * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
 * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
 */

/* GstAggregatorPad definitions
 *
 * Every macro parameter is parenthesized where it is dereferenced or cast so
 * that an argument expression (not just a plain identifier) binds correctly.
 */

/* Takes the pad lock (priv->lock) of @pad, tracing before and after. */
#define PAD_LOCK(pad) G_STMT_START { \
  GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \
      g_thread_self()); \
  g_mutex_lock(&(pad)->priv->lock); \
  GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p", \
      g_thread_self()); \
} G_STMT_END

/* Releases the pad lock of @pad. */
#define PAD_UNLOCK(pad) G_STMT_START { \
  GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p", \
      g_thread_self()); \
  g_mutex_unlock(&(pad)->priv->lock); \
  GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p", \
      g_thread_self()); \
} G_STMT_END


/* Waits on the pad's event_cond; the pad lock must be held by the caller
 * because it is the mutex handed to g_cond_wait(). */
#define PAD_WAIT_EVENT(pad) G_STMT_START { \
  GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p", \
      g_thread_self()); \
  g_cond_wait(&(((GstAggregatorPad *) (pad))->priv->event_cond), \
      (&((GstAggregatorPad *) (pad))->priv->lock)); \
  GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
      g_thread_self()); \
} G_STMT_END

/* Wakes every thread blocked in PAD_WAIT_EVENT() on @pad. */
#define PAD_BROADCAST_EVENT(pad) G_STMT_START { \
  GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p", \
      g_thread_self()); \
  g_cond_broadcast(&(((GstAggregatorPad *) (pad))->priv->event_cond)); \
} G_STMT_END


/* Takes the pad flush lock (priv->flush_lock) of @pad. */
#define PAD_FLUSH_LOCK(pad) G_STMT_START { \
  GST_TRACE_OBJECT (pad, "Taking lock from thread %p", \
      g_thread_self()); \
  g_mutex_lock(&(pad)->priv->flush_lock); \
  GST_TRACE_OBJECT (pad, "Took lock from thread %p", \
      g_thread_self()); \
} G_STMT_END

/* Releases the pad flush lock of @pad. */
#define PAD_FLUSH_UNLOCK(pad) G_STMT_START { \
  GST_TRACE_OBJECT (pad, "Releasing lock from thread %p", \
      g_thread_self()); \
  g_mutex_unlock(&(pad)->priv->flush_lock); \
  GST_TRACE_OBJECT (pad, "Release lock from thread %p", \
      g_thread_self()); \
} G_STMT_END

/* Takes the aggregator src lock (priv->src_lock) of @self. */
#define SRC_LOCK(self) G_STMT_START { \
  GST_TRACE_OBJECT (self, "Taking src lock from thread %p", \
      g_thread_self()); \
  g_mutex_lock(&(self)->priv->src_lock); \
  GST_TRACE_OBJECT (self, "Took src lock from thread %p", \
      g_thread_self()); \
} G_STMT_END

/* Releases the aggregator src lock of @self. */
#define SRC_UNLOCK(self) G_STMT_START { \
  GST_TRACE_OBJECT (self, "Releasing src lock from thread %p", \
      g_thread_self()); \
  g_mutex_unlock(&(self)->priv->src_lock); \
  GST_TRACE_OBJECT (self, "Released src lock from thread %p", \
      g_thread_self()); \
} G_STMT_END

/* Waits on src_cond; the src lock must be held by the caller because it is
 * the mutex handed to g_cond_wait(). */
#define SRC_WAIT(self) G_STMT_START { \
  GST_LOG_OBJECT (self, "Waiting for src on thread %p", \
      g_thread_self()); \
  g_cond_wait(&((self)->priv->src_cond), &((self)->priv->src_lock)); \
  GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p", \
      g_thread_self()); \
} G_STMT_END

/* Wakes the src side: unschedules a pending clock wait (aggregate_id), if
 * any, and broadcasts src_cond. */
#define SRC_BROADCAST(self) G_STMT_START { \
  GST_LOG_OBJECT (self, "Signaling src from thread %p", \
      g_thread_self()); \
  if ((self)->priv->aggregate_id) \
    gst_clock_id_unschedule ((self)->priv->aggregate_id); \
  g_cond_broadcast(&((self)->priv->src_cond)); \
} G_STMT_END
232
233 struct _GstAggregatorPadPrivate
234 {
235 /* Following fields are protected by the PAD_LOCK */
236 GstFlowReturn flow_return;
237 gboolean pending_flush_start;
238 gboolean pending_flush_stop;
239
240 gboolean first_buffer;
241
242 GQueue data; /* buffers, events and queries */
243 GstBuffer *clipped_buffer;
244 guint num_buffers;
245
246 /* used to track fill state of queues, only used with live-src and when
247 * latency property is set to > 0 */
248 GstClockTime head_position;
249 GstClockTime tail_position;
250 GstClockTime head_time; /* running time */
251 GstClockTime tail_time;
252 GstClockTime time_level; /* how much head is ahead of tail */
253 GstSegment head_segment; /* segment before the queue */
254
255 gboolean negotiated;
256
257 gboolean eos;
258
259 GMutex lock;
260 GCond event_cond;
261 /* This lock prevents a flush start processing happening while
262 * the chain function is also happening.
263 */
264 GMutex flush_lock;
265
266 /* properties */
267 gboolean emit_signals;
268 };
269
270 /* Must be called with PAD_LOCK held */
271 static void
gst_aggregator_pad_reset_unlocked(GstAggregatorPad * aggpad)272 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
273 {
274 aggpad->priv->eos = FALSE;
275 aggpad->priv->flow_return = GST_FLOW_OK;
276 GST_OBJECT_LOCK (aggpad);
277 gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
278 gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
279 GST_OBJECT_UNLOCK (aggpad);
280 aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
281 aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
282 aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
283 aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
284 aggpad->priv->time_level = 0;
285 aggpad->priv->first_buffer = TRUE;
286 }
287
288 static gboolean
gst_aggregator_pad_flush(GstAggregatorPad * aggpad,GstAggregator * agg)289 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
290 {
291 GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
292
293 PAD_LOCK (aggpad);
294 gst_aggregator_pad_reset_unlocked (aggpad);
295 PAD_UNLOCK (aggpad);
296
297 if (klass->flush)
298 return (klass->flush (aggpad, agg) == GST_FLOW_OK);
299
300 return TRUE;
301 }
302
303 /*************************************
304 * GstAggregator implementation *
305 *************************************/
306 static GstElementClass *aggregator_parent_class = NULL;
307 static gint aggregator_private_offset = 0;
308
309 /* All members are protected by the object lock unless otherwise noted */
310
311 struct _GstAggregatorPrivate
312 {
313 gint max_padserial;
314
315 /* Our state is >= PAUSED */
316 gboolean running; /* protected by src_lock */
317
318 /* seqnum from seek or segment,
319 * to be applied to synthetic segment/eos events */
320 gint seqnum;
321 gboolean send_stream_start; /* protected by srcpad stream lock */
322 gboolean send_segment;
323 gboolean flush_seeking;
324 gboolean pending_flush_start;
325 gboolean send_eos; /* protected by srcpad stream lock */
326
327 GstCaps *srccaps; /* protected by the srcpad stream lock */
328
329 GstTagList *tags;
330 gboolean tags_changed;
331
332 gboolean peer_latency_live; /* protected by src_lock */
333 GstClockTime peer_latency_min; /* protected by src_lock */
334 GstClockTime peer_latency_max; /* protected by src_lock */
335 gboolean has_peer_latency; /* protected by src_lock */
336
337 GstClockTime sub_latency_min; /* protected by src_lock */
338 GstClockTime sub_latency_max; /* protected by src_lock */
339
340 GstClockTime upstream_latency_min; /* protected by src_lock */
341
342 /* aggregate */
343 GstClockID aggregate_id; /* protected by src_lock */
344 GMutex src_lock;
345 GCond src_cond;
346
347 gboolean first_buffer; /* protected by object lock */
348 GstAggregatorStartTimeSelection start_time_selection;
349 GstClockTime start_time;
350
351 /* protected by the object lock */
352 GstQuery *allocation_query;
353 GstAllocator *allocator;
354 GstBufferPool *pool;
355 GstAllocationParams allocation_params;
356
357 /* properties */
358 gint64 latency; /* protected by both src_lock and all pad locks */
359 };
360
361 /* Seek event forwarding helper */
362 typedef struct
363 {
364 /* parameters */
365 GstEvent *event;
366 gboolean flush;
367 gboolean only_to_active_pads;
368
369 /* results */
370 gboolean result;
371 gboolean one_actually_seeked;
372 } EventData;
373
/* Default values of the aggregator's GObject properties */
#define DEFAULT_LATENCY              0
#define DEFAULT_MIN_UPSTREAM_LATENCY 0
#define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
#define DEFAULT_START_TIME           (-1)

/* GObject property identifiers; PROP_0 is the reserved zero slot and
 * PROP_LAST marks the end of the list. */
enum
{
  PROP_0,
  PROP_LATENCY,
  PROP_MIN_UPSTREAM_LATENCY,
  PROP_START_TIME_SELECTION,
  PROP_START_TIME,
  PROP_LAST
};
388
389 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
390 GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
391
392 static gboolean
gst_aggregator_pad_queue_is_empty(GstAggregatorPad * pad)393 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
394 {
395 return (g_queue_peek_tail (&pad->priv->data) == NULL &&
396 pad->priv->clipped_buffer == NULL);
397 }
398
399 static gboolean
gst_aggregator_check_pads_ready(GstAggregator * self)400 gst_aggregator_check_pads_ready (GstAggregator * self)
401 {
402 GstAggregatorPad *pad = NULL;
403 GList *l, *sinkpads;
404 gboolean have_buffer = TRUE;
405 gboolean have_event_or_query = FALSE;
406
407 GST_LOG_OBJECT (self, "checking pads");
408
409 GST_OBJECT_LOCK (self);
410
411 sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
412 if (sinkpads == NULL)
413 goto no_sinkpads;
414
415 for (l = sinkpads; l != NULL; l = l->next) {
416 pad = l->data;
417
418 PAD_LOCK (pad);
419
420 if (pad->priv->num_buffers == 0) {
421 if (!gst_aggregator_pad_queue_is_empty (pad))
422 have_event_or_query = TRUE;
423 if (!pad->priv->eos) {
424 have_buffer = FALSE;
425
426 /* If not live we need data on all pads, so leave the loop */
427 if (!self->priv->peer_latency_live) {
428 PAD_UNLOCK (pad);
429 goto pad_not_ready;
430 }
431 }
432 } else if (self->priv->peer_latency_live) {
433 /* In live mode, having a single pad with buffers is enough to
434 * generate a start time from it. In non-live mode all pads need
435 * to have a buffer
436 */
437 self->priv->first_buffer = FALSE;
438 }
439
440 PAD_UNLOCK (pad);
441 }
442
443 if (!have_buffer && !have_event_or_query)
444 goto pad_not_ready;
445
446 if (have_buffer)
447 self->priv->first_buffer = FALSE;
448
449 GST_OBJECT_UNLOCK (self);
450 GST_LOG_OBJECT (self, "pads are ready");
451 return TRUE;
452
453 no_sinkpads:
454 {
455 GST_LOG_OBJECT (self, "pads not ready: no sink pads");
456 GST_OBJECT_UNLOCK (self);
457 return FALSE;
458 }
459 pad_not_ready:
460 {
461 if (have_event_or_query)
462 GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
463 " but waking up for serialized event");
464 else
465 GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
466 GST_OBJECT_UNLOCK (self);
467 return have_event_or_query;
468 }
469 }
470
471 static void
gst_aggregator_reset_flow_values(GstAggregator * self)472 gst_aggregator_reset_flow_values (GstAggregator * self)
473 {
474 GST_OBJECT_LOCK (self);
475 self->priv->send_stream_start = TRUE;
476 self->priv->send_segment = TRUE;
477 gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment,
478 GST_FORMAT_TIME);
479 self->priv->first_buffer = TRUE;
480 GST_OBJECT_UNLOCK (self);
481 }
482
483 static inline void
gst_aggregator_push_mandatory_events(GstAggregator * self)484 gst_aggregator_push_mandatory_events (GstAggregator * self)
485 {
486 GstAggregatorPrivate *priv = self->priv;
487 GstEvent *segment = NULL;
488 GstEvent *tags = NULL;
489
490 if (self->priv->send_stream_start) {
491 gchar s_id[32];
492
493 GST_INFO_OBJECT (self, "pushing stream start");
494 /* stream-start (FIXME: create id based on input ids) */
495 g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
496 if (!gst_pad_push_event (GST_PAD (self->srcpad),
497 gst_event_new_stream_start (s_id))) {
498 GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
499 }
500 self->priv->send_stream_start = FALSE;
501 }
502
503 if (self->priv->srccaps) {
504
505 GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
506 self->priv->srccaps);
507 if (!gst_pad_push_event (GST_PAD (self->srcpad),
508 gst_event_new_caps (self->priv->srccaps))) {
509 GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
510 }
511 gst_caps_unref (self->priv->srccaps);
512 self->priv->srccaps = NULL;
513 }
514
515 GST_OBJECT_LOCK (self);
516 if (self->priv->send_segment && !self->priv->flush_seeking) {
517 segment =
518 gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
519
520 if (!self->priv->seqnum)
521 /* This code-path is in preparation to be able to run without a source
522 * connected. Then we won't have a seq-num from a segment event. */
523 self->priv->seqnum = gst_event_get_seqnum (segment);
524 else
525 gst_event_set_seqnum (segment, self->priv->seqnum);
526 self->priv->send_segment = FALSE;
527
528 GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
529 }
530
531 if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
532 tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
533 priv->tags_changed = FALSE;
534 }
535 GST_OBJECT_UNLOCK (self);
536
537 if (segment)
538 gst_pad_push_event (self->srcpad, segment);
539 if (tags)
540 gst_pad_push_event (self->srcpad, tags);
541
542 }
543
544 /**
545 * gst_aggregator_set_src_caps:
546 * @self: The #GstAggregator
547 * @caps: The #GstCaps to set on the src pad.
548 *
549 * Sets the caps to be used on the src pad.
550 */
551 void
gst_aggregator_set_src_caps(GstAggregator * self,GstCaps * caps)552 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
553 {
554 GST_PAD_STREAM_LOCK (self->srcpad);
555 gst_caps_replace (&self->priv->srccaps, caps);
556 gst_aggregator_push_mandatory_events (self);
557 GST_PAD_STREAM_UNLOCK (self->srcpad);
558 }
559
560 static GstFlowReturn
gst_aggregator_default_finish_buffer(GstAggregator * self,GstBuffer * buffer)561 gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
562 {
563 gst_aggregator_push_mandatory_events (self);
564
565 GST_OBJECT_LOCK (self);
566 if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
567 GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
568 GST_OBJECT_UNLOCK (self);
569 return gst_pad_push (self->srcpad, buffer);
570 } else {
571 GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
572 self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
573 GST_OBJECT_UNLOCK (self);
574 gst_buffer_unref (buffer);
575 return GST_FLOW_OK;
576 }
577 }
578
579 /**
580 * gst_aggregator_finish_buffer:
581 * @aggregator: The #GstAggregator
582 * @buffer: (transfer full): the #GstBuffer to push.
583 *
584 * This method will push the provided output buffer downstream. If needed,
585 * mandatory events such as stream-start, caps, and segment events will be
586 * sent before pushing the buffer.
587 */
588 GstFlowReturn
gst_aggregator_finish_buffer(GstAggregator * aggregator,GstBuffer * buffer)589 gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
590 {
591 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
592
593 g_assert (klass->finish_buffer != NULL);
594
595 return klass->finish_buffer (aggregator, buffer);
596 }
597
598 static void
gst_aggregator_push_eos(GstAggregator * self)599 gst_aggregator_push_eos (GstAggregator * self)
600 {
601 GstEvent *event;
602 gst_aggregator_push_mandatory_events (self);
603
604 event = gst_event_new_eos ();
605
606 GST_OBJECT_LOCK (self);
607 self->priv->send_eos = FALSE;
608 gst_event_set_seqnum (event, self->priv->seqnum);
609 GST_OBJECT_UNLOCK (self);
610
611 gst_pad_push_event (self->srcpad, event);
612 }
613
614 static GstClockTime
gst_aggregator_get_next_time(GstAggregator * self)615 gst_aggregator_get_next_time (GstAggregator * self)
616 {
617 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
618
619 if (klass->get_next_time)
620 return klass->get_next_time (self);
621
622 return GST_CLOCK_TIME_NONE;
623 }
624
625 static gboolean
gst_aggregator_wait_and_check(GstAggregator * self,gboolean * timeout)626 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
627 {
628 GstClockTime latency;
629 GstClockTime start;
630 gboolean res;
631
632 *timeout = FALSE;
633
634 SRC_LOCK (self);
635
636 latency = gst_aggregator_get_latency_unlocked (self);
637
638 if (gst_aggregator_check_pads_ready (self)) {
639 GST_DEBUG_OBJECT (self, "all pads have data");
640 SRC_UNLOCK (self);
641
642 return TRUE;
643 }
644
645 /* Before waiting, check if we're actually still running */
646 if (!self->priv->running || !self->priv->send_eos) {
647 SRC_UNLOCK (self);
648
649 return FALSE;
650 }
651
652 start = gst_aggregator_get_next_time (self);
653
654 /* If we're not live, or if we use the running time
655 * of the first buffer as start time, we wait until
656 * all pads have buffers.
657 * Otherwise (i.e. if we are live!), we wait on the clock
658 * and if a pad does not have a buffer in time we ignore
659 * that pad.
660 */
661 GST_OBJECT_LOCK (self);
662 if (!GST_CLOCK_TIME_IS_VALID (latency) ||
663 !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
664 !GST_CLOCK_TIME_IS_VALID (start) ||
665 (self->priv->first_buffer
666 && self->priv->start_time_selection ==
667 GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
668 /* We wake up here when something happened, and below
669 * then check if we're ready now. If we return FALSE,
670 * we will be directly called again.
671 */
672 GST_OBJECT_UNLOCK (self);
673 SRC_WAIT (self);
674 } else {
675 GstClockTime base_time, time;
676 GstClock *clock;
677 GstClockReturn status;
678 GstClockTimeDiff jitter;
679
680 GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
681 GST_TIME_ARGS (start));
682
683 base_time = GST_ELEMENT_CAST (self)->base_time;
684 clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
685 GST_OBJECT_UNLOCK (self);
686
687 time = base_time + start;
688 time += latency;
689
690 GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
691 GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
692 " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
693 GST_TIME_ARGS (time),
694 GST_TIME_ARGS (base_time),
695 GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
696 GST_TIME_ARGS (gst_clock_get_time (clock)));
697
698 self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
699 gst_object_unref (clock);
700 SRC_UNLOCK (self);
701
702 jitter = 0;
703 status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
704
705 SRC_LOCK (self);
706 if (self->priv->aggregate_id) {
707 gst_clock_id_unref (self->priv->aggregate_id);
708 self->priv->aggregate_id = NULL;
709 }
710
711 GST_DEBUG_OBJECT (self,
712 "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
713 status, GST_STIME_ARGS (jitter));
714
715 /* we timed out */
716 if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
717 SRC_UNLOCK (self);
718 *timeout = TRUE;
719 return TRUE;
720 }
721 }
722
723 res = gst_aggregator_check_pads_ready (self);
724 SRC_UNLOCK (self);
725
726 return res;
727 }
728
729 typedef struct
730 {
731 gboolean processed_event;
732 GstFlowReturn flow_ret;
733 } DoHandleEventsAndQueriesData;
734
735 static gboolean
gst_aggregator_do_events_and_queries(GstElement * self,GstPad * epad,gpointer user_data)736 gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
737 gpointer user_data)
738 {
739 GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
740 GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
741 GstEvent *event = NULL;
742 GstQuery *query = NULL;
743 GstAggregatorClass *klass = NULL;
744 DoHandleEventsAndQueriesData *data = user_data;
745
746 do {
747 event = NULL;
748 query = NULL;
749
750 PAD_LOCK (pad);
751 if (pad->priv->clipped_buffer == NULL &&
752 !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
753 if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
754 event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
755 if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
756 query = g_queue_peek_tail (&pad->priv->data);
757 }
758 PAD_UNLOCK (pad);
759 if (event || query) {
760 gboolean ret;
761
762 data->processed_event = TRUE;
763 if (klass == NULL)
764 klass = GST_AGGREGATOR_GET_CLASS (self);
765
766 if (event) {
767 GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
768 gst_event_ref (event);
769 ret = klass->sink_event (aggregator, pad, event);
770
771 PAD_LOCK (pad);
772 if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
773 pad->priv->negotiated = ret;
774 if (!ret)
775 pad->priv->flow_return = data->flow_ret = GST_FLOW_NOT_NEGOTIATED;
776 }
777 if (g_queue_peek_tail (&pad->priv->data) == event)
778 gst_event_unref (g_queue_pop_tail (&pad->priv->data));
779 gst_event_unref (event);
780 } else if (query) {
781 GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
782 ret = klass->sink_query (aggregator, pad, query);
783
784 PAD_LOCK (pad);
785 if (g_queue_peek_tail (&pad->priv->data) == query) {
786 GstStructure *s;
787
788 s = gst_query_writable_structure (query);
789 gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
790 NULL);
791 g_queue_pop_tail (&pad->priv->data);
792 }
793 }
794
795 PAD_BROADCAST_EVENT (pad);
796 PAD_UNLOCK (pad);
797 }
798 } while (event || query);
799
800 return TRUE;
801 }
802
803 static gboolean
gst_aggregator_pad_skip_buffers(GstElement * self,GstPad * epad,gpointer user_data)804 gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
805 gpointer user_data)
806 {
807 GList *item;
808 GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
809 GstAggregator *agg = (GstAggregator *) self;
810 GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
811
812 if (!klass->skip_buffer)
813 return FALSE;
814
815 PAD_LOCK (aggpad);
816
817 item = g_queue_peek_head_link (&aggpad->priv->data);
818 while (item) {
819 GList *next = item->next;
820
821 if (GST_IS_BUFFER (item->data)
822 && klass->skip_buffer (aggpad, agg, item->data)) {
823 GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
824 gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data));
825 gst_buffer_unref (item->data);
826 g_queue_delete_link (&aggpad->priv->data, item);
827 } else {
828 break;
829 }
830
831 item = next;
832 }
833
834 PAD_UNLOCK (aggpad);
835
836 return TRUE;
837 }
838
839 static void
gst_aggregator_pad_set_flushing(GstAggregatorPad * aggpad,GstFlowReturn flow_return,gboolean full)840 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
841 GstFlowReturn flow_return, gboolean full)
842 {
843 GList *item;
844
845 PAD_LOCK (aggpad);
846 if (flow_return == GST_FLOW_NOT_LINKED)
847 aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
848 else
849 aggpad->priv->flow_return = flow_return;
850
851 item = g_queue_peek_head_link (&aggpad->priv->data);
852 while (item) {
853 GList *next = item->next;
854
855 /* In partial flush, we do like the pad, we get rid of non-sticky events
856 * and EOS/SEGMENT.
857 */
858 if (full || GST_IS_BUFFER (item->data) ||
859 GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
860 GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
861 !GST_EVENT_IS_STICKY (item->data)) {
862 if (!GST_IS_QUERY (item->data))
863 gst_mini_object_unref (item->data);
864 g_queue_delete_link (&aggpad->priv->data, item);
865 }
866 item = next;
867 }
868 aggpad->priv->num_buffers = 0;
869 gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
870
871 PAD_BROADCAST_EVENT (aggpad);
872 PAD_UNLOCK (aggpad);
873 }
874
875 static GstFlowReturn
gst_aggregator_default_update_src_caps(GstAggregator * agg,GstCaps * caps,GstCaps ** ret)876 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
877 GstCaps ** ret)
878 {
879 *ret = gst_caps_ref (caps);
880
881 return GST_FLOW_OK;
882 }
883
884 static GstCaps *
gst_aggregator_default_fixate_src_caps(GstAggregator * agg,GstCaps * caps)885 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
886 {
887 caps = gst_caps_fixate (caps);
888
889 return caps;
890 }
891
892 static gboolean
gst_aggregator_default_negotiated_src_caps(GstAggregator * agg,GstCaps * caps)893 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
894 {
895 return TRUE;
896 }
897
898
899 /* takes ownership of the pool, allocator and query */
900 static gboolean
gst_aggregator_set_allocation(GstAggregator * self,GstBufferPool * pool,GstAllocator * allocator,GstAllocationParams * params,GstQuery * query)901 gst_aggregator_set_allocation (GstAggregator * self,
902 GstBufferPool * pool, GstAllocator * allocator,
903 GstAllocationParams * params, GstQuery * query)
904 {
905 GstAllocator *oldalloc;
906 GstBufferPool *oldpool;
907 GstQuery *oldquery;
908
909 GST_DEBUG ("storing allocation query");
910
911 GST_OBJECT_LOCK (self);
912 oldpool = self->priv->pool;
913 self->priv->pool = pool;
914
915 oldalloc = self->priv->allocator;
916 self->priv->allocator = allocator;
917
918 oldquery = self->priv->allocation_query;
919 self->priv->allocation_query = query;
920
921 if (params)
922 self->priv->allocation_params = *params;
923 else
924 gst_allocation_params_init (&self->priv->allocation_params);
925 GST_OBJECT_UNLOCK (self);
926
927 if (oldpool) {
928 GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool);
929 gst_buffer_pool_set_active (oldpool, FALSE);
930 gst_object_unref (oldpool);
931 }
932 if (oldalloc) {
933 gst_object_unref (oldalloc);
934 }
935 if (oldquery) {
936 gst_query_unref (oldquery);
937 }
938 return TRUE;
939 }
940
941
942 static gboolean
gst_aggregator_decide_allocation(GstAggregator * self,GstQuery * query)943 gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query)
944 {
945 GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
946
947 if (aggclass->decide_allocation)
948 if (!aggclass->decide_allocation (self, query))
949 return FALSE;
950
951 return TRUE;
952 }
953
954 static gboolean
gst_aggregator_do_allocation(GstAggregator * self,GstCaps * caps)955 gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps)
956 {
957 GstQuery *query;
958 gboolean result = TRUE;
959 GstBufferPool *pool = NULL;
960 GstAllocator *allocator;
961 GstAllocationParams params;
962
963 /* find a pool for the negotiated caps now */
964 GST_DEBUG_OBJECT (self, "doing allocation query");
965 query = gst_query_new_allocation (caps, TRUE);
966 if (!gst_pad_peer_query (self->srcpad, query)) {
967 /* not a problem, just debug a little */
968 GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed");
969 }
970
971 GST_DEBUG_OBJECT (self, "calling decide_allocation");
972 result = gst_aggregator_decide_allocation (self, query);
973
974 GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
975 query);
976
977 if (!result)
978 goto no_decide_allocation;
979
980 /* we got configuration from our peer or the decide_allocation method,
981 * parse them */
982 if (gst_query_get_n_allocation_params (query) > 0) {
983 gst_query_parse_nth_allocation_param (query, 0, &allocator, ¶ms);
984 } else {
985 allocator = NULL;
986 gst_allocation_params_init (¶ms);
987 }
988
989 if (gst_query_get_n_allocation_pools (query) > 0)
990 gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
991
992 /* now store */
993 result =
994 gst_aggregator_set_allocation (self, pool, allocator, ¶ms, query);
995
996 return result;
997
998 /* Errors */
999 no_decide_allocation:
1000 {
1001 GST_WARNING_OBJECT (self, "Failed to decide allocation");
1002 gst_query_unref (query);
1003
1004 return result;
1005 }
1006
1007 }
1008
1009 /* WITH SRC_LOCK held */
1010 static GstFlowReturn
gst_aggregator_update_src_caps(GstAggregator * self)1011 gst_aggregator_update_src_caps (GstAggregator * self)
1012 {
1013 GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
1014 GstCaps *downstream_caps, *template_caps, *caps = NULL;
1015 GstFlowReturn ret = GST_FLOW_OK;
1016
1017 template_caps = gst_pad_get_pad_template_caps (self->srcpad);
1018 downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
1019
1020 if (gst_caps_is_empty (downstream_caps)) {
1021 GST_INFO_OBJECT (self, "Downstream caps (%"
1022 GST_PTR_FORMAT ") not compatible with pad template caps (%"
1023 GST_PTR_FORMAT ")", downstream_caps, template_caps);
1024 ret = GST_FLOW_NOT_NEGOTIATED;
1025 goto done;
1026 }
1027
1028 g_assert (agg_klass->update_src_caps);
1029 GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
1030 downstream_caps);
1031 ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
1032 if (ret < GST_FLOW_OK) {
1033 GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
1034 goto done;
1035 } else if (ret == GST_AGGREGATOR_FLOW_NEED_DATA) {
1036 GST_DEBUG_OBJECT (self, "Subclass needs more data to decide on caps");
1037 goto done;
1038 }
1039 if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
1040 ret = GST_FLOW_NOT_NEGOTIATED;
1041 goto done;
1042 }
1043 GST_DEBUG_OBJECT (self, " to %" GST_PTR_FORMAT, caps);
1044
1045 #ifdef GST_ENABLE_EXTRA_CHECKS
1046 if (!gst_caps_is_subset (caps, template_caps)) {
1047 GstCaps *intersection;
1048
1049 GST_ERROR_OBJECT (self,
1050 "update_src_caps returned caps %" GST_PTR_FORMAT
1051 " which are not a real subset of the template caps %"
1052 GST_PTR_FORMAT, caps, template_caps);
1053 g_warning ("%s: update_src_caps returned caps which are not a real "
1054 "subset of the filter caps", GST_ELEMENT_NAME (self));
1055
1056 intersection =
1057 gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
1058 gst_caps_unref (caps);
1059 caps = intersection;
1060 }
1061 #endif
1062
1063 if (gst_caps_is_any (caps)) {
1064 goto done;
1065 }
1066
1067 if (!gst_caps_is_fixed (caps)) {
1068 g_assert (agg_klass->fixate_src_caps);
1069
1070 GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
1071 if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
1072 GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
1073 ret = GST_FLOW_NOT_NEGOTIATED;
1074 goto done;
1075 }
1076 GST_DEBUG_OBJECT (self, " to %" GST_PTR_FORMAT, caps);
1077 }
1078
1079 if (agg_klass->negotiated_src_caps) {
1080 if (!agg_klass->negotiated_src_caps (self, caps)) {
1081 GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
1082 ret = GST_FLOW_NOT_NEGOTIATED;
1083 goto done;
1084 }
1085 }
1086
1087 gst_aggregator_set_src_caps (self, caps);
1088
1089 if (!gst_aggregator_do_allocation (self, caps)) {
1090 GST_WARNING_OBJECT (self, "Allocation negotiation failed");
1091 ret = GST_FLOW_NOT_NEGOTIATED;
1092 }
1093
1094 done:
1095 gst_caps_unref (downstream_caps);
1096 gst_caps_unref (template_caps);
1097
1098 if (caps)
1099 gst_caps_unref (caps);
1100
1101 return ret;
1102 }
1103
/* Task function of the source pad (it is the function handed to
 * gst_pad_start_task() in gst_aggregator_start_srcpad_task()).
 *
 * Loops while priv->send_eos and priv->running are both set. Each iteration
 * handles pending events/queries on the sink pads, waits for data,
 * renegotiates the source caps if the source pad asks for it, and then calls
 * the subclass' aggregate vmethod. The loop is left on a flow error (every
 * sink pad is then set flushing with that flow return) or on FLUSHING during
 * a flushing seek; in all cases the task is paused before returning. */
static void
gst_aggregator_aggregate_func (GstAggregator * self)
{
  GstAggregatorPrivate *priv = self->priv;
  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
  gboolean timeout = FALSE;

  /* Early out without pausing the task */
  if (self->priv->running == FALSE) {
    GST_DEBUG_OBJECT (self, "Not running anymore");
    return;
  }

  GST_LOG_OBJECT (self, "Checking aggregate");
  while (priv->send_eos && priv->running) {
    GstFlowReturn flow_return = GST_FLOW_OK;
    DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };

    /* First pass over the sink pads, before waiting for data */
    gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
        gst_aggregator_do_events_and_queries, &events_query_data);

    if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
      goto handle_error;

    /* Only done when the peer latency was reported as live */
    if (self->priv->peer_latency_live)
      gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
          gst_aggregator_pad_skip_buffers, NULL);

    /* Ensure we have buffers ready (either in clipped_buffer or at the head of
     * the queue */
    if (!gst_aggregator_wait_and_check (self, &timeout))
      continue;

    /* Second pass, with a reset accumulator, after the wait */
    events_query_data.processed_event = FALSE;
    events_query_data.flow_ret = GST_FLOW_OK;
    gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
        gst_aggregator_do_events_and_queries, &events_query_data);

    if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
      goto handle_error;

    /* An event was handled in the second pass: start over instead of
     * aggregating */
    if (events_query_data.processed_event)
      continue;

    if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
      flow_return = gst_aggregator_update_src_caps (self);
      /* Anything but OK: keep the reconfigure flag so negotiation is retried */
      if (flow_return != GST_FLOW_OK)
        gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
      /* NEED_DATA from negotiation does not prevent aggregating */
      if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
        flow_return = GST_FLOW_OK;
    }

    /* On timeout the subclass is called even if negotiation failed above */
    if (timeout || flow_return >= GST_FLOW_OK) {
      GST_TRACE_OBJECT (self, "Actually aggregating!");
      flow_return = klass->aggregate (self, timeout);
    }

    if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
      continue;

    /* flush_seeking is read under the object lock */
    GST_OBJECT_LOCK (self);
    if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
      /* We don't want to set the pads to flushing, but we want to
       * stop the thread, so just break here */
      GST_OBJECT_UNLOCK (self);
      break;
    }
    GST_OBJECT_UNLOCK (self);

    if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
      gst_aggregator_push_eos (self);
    }

    /* Reached both by falling through and by the gotos above */
  handle_error:
    GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));

    if (flow_return != GST_FLOW_OK) {
      GList *item;

      /* Set every sink pad flushing with this flow return, then leave the
       * loop */
      GST_OBJECT_LOCK (self);
      for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
        GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);

        gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
      }
      GST_OBJECT_UNLOCK (self);
      break;
    }
  }

  /* Pause the task here, the only ways to get here are:
   * 1) We're stopping, in which case the task is stopped anyway
   * 2) We got a flow error above, in which case it might take
   *    some time to forward the flow return upstream and we
   *    would otherwise call the task function over and over
   *    again without doing anything
   */
  gst_pad_pause_task (self->srcpad);
}
1202
1203 static gboolean
gst_aggregator_start(GstAggregator * self)1204 gst_aggregator_start (GstAggregator * self)
1205 {
1206 GstAggregatorClass *klass;
1207 gboolean result;
1208
1209 self->priv->send_stream_start = TRUE;
1210 self->priv->send_segment = TRUE;
1211 self->priv->send_eos = TRUE;
1212 self->priv->srccaps = NULL;
1213
1214 gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL);
1215
1216 klass = GST_AGGREGATOR_GET_CLASS (self);
1217
1218 if (klass->start)
1219 result = klass->start (self);
1220 else
1221 result = TRUE;
1222
1223 return result;
1224 }
1225
1226 static gboolean
_check_pending_flush_stop(GstAggregatorPad * pad)1227 _check_pending_flush_stop (GstAggregatorPad * pad)
1228 {
1229 gboolean res;
1230
1231 PAD_LOCK (pad);
1232 res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
1233 PAD_UNLOCK (pad);
1234
1235 return res;
1236 }
1237
1238 static gboolean
gst_aggregator_stop_srcpad_task(GstAggregator * self,GstEvent * flush_start)1239 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1240 {
1241 gboolean res = TRUE;
1242
1243 GST_INFO_OBJECT (self, "%s srcpad task",
1244 flush_start ? "Pausing" : "Stopping");
1245
1246 SRC_LOCK (self);
1247 self->priv->running = FALSE;
1248 SRC_BROADCAST (self);
1249 SRC_UNLOCK (self);
1250
1251 if (flush_start) {
1252 res = gst_pad_push_event (self->srcpad, flush_start);
1253 }
1254
1255 gst_pad_stop_task (self->srcpad);
1256
1257 return res;
1258 }
1259
1260 static void
gst_aggregator_start_srcpad_task(GstAggregator * self)1261 gst_aggregator_start_srcpad_task (GstAggregator * self)
1262 {
1263 GST_INFO_OBJECT (self, "Starting srcpad task");
1264
1265 self->priv->running = TRUE;
1266 gst_pad_start_task (GST_PAD (self->srcpad),
1267 (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1268 }
1269
1270 static GstFlowReturn
gst_aggregator_flush(GstAggregator * self)1271 gst_aggregator_flush (GstAggregator * self)
1272 {
1273 GstFlowReturn ret = GST_FLOW_OK;
1274 GstAggregatorPrivate *priv = self->priv;
1275 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1276
1277 GST_DEBUG_OBJECT (self, "Flushing everything");
1278 GST_OBJECT_LOCK (self);
1279 priv->send_segment = TRUE;
1280 priv->flush_seeking = FALSE;
1281 priv->tags_changed = FALSE;
1282 GST_OBJECT_UNLOCK (self);
1283 if (klass->flush)
1284 ret = klass->flush (self);
1285
1286 return ret;
1287 }
1288
1289
1290 /* Called with GstAggregator's object lock held */
1291
1292 static gboolean
gst_aggregator_all_flush_stop_received_locked(GstAggregator * self)1293 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
1294 {
1295 GList *tmp;
1296 GstAggregatorPad *tmppad;
1297
1298 for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1299 tmppad = (GstAggregatorPad *) tmp->data;
1300
1301 if (_check_pending_flush_stop (tmppad) == FALSE) {
1302 GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
1303 tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
1304 return FALSE;
1305 }
1306 }
1307
1308 return TRUE;
1309 }
1310
1311 static void
gst_aggregator_flush_start(GstAggregator * self,GstAggregatorPad * aggpad,GstEvent * event)1312 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1313 GstEvent * event)
1314 {
1315 GstAggregatorPrivate *priv = self->priv;
1316 GstAggregatorPadPrivate *padpriv = aggpad->priv;
1317
1318 gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1319
1320 PAD_FLUSH_LOCK (aggpad);
1321 PAD_LOCK (aggpad);
1322 if (padpriv->pending_flush_start) {
1323 GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
1324
1325 padpriv->pending_flush_start = FALSE;
1326 padpriv->pending_flush_stop = TRUE;
1327 }
1328 PAD_UNLOCK (aggpad);
1329
1330 GST_OBJECT_LOCK (self);
1331 if (priv->flush_seeking) {
1332 /* If flush_seeking we forward the first FLUSH_START */
1333 if (priv->pending_flush_start) {
1334 priv->pending_flush_start = FALSE;
1335 GST_OBJECT_UNLOCK (self);
1336
1337 GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1338 gst_aggregator_stop_srcpad_task (self, event);
1339
1340 GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
1341 GST_PAD_STREAM_LOCK (self->srcpad);
1342 GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
1343 event = NULL;
1344 } else {
1345 GST_OBJECT_UNLOCK (self);
1346 gst_event_unref (event);
1347 }
1348 } else {
1349 GST_OBJECT_UNLOCK (self);
1350 gst_event_unref (event);
1351 }
1352 PAD_FLUSH_UNLOCK (aggpad);
1353 }
1354
1355 /* Must be called with the the PAD_LOCK held */
1356 static void
update_time_level(GstAggregatorPad * aggpad,gboolean head)1357 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1358 {
1359 GstAggregatorPadPrivate *priv = aggpad->priv;
1360
1361 if (head) {
1362 if (GST_CLOCK_TIME_IS_VALID (priv->head_position) &&
1363 priv->head_segment.format == GST_FORMAT_TIME)
1364 priv->head_time = gst_segment_to_running_time (&priv->head_segment,
1365 GST_FORMAT_TIME, priv->head_position);
1366 else
1367 priv->head_time = GST_CLOCK_TIME_NONE;
1368
1369 if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time))
1370 priv->tail_time = priv->head_time;
1371 } else {
1372 if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) &&
1373 aggpad->segment.format == GST_FORMAT_TIME)
1374 priv->tail_time = gst_segment_to_running_time (&aggpad->segment,
1375 GST_FORMAT_TIME, priv->tail_position);
1376 else
1377 priv->tail_time = priv->head_time;
1378 }
1379
1380 if (priv->head_time == GST_CLOCK_TIME_NONE ||
1381 priv->tail_time == GST_CLOCK_TIME_NONE) {
1382 priv->time_level = 0;
1383 return;
1384 }
1385
1386 if (priv->tail_time > priv->head_time)
1387 priv->time_level = 0;
1388 else
1389 priv->time_level = priv->head_time - priv->tail_time;
1390 }
1391
1392
1393 /* GstAggregator vmethods default implementations */
/* Default handler for events received on a sink pad.
 *
 * FLUSH_START, FLUSH_STOP, EOS, SEGMENT, STREAM_START, GAP and TAG are
 * handled here and never passed to gst_pad_event_default() ("eaten"); every
 * other event type goes through gst_pad_event_default().
 *
 * Returns: for eaten events TRUE, except for a GAP event that lies outside
 * the pad segment or whose replacement buffer could not be chained (FALSE);
 * for other events the result of gst_pad_event_default(). */
static gboolean
gst_aggregator_default_sink_event (GstAggregator * self,
    GstAggregatorPad * aggpad, GstEvent * event)
{
  gboolean res = TRUE;
  GstPad *pad = GST_PAD (aggpad);
  GstAggregatorPrivate *priv = self->priv;

  GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_FLUSH_START:
    {
      /* gst_aggregator_flush_start() consumes the event (it either hands it
       * on to be pushed or unrefs it), hence the NULL below */
      gst_aggregator_flush_start (self, aggpad, event);
      /* We forward only in one case: right after flush_seeking */
      event = NULL;
      goto eat;
    }
    case GST_EVENT_FLUSH_STOP:
    {
      gst_aggregator_pad_flush (aggpad, self);
      GST_OBJECT_LOCK (self);
      if (priv->flush_seeking) {
        g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
        if (gst_aggregator_all_flush_stop_received_locked (self)) {
          GST_OBJECT_UNLOCK (self);
          /* That means we received FLUSH_START/FLUSH_STOP on
           * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
          gst_aggregator_flush (self);
          /* The event is handed to the push; NULL it so "eat" does not
           * unref it */
          gst_pad_push_event (self->srcpad, event);
          event = NULL;
          SRC_LOCK (self);
          priv->send_eos = TRUE;
          SRC_BROADCAST (self);
          SRC_UNLOCK (self);

          /* Counterpart of the GST_PAD_STREAM_LOCK taken in
           * gst_aggregator_flush_start() */
          GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
          GST_PAD_STREAM_UNLOCK (self->srcpad);
          gst_aggregator_start_srcpad_task (self);
        } else {
          GST_OBJECT_UNLOCK (self);
        }
      } else {
        GST_OBJECT_UNLOCK (self);
      }

      /* We never forward the event */
      goto eat;
    }
    case GST_EVENT_EOS:
    {
      /* Mark the pad EOS and wake up waiters on the SRC condition */
      SRC_LOCK (self);
      PAD_LOCK (aggpad);
      g_assert (aggpad->priv->num_buffers == 0);
      aggpad->priv->eos = TRUE;
      PAD_UNLOCK (aggpad);
      SRC_BROADCAST (self);
      SRC_UNLOCK (self);
      goto eat;
    }
    case GST_EVENT_SEGMENT:
    {
      PAD_LOCK (aggpad);
      GST_OBJECT_LOCK (aggpad);
      gst_event_copy_segment (event, &aggpad->segment);
      /* We've got a new segment, tail_position is now meaningless
       * and may interfere with the time_level calculation
       */
      aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
      update_time_level (aggpad, FALSE);
      GST_OBJECT_UNLOCK (aggpad);
      PAD_UNLOCK (aggpad);

      /* Remember the seqnum of the segment event on the aggregator */
      GST_OBJECT_LOCK (self);
      self->priv->seqnum = gst_event_get_seqnum (event);
      GST_OBJECT_UNLOCK (self);
      goto eat;
    }
    case GST_EVENT_STREAM_START:
    {
      goto eat;
    }
    case GST_EVENT_GAP:
    {
      GstClockTime pts, endpts;
      GstClockTime duration;
      GstBuffer *gapbuf;

      gst_event_parse_gap (event, &pts, &duration);

      if (GST_CLOCK_TIME_IS_VALID (duration))
        endpts = pts + duration;
      else
        endpts = GST_CLOCK_TIME_NONE;

      /* Clip the gap against the pad segment; res is FALSE when it lies
       * outside */
      GST_OBJECT_LOCK (aggpad);
      res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
          &pts, &endpts);
      GST_OBJECT_UNLOCK (aggpad);

      if (!res) {
        GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
        goto eat;
      }

      /* Recompute the duration from the clipped boundaries */
      if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
        duration = endpts - pts;
      else
        duration = GST_CLOCK_TIME_NONE;

      /* Empty buffer flagged GAP and DROPPABLE that stands in for the event */
      gapbuf = gst_buffer_new ();
      GST_BUFFER_PTS (gapbuf) = pts;
      GST_BUFFER_DURATION (gapbuf) = duration;
      GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
      GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);

      /* Remove GAP event so we can replace it with the buffer */
      PAD_LOCK (aggpad);
      if (g_queue_peek_tail (&aggpad->priv->data) == event)
        gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
      PAD_UNLOCK (aggpad);

      if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
          GST_FLOW_OK) {
        GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
        res = FALSE;
      }

      goto eat;
    }
    case GST_EVENT_TAG:
      goto eat;
    default:
    {
      break;
    }
  }

  GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
  return gst_pad_event_default (pad, GST_OBJECT (self), event);

eat:
  /* event is NULL here when a case above already gave it away */
  GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
  if (event)
    gst_event_unref (event);

  return res;
}
1542
1543 static gboolean
gst_aggregator_stop_pad(GstElement * self,GstPad * epad,gpointer user_data)1544 gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
1545 {
1546 GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
1547 GstAggregator *agg = GST_AGGREGATOR_CAST (self);
1548
1549 gst_aggregator_pad_flush (pad, agg);
1550
1551 PAD_LOCK (pad);
1552 pad->priv->flow_return = GST_FLOW_FLUSHING;
1553 pad->priv->negotiated = FALSE;
1554 PAD_BROADCAST_EVENT (pad);
1555 PAD_UNLOCK (pad);
1556
1557 return TRUE;
1558 }
1559
1560 static gboolean
gst_aggregator_stop(GstAggregator * agg)1561 gst_aggregator_stop (GstAggregator * agg)
1562 {
1563 GstAggregatorClass *klass;
1564 gboolean result;
1565
1566 gst_aggregator_reset_flow_values (agg);
1567
1568 /* Application needs to make sure no pads are added while it shuts us down */
1569 gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
1570 gst_aggregator_stop_pad, NULL);
1571
1572 klass = GST_AGGREGATOR_GET_CLASS (agg);
1573
1574 if (klass->stop)
1575 result = klass->stop (agg);
1576 else
1577 result = TRUE;
1578
1579 agg->priv->has_peer_latency = FALSE;
1580 agg->priv->peer_latency_live = FALSE;
1581 agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0;
1582
1583 if (agg->priv->tags)
1584 gst_tag_list_unref (agg->priv->tags);
1585 agg->priv->tags = NULL;
1586
1587 gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
1588
1589 return result;
1590 }
1591
1592 /* GstElement vmethods implementations */
1593 static GstStateChangeReturn
gst_aggregator_change_state(GstElement * element,GstStateChange transition)1594 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1595 {
1596 GstStateChangeReturn ret;
1597 GstAggregator *self = GST_AGGREGATOR (element);
1598
1599 switch (transition) {
1600 case GST_STATE_CHANGE_READY_TO_PAUSED:
1601 if (!gst_aggregator_start (self))
1602 goto error_start;
1603 break;
1604 default:
1605 break;
1606 }
1607
1608 if ((ret =
1609 GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1610 transition)) == GST_STATE_CHANGE_FAILURE)
1611 goto failure;
1612
1613
1614 switch (transition) {
1615 case GST_STATE_CHANGE_PAUSED_TO_READY:
1616 if (!gst_aggregator_stop (self)) {
1617 /* What to do in this case? Error out? */
1618 GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1619 }
1620 break;
1621 default:
1622 break;
1623 }
1624
1625 return ret;
1626
1627 /* ERRORS */
1628 failure:
1629 {
1630 GST_ERROR_OBJECT (element, "parent failed state change");
1631 return ret;
1632 }
1633 error_start:
1634 {
1635 GST_ERROR_OBJECT (element, "Subclass failed to start");
1636 return GST_STATE_CHANGE_FAILURE;
1637 }
1638 }
1639
1640 static void
gst_aggregator_release_pad(GstElement * element,GstPad * pad)1641 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1642 {
1643 GstAggregator *self = GST_AGGREGATOR (element);
1644 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1645
1646 GST_INFO_OBJECT (pad, "Removing pad");
1647
1648 SRC_LOCK (self);
1649 gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
1650 gst_element_remove_pad (element, pad);
1651
1652 self->priv->has_peer_latency = FALSE;
1653 SRC_BROADCAST (self);
1654 SRC_UNLOCK (self);
1655 }
1656
1657 static GstAggregatorPad *
gst_aggregator_default_create_new_pad(GstAggregator * self,GstPadTemplate * templ,const gchar * req_name,const GstCaps * caps)1658 gst_aggregator_default_create_new_pad (GstAggregator * self,
1659 GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1660 {
1661 GstAggregatorPad *agg_pad;
1662 GstAggregatorPrivate *priv = self->priv;
1663 gint serial = 0;
1664 gchar *name = NULL;
1665 GType pad_type =
1666 GST_PAD_TEMPLATE_GTYPE (templ) ==
1667 G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
1668
1669 if (templ->direction != GST_PAD_SINK)
1670 goto not_sink;
1671
1672 if (templ->presence != GST_PAD_REQUEST)
1673 goto not_request;
1674
1675 GST_OBJECT_LOCK (self);
1676 if (req_name == NULL || strlen (req_name) < 6
1677 || !g_str_has_prefix (req_name, "sink_")
1678 || strrchr (req_name, '%') != NULL) {
1679 /* no name given when requesting the pad, use next available int */
1680 serial = ++priv->max_padserial;
1681 } else {
1682 gchar *endptr = NULL;
1683
1684 /* parse serial number from requested padname */
1685 serial = g_ascii_strtoull (&req_name[5], &endptr, 10);
1686 if (endptr != NULL && *endptr == '\0') {
1687 if (serial > priv->max_padserial) {
1688 priv->max_padserial = serial;
1689 }
1690 } else {
1691 serial = ++priv->max_padserial;
1692 }
1693 }
1694
1695 name = g_strdup_printf ("sink_%u", serial);
1696 g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
1697 agg_pad = g_object_new (pad_type,
1698 "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1699 g_free (name);
1700
1701 GST_OBJECT_UNLOCK (self);
1702
1703 return agg_pad;
1704
1705 /* errors */
1706 not_sink:
1707 {
1708 GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
1709 return NULL;
1710 }
1711 not_request:
1712 {
1713 GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
1714 return NULL;
1715 }
1716 }
1717
1718 static GstPad *
gst_aggregator_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * req_name,const GstCaps * caps)1719 gst_aggregator_request_new_pad (GstElement * element,
1720 GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1721 {
1722 GstAggregator *self;
1723 GstAggregatorPad *agg_pad;
1724 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
1725 GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1726
1727 self = GST_AGGREGATOR (element);
1728
1729 agg_pad = klass->create_new_pad (self, templ, req_name, caps);
1730 if (!agg_pad) {
1731 GST_ERROR_OBJECT (element, "Couldn't create new pad");
1732 return NULL;
1733 }
1734
1735 GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1736
1737 if (priv->running)
1738 gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1739
1740 /* add the pad to the element */
1741 gst_element_add_pad (element, GST_PAD (agg_pad));
1742
1743 return GST_PAD (agg_pad);
1744 }
1745
1746 /* Must be called with SRC_LOCK held */
1747
1748 static gboolean
gst_aggregator_query_latency_unlocked(GstAggregator * self,GstQuery * query)1749 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1750 {
1751 gboolean query_ret, live;
1752 GstClockTime our_latency, min, max;
1753
1754 query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1755
1756 if (!query_ret) {
1757 GST_WARNING_OBJECT (self, "Latency query failed");
1758 return FALSE;
1759 }
1760
1761 gst_query_parse_latency (query, &live, &min, &max);
1762
1763 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1764 GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1765 ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1766 return FALSE;
1767 }
1768
1769 if (self->priv->upstream_latency_min > min) {
1770 GstClockTimeDiff diff =
1771 GST_CLOCK_DIFF (min, self->priv->upstream_latency_min);
1772
1773 min += diff;
1774 if (GST_CLOCK_TIME_IS_VALID (max)) {
1775 max += diff;
1776 }
1777 }
1778
1779 if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1780 GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1781 ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1782 GST_TIME_FORMAT ". Add queues or other buffering elements.",
1783 GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1784 return FALSE;
1785 }
1786
1787 our_latency = self->priv->latency;
1788
1789 self->priv->peer_latency_live = live;
1790 self->priv->peer_latency_min = min;
1791 self->priv->peer_latency_max = max;
1792 self->priv->has_peer_latency = TRUE;
1793
1794 /* add our own */
1795 min += our_latency;
1796 min += self->priv->sub_latency_min;
1797 if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1798 && GST_CLOCK_TIME_IS_VALID (max))
1799 max += self->priv->sub_latency_max + our_latency;
1800 else
1801 max = GST_CLOCK_TIME_NONE;
1802
1803 SRC_BROADCAST (self);
1804
1805 GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1806 " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1807
1808 gst_query_set_latency (query, live, min, max);
1809
1810 return query_ret;
1811 }
1812
1813 /*
1814 * MUST be called with the src_lock held.
1815 *
1816 * See gst_aggregator_get_latency() for doc
1817 */
1818 static GstClockTime
gst_aggregator_get_latency_unlocked(GstAggregator * self)1819 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1820 {
1821 GstClockTime latency;
1822
1823 g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1824
1825 if (!self->priv->has_peer_latency) {
1826 GstQuery *query = gst_query_new_latency ();
1827 gboolean ret;
1828
1829 ret = gst_aggregator_query_latency_unlocked (self, query);
1830 gst_query_unref (query);
1831 if (!ret)
1832 return GST_CLOCK_TIME_NONE;
1833 }
1834
1835 if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1836 return GST_CLOCK_TIME_NONE;
1837
1838 /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1839 latency = self->priv->peer_latency_min;
1840
1841 /* add our own */
1842 latency += self->priv->latency;
1843 latency += self->priv->sub_latency_min;
1844
1845 return latency;
1846 }
1847
1848 /**
1849 * gst_aggregator_get_latency:
1850 * @self: a #GstAggregator
1851 *
1852 * Retrieves the latency values reported by @self in response to the latency
1853 * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1854 * will not wait for the clock.
1855 *
1856 * Typically only called by subclasses.
1857 *
1858 * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1859 */
1860 GstClockTime
gst_aggregator_get_latency(GstAggregator * self)1861 gst_aggregator_get_latency (GstAggregator * self)
1862 {
1863 GstClockTime ret;
1864
1865 SRC_LOCK (self);
1866 ret = gst_aggregator_get_latency_unlocked (self);
1867 SRC_UNLOCK (self);
1868
1869 return ret;
1870 }
1871
1872 static gboolean
gst_aggregator_send_event(GstElement * element,GstEvent * event)1873 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1874 {
1875 GstAggregator *self = GST_AGGREGATOR (element);
1876
1877 GST_STATE_LOCK (element);
1878 if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1879 GST_STATE (element) < GST_STATE_PAUSED) {
1880 gdouble rate;
1881 GstFormat fmt;
1882 GstSeekFlags flags;
1883 GstSeekType start_type, stop_type;
1884 gint64 start, stop;
1885
1886 gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1887 &start, &stop_type, &stop);
1888
1889 GST_OBJECT_LOCK (self);
1890 gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
1891 flags, start_type, start, stop_type, stop, NULL);
1892 self->priv->seqnum = gst_event_get_seqnum (event);
1893 self->priv->first_buffer = FALSE;
1894 GST_OBJECT_UNLOCK (self);
1895
1896 GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1897 }
1898 GST_STATE_UNLOCK (element);
1899
1900
1901 return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1902 event);
1903 }
1904
1905 static gboolean
gst_aggregator_default_src_query(GstAggregator * self,GstQuery * query)1906 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1907 {
1908 gboolean res = TRUE;
1909
1910 switch (GST_QUERY_TYPE (query)) {
1911 case GST_QUERY_SEEKING:
1912 {
1913 GstFormat format;
1914
1915 /* don't pass it along as some (file)sink might claim it does
1916 * whereas with a collectpads in between that will not likely work */
1917 gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1918 gst_query_set_seeking (query, format, FALSE, 0, -1);
1919 res = TRUE;
1920
1921 break;
1922 }
1923 case GST_QUERY_LATENCY:
1924 SRC_LOCK (self);
1925 res = gst_aggregator_query_latency_unlocked (self, query);
1926 SRC_UNLOCK (self);
1927 break;
1928 default:
1929 return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1930 }
1931
1932 return res;
1933 }
1934
1935 static gboolean
gst_aggregator_event_forward_func(GstPad * pad,gpointer user_data)1936 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1937 {
1938 EventData *evdata = user_data;
1939 gboolean ret = TRUE;
1940 GstPad *peer = gst_pad_get_peer (pad);
1941 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1942
1943 if (peer) {
1944 if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
1945 GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
1946 ret = TRUE;
1947 } else {
1948 ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1949 GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1950 }
1951 }
1952
1953 if (ret == FALSE) {
1954 if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1955 GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1956
1957 GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1958
1959 if (gst_pad_query (peer, seeking)) {
1960 gboolean seekable;
1961
1962 gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1963
1964 if (seekable == FALSE) {
1965 GST_INFO_OBJECT (pad,
1966 "Source not seekable, We failed but it does not matter!");
1967
1968 ret = TRUE;
1969 }
1970 } else {
1971 GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1972 }
1973
1974 gst_query_unref (seeking);
1975 }
1976
1977 if (evdata->flush) {
1978 PAD_LOCK (aggpad);
1979 aggpad->priv->pending_flush_start = FALSE;
1980 aggpad->priv->pending_flush_stop = FALSE;
1981 PAD_UNLOCK (aggpad);
1982 }
1983 } else {
1984 evdata->one_actually_seeked = TRUE;
1985 }
1986
1987 evdata->result &= ret;
1988
1989 if (peer)
1990 gst_object_unref (peer);
1991
1992 /* Always send to all pads */
1993 return FALSE;
1994 }
1995
1996 static void
gst_aggregator_forward_event_to_all_sinkpads(GstAggregator * self,EventData * evdata)1997 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1998 EventData * evdata)
1999 {
2000 evdata->result = TRUE;
2001 evdata->one_actually_seeked = FALSE;
2002
2003 /* We first need to set all pads as flushing in a first pass
2004 * as flush_start flush_stop is sometimes sent synchronously
2005 * while we send the seek event */
2006 if (evdata->flush) {
2007 GList *l;
2008
2009 GST_OBJECT_LOCK (self);
2010 for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
2011 GstAggregatorPad *pad = l->data;
2012
2013 PAD_LOCK (pad);
2014 pad->priv->pending_flush_start = TRUE;
2015 pad->priv->pending_flush_stop = FALSE;
2016 PAD_UNLOCK (pad);
2017 }
2018 GST_OBJECT_UNLOCK (self);
2019 }
2020
2021 gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
2022
2023 gst_event_unref (evdata->event);
2024 }
2025
2026 static gboolean
gst_aggregator_do_seek(GstAggregator * self,GstEvent * event)2027 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
2028 {
2029 gdouble rate;
2030 GstFormat fmt;
2031 GstSeekFlags flags;
2032 GstSeekType start_type, stop_type;
2033 gint64 start, stop;
2034 gboolean flush;
2035 EventData evdata = { 0, };
2036 GstAggregatorPrivate *priv = self->priv;
2037
2038 gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
2039 &start, &stop_type, &stop);
2040
2041 GST_INFO_OBJECT (self, "starting SEEK");
2042
2043 flush = flags & GST_SEEK_FLAG_FLUSH;
2044
2045 GST_OBJECT_LOCK (self);
2046 if (flush) {
2047 priv->pending_flush_start = TRUE;
2048 priv->flush_seeking = TRUE;
2049 }
2050
2051 gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
2052 flags, start_type, start, stop_type, stop, NULL);
2053
2054 /* Seeking sets a position */
2055 self->priv->first_buffer = FALSE;
2056 GST_OBJECT_UNLOCK (self);
2057
2058 /* forward the seek upstream */
2059 evdata.event = event;
2060 evdata.flush = flush;
2061 evdata.only_to_active_pads = FALSE;
2062 gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2063 event = NULL;
2064
2065 if (!evdata.result || !evdata.one_actually_seeked) {
2066 GST_OBJECT_LOCK (self);
2067 priv->flush_seeking = FALSE;
2068 priv->pending_flush_start = FALSE;
2069 GST_OBJECT_UNLOCK (self);
2070 }
2071
2072 GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
2073
2074 return evdata.result;
2075 }
2076
2077 static gboolean
gst_aggregator_default_src_event(GstAggregator * self,GstEvent * event)2078 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
2079 {
2080 EventData evdata = { 0, };
2081
2082 switch (GST_EVENT_TYPE (event)) {
2083 case GST_EVENT_SEEK:
2084 /* _do_seek() unrefs the event. */
2085 return gst_aggregator_do_seek (self, event);
2086 case GST_EVENT_NAVIGATION:
2087 /* navigation is rather pointless. */
2088 gst_event_unref (event);
2089 return FALSE;
2090 default:
2091 break;
2092 }
2093
2094 /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
2095 * they will receive a QOS event that has earliest_time=0 (because we can't
2096 * have negative timestamps), and consider their buffer as too late */
2097 evdata.event = event;
2098 evdata.flush = FALSE;
2099 evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
2100 gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2101 return evdata.result;
2102 }
2103
2104 static gboolean
gst_aggregator_src_pad_event_func(GstPad * pad,GstObject * parent,GstEvent * event)2105 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
2106 GstEvent * event)
2107 {
2108 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2109
2110 return klass->src_event (GST_AGGREGATOR (parent), event);
2111 }
2112
2113 static gboolean
gst_aggregator_src_pad_query_func(GstPad * pad,GstObject * parent,GstQuery * query)2114 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
2115 GstQuery * query)
2116 {
2117 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2118
2119 return klass->src_query (GST_AGGREGATOR (parent), query);
2120 }
2121
2122 static gboolean
gst_aggregator_src_pad_activate_mode_func(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)2123 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
2124 GstObject * parent, GstPadMode mode, gboolean active)
2125 {
2126 GstAggregator *self = GST_AGGREGATOR (parent);
2127 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2128
2129 if (klass->src_activate) {
2130 if (klass->src_activate (self, mode, active) == FALSE) {
2131 return FALSE;
2132 }
2133 }
2134
2135 if (active == TRUE) {
2136 switch (mode) {
2137 case GST_PAD_MODE_PUSH:
2138 {
2139 GST_INFO_OBJECT (pad, "Activating pad!");
2140 gst_aggregator_start_srcpad_task (self);
2141 return TRUE;
2142 }
2143 default:
2144 {
2145 GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
2146 return FALSE;
2147 }
2148 }
2149 }
2150
2151 /* deactivating */
2152 GST_INFO_OBJECT (self, "Deactivating srcpad");
2153 gst_aggregator_stop_srcpad_task (self, FALSE);
2154
2155 return TRUE;
2156 }
2157
2158 static gboolean
gst_aggregator_default_sink_query(GstAggregator * self,GstAggregatorPad * aggpad,GstQuery * query)2159 gst_aggregator_default_sink_query (GstAggregator * self,
2160 GstAggregatorPad * aggpad, GstQuery * query)
2161 {
2162 GstPad *pad = GST_PAD (aggpad);
2163
2164 if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
2165 GstQuery *decide_query = NULL;
2166 GstAggregatorClass *agg_class;
2167 gboolean ret;
2168
2169 GST_OBJECT_LOCK (self);
2170 PAD_LOCK (aggpad);
2171 if (G_UNLIKELY (!aggpad->priv->negotiated)) {
2172 GST_DEBUG_OBJECT (self,
2173 "not negotiated yet, can't answer ALLOCATION query");
2174 PAD_UNLOCK (aggpad);
2175 GST_OBJECT_UNLOCK (self);
2176
2177 return FALSE;
2178 }
2179
2180 if ((decide_query = self->priv->allocation_query))
2181 gst_query_ref (decide_query);
2182 PAD_UNLOCK (aggpad);
2183 GST_OBJECT_UNLOCK (self);
2184
2185 GST_DEBUG_OBJECT (self,
2186 "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
2187
2188 agg_class = GST_AGGREGATOR_GET_CLASS (self);
2189
2190 /* pass the query to the propose_allocation vmethod if any */
2191 if (agg_class->propose_allocation)
2192 ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
2193 else
2194 ret = FALSE;
2195
2196 if (decide_query)
2197 gst_query_unref (decide_query);
2198
2199 GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
2200 return ret;
2201 }
2202
2203 return gst_pad_query_default (pad, GST_OBJECT (self), query);
2204 }
2205
2206 static void
gst_aggregator_finalize(GObject * object)2207 gst_aggregator_finalize (GObject * object)
2208 {
2209 GstAggregator *self = (GstAggregator *) object;
2210
2211 g_mutex_clear (&self->priv->src_lock);
2212 g_cond_clear (&self->priv->src_cond);
2213
2214 G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
2215 }
2216
2217 /*
2218 * gst_aggregator_set_latency_property:
2219 * @agg: a #GstAggregator
2220 * @latency: the new latency value (in nanoseconds).
2221 *
2222 * Sets the new latency value to @latency. This value is used to limit the
2223 * amount of time a pad waits for data to appear before considering the pad
2224 * as unresponsive.
2225 */
2226 static void
gst_aggregator_set_latency_property(GstAggregator * self,GstClockTime latency)2227 gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
2228 {
2229 gboolean changed;
2230
2231 g_return_if_fail (GST_IS_AGGREGATOR (self));
2232 g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
2233
2234 SRC_LOCK (self);
2235 changed = (self->priv->latency != latency);
2236
2237 if (changed) {
2238 GList *item;
2239
2240 GST_OBJECT_LOCK (self);
2241 /* First lock all the pads */
2242 for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2243 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2244 PAD_LOCK (aggpad);
2245 }
2246
2247 self->priv->latency = latency;
2248
2249 SRC_BROADCAST (self);
2250
2251 /* Now wake up the pads */
2252 for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2253 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2254 PAD_BROADCAST_EVENT (aggpad);
2255 PAD_UNLOCK (aggpad);
2256 }
2257 GST_OBJECT_UNLOCK (self);
2258 }
2259
2260 SRC_UNLOCK (self);
2261
2262 if (changed)
2263 gst_element_post_message (GST_ELEMENT_CAST (self),
2264 gst_message_new_latency (GST_OBJECT_CAST (self)));
2265 }
2266
2267 /*
2268 * gst_aggregator_get_latency_property:
2269 * @agg: a #GstAggregator
2270 *
2271 * Gets the latency value. See gst_aggregator_set_latency for
2272 * more details.
2273 *
2274 * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2275 * before a pad is deemed unresponsive. A value of -1 means an
2276 * unlimited time.
2277 */
2278 static GstClockTime
gst_aggregator_get_latency_property(GstAggregator * agg)2279 gst_aggregator_get_latency_property (GstAggregator * agg)
2280 {
2281 GstClockTime res;
2282
2283 g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
2284
2285 GST_OBJECT_LOCK (agg);
2286 res = agg->priv->latency;
2287 GST_OBJECT_UNLOCK (agg);
2288
2289 return res;
2290 }
2291
2292 static void
gst_aggregator_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)2293 gst_aggregator_set_property (GObject * object, guint prop_id,
2294 const GValue * value, GParamSpec * pspec)
2295 {
2296 GstAggregator *agg = GST_AGGREGATOR (object);
2297
2298 switch (prop_id) {
2299 case PROP_LATENCY:
2300 gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
2301 break;
2302 case PROP_MIN_UPSTREAM_LATENCY:
2303 SRC_LOCK (agg);
2304 agg->priv->upstream_latency_min = g_value_get_uint64 (value);
2305 SRC_UNLOCK (agg);
2306 break;
2307 case PROP_START_TIME_SELECTION:
2308 agg->priv->start_time_selection = g_value_get_enum (value);
2309 break;
2310 case PROP_START_TIME:
2311 agg->priv->start_time = g_value_get_uint64 (value);
2312 break;
2313 default:
2314 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2315 break;
2316 }
2317 }
2318
2319 static void
gst_aggregator_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)2320 gst_aggregator_get_property (GObject * object, guint prop_id,
2321 GValue * value, GParamSpec * pspec)
2322 {
2323 GstAggregator *agg = GST_AGGREGATOR (object);
2324
2325 switch (prop_id) {
2326 case PROP_LATENCY:
2327 g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
2328 break;
2329 case PROP_MIN_UPSTREAM_LATENCY:
2330 SRC_LOCK (agg);
2331 g_value_set_uint64 (value, agg->priv->upstream_latency_min);
2332 SRC_UNLOCK (agg);
2333 break;
2334 case PROP_START_TIME_SELECTION:
2335 g_value_set_enum (value, agg->priv->start_time_selection);
2336 break;
2337 case PROP_START_TIME:
2338 g_value_set_uint64 (value, agg->priv->start_time);
2339 break;
2340 default:
2341 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2342 break;
2343 }
2344 }
2345
2346 /* GObject vmethods implementations */
2347 static void
gst_aggregator_class_init(GstAggregatorClass * klass)2348 gst_aggregator_class_init (GstAggregatorClass * klass)
2349 {
2350 GObjectClass *gobject_class = (GObjectClass *) klass;
2351 GstElementClass *gstelement_class = (GstElementClass *) klass;
2352
2353 aggregator_parent_class = g_type_class_peek_parent (klass);
2354
2355 GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2356 GST_DEBUG_FG_MAGENTA, "GstAggregator");
2357
2358 if (aggregator_private_offset != 0)
2359 g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
2360
2361 klass->finish_buffer = gst_aggregator_default_finish_buffer;
2362
2363 klass->sink_event = gst_aggregator_default_sink_event;
2364 klass->sink_query = gst_aggregator_default_sink_query;
2365
2366 klass->src_event = gst_aggregator_default_src_event;
2367 klass->src_query = gst_aggregator_default_src_query;
2368
2369 klass->create_new_pad = gst_aggregator_default_create_new_pad;
2370 klass->update_src_caps = gst_aggregator_default_update_src_caps;
2371 klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2372 klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2373
2374 gstelement_class->request_new_pad =
2375 GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2376 gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2377 gstelement_class->release_pad =
2378 GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2379 gstelement_class->change_state =
2380 GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2381
2382 gobject_class->set_property = gst_aggregator_set_property;
2383 gobject_class->get_property = gst_aggregator_get_property;
2384 gobject_class->finalize = gst_aggregator_finalize;
2385
2386 g_object_class_install_property (gobject_class, PROP_LATENCY,
2387 g_param_spec_uint64 ("latency", "Buffer latency",
2388 "Additional latency in live mode to allow upstream "
2389 "to take longer to produce buffers for the current "
2390 "position (in nanoseconds)", 0, G_MAXUINT64,
2391 DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2392
2393 /**
2394 * GstAggregator:min-upstream-latency:
2395 *
2396 * Force minimum upstream latency (in nanoseconds). When sources with a
2397 * higher latency are expected to be plugged in dynamically after the
2398 * aggregator has started playing, this allows overriding the minimum
2399 * latency reported by the initial source(s). This is only taken into
2400 * account when larger than the actually reported minimum latency.
2401 *
2402 * Since: 1.16
2403 */
2404 g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY,
2405 g_param_spec_uint64 ("min-upstream-latency", "Buffer latency",
2406 "When sources with a higher latency are expected to be plugged "
2407 "in dynamically after the aggregator has started playing, "
2408 "this allows overriding the minimum latency reported by the "
2409 "initial source(s). This is only taken into account when larger "
2410 "than the actually reported minimum latency. (nanoseconds)",
2411 0, G_MAXUINT64,
2412 DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2413
2414 g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2415 g_param_spec_enum ("start-time-selection", "Start Time Selection",
2416 "Decides which start time is output",
2417 gst_aggregator_start_time_selection_get_type (),
2418 DEFAULT_START_TIME_SELECTION,
2419 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2420
2421 g_object_class_install_property (gobject_class, PROP_START_TIME,
2422 g_param_spec_uint64 ("start-time", "Start Time",
2423 "Start time to use if start-time-selection=set", 0,
2424 G_MAXUINT64,
2425 DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2426 }
2427
2428 static inline gpointer
gst_aggregator_get_instance_private(GstAggregator * self)2429 gst_aggregator_get_instance_private (GstAggregator * self)
2430 {
2431 return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
2432 }
2433
2434 static void
gst_aggregator_init(GstAggregator * self,GstAggregatorClass * klass)2435 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2436 {
2437 GstPadTemplate *pad_template;
2438 GstAggregatorPrivate *priv;
2439 GType pad_type;
2440
2441 g_return_if_fail (klass->aggregate != NULL);
2442
2443 self->priv = gst_aggregator_get_instance_private (self);
2444
2445 priv = self->priv;
2446
2447 pad_template =
2448 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2449 g_return_if_fail (pad_template != NULL);
2450
2451 priv->max_padserial = -1;
2452 priv->tags_changed = FALSE;
2453
2454 self->priv->peer_latency_live = FALSE;
2455 self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2456 self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2457 self->priv->has_peer_latency = FALSE;
2458
2459 pad_type =
2460 GST_PAD_TEMPLATE_GTYPE (pad_template) ==
2461 G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD :
2462 GST_PAD_TEMPLATE_GTYPE (pad_template);
2463 g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
2464 self->srcpad =
2465 g_object_new (pad_type, "name", "src", "direction", GST_PAD_SRC,
2466 "template", pad_template, NULL);
2467
2468 gst_aggregator_reset_flow_values (self);
2469
2470 gst_pad_set_event_function (self->srcpad,
2471 GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
2472 gst_pad_set_query_function (self->srcpad,
2473 GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
2474 gst_pad_set_activatemode_function (self->srcpad,
2475 GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
2476
2477 gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
2478
2479 self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY;
2480 self->priv->latency = DEFAULT_LATENCY;
2481 self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
2482 self->priv->start_time = DEFAULT_START_TIME;
2483
2484 g_mutex_init (&self->priv->src_lock);
2485 g_cond_init (&self->priv->src_cond);
2486 }
2487
2488 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
2489 * method to get to the padtemplates */
2490 GType
gst_aggregator_get_type(void)2491 gst_aggregator_get_type (void)
2492 {
2493 static volatile gsize type = 0;
2494
2495 if (g_once_init_enter (&type)) {
2496 GType _type;
2497 static const GTypeInfo info = {
2498 sizeof (GstAggregatorClass),
2499 NULL,
2500 NULL,
2501 (GClassInitFunc) gst_aggregator_class_init,
2502 NULL,
2503 NULL,
2504 sizeof (GstAggregator),
2505 0,
2506 (GInstanceInitFunc) gst_aggregator_init,
2507 };
2508
2509 _type = g_type_register_static (GST_TYPE_ELEMENT,
2510 "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
2511
2512 aggregator_private_offset =
2513 g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
2514
2515 g_once_init_leave (&type, _type);
2516 }
2517 return type;
2518 }
2519
2520 /* Must be called with SRC lock and PAD lock held */
2521 static gboolean
gst_aggregator_pad_has_space(GstAggregator * self,GstAggregatorPad * aggpad)2522 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
2523 {
2524 /* Empty queue always has space */
2525 if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
2526 return TRUE;
2527
2528 /* We also want at least two buffers, one is being processed and one is ready
2529 * for the next iteration when we operate in live mode. */
2530 if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
2531 return TRUE;
2532
2533 /* zero latency, if there is a buffer, it's full */
2534 if (self->priv->latency == 0)
2535 return FALSE;
2536
2537 /* Allow no more buffers than the latency */
2538 return (aggpad->priv->time_level <= self->priv->latency);
2539 }
2540
2541 /* Must be called with the PAD_LOCK held */
2542 static void
apply_buffer(GstAggregatorPad * aggpad,GstBuffer * buffer,gboolean head)2543 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2544 {
2545 GstClockTime timestamp;
2546
2547 if (GST_BUFFER_DTS_IS_VALID (buffer))
2548 timestamp = GST_BUFFER_DTS (buffer);
2549 else
2550 timestamp = GST_BUFFER_PTS (buffer);
2551
2552 if (timestamp == GST_CLOCK_TIME_NONE) {
2553 if (head)
2554 timestamp = aggpad->priv->head_position;
2555 else
2556 timestamp = aggpad->priv->tail_position;
2557 }
2558
2559 /* add duration */
2560 if (GST_BUFFER_DURATION_IS_VALID (buffer))
2561 timestamp += GST_BUFFER_DURATION (buffer);
2562
2563 if (head)
2564 aggpad->priv->head_position = timestamp;
2565 else
2566 aggpad->priv->tail_position = timestamp;
2567
2568 update_time_level (aggpad, head);
2569 }
2570
2571 /*
2572 * Can be called either from the sinkpad's chain function or from the srcpad's
2573 * thread in the case of a buffer synthetized from a GAP event.
2574 * Because of this second case, FLUSH_LOCK can't be used here.
2575 */
2576
2577 static GstFlowReturn
gst_aggregator_pad_chain_internal(GstAggregator * self,GstAggregatorPad * aggpad,GstBuffer * buffer,gboolean head)2578 gst_aggregator_pad_chain_internal (GstAggregator * self,
2579 GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2580 {
2581 GstFlowReturn flow_return;
2582 GstClockTime buf_pts;
2583
2584 PAD_LOCK (aggpad);
2585 flow_return = aggpad->priv->flow_return;
2586 if (flow_return != GST_FLOW_OK)
2587 goto flushing;
2588
2589 PAD_UNLOCK (aggpad);
2590
2591 buf_pts = GST_BUFFER_PTS (buffer);
2592
2593 for (;;) {
2594 SRC_LOCK (self);
2595 GST_OBJECT_LOCK (self);
2596 PAD_LOCK (aggpad);
2597
2598 if (aggpad->priv->first_buffer) {
2599 self->priv->has_peer_latency = FALSE;
2600 aggpad->priv->first_buffer = FALSE;
2601 }
2602
2603 if ((gst_aggregator_pad_has_space (self, aggpad) || !head)
2604 && aggpad->priv->flow_return == GST_FLOW_OK) {
2605 if (head)
2606 g_queue_push_head (&aggpad->priv->data, buffer);
2607 else
2608 g_queue_push_tail (&aggpad->priv->data, buffer);
2609 apply_buffer (aggpad, buffer, head);
2610 aggpad->priv->num_buffers++;
2611 buffer = NULL;
2612 SRC_BROADCAST (self);
2613 break;
2614 }
2615
2616 flow_return = aggpad->priv->flow_return;
2617 if (flow_return != GST_FLOW_OK) {
2618 GST_OBJECT_UNLOCK (self);
2619 SRC_UNLOCK (self);
2620 goto flushing;
2621 }
2622 GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2623 GST_OBJECT_UNLOCK (self);
2624 SRC_UNLOCK (self);
2625 PAD_WAIT_EVENT (aggpad);
2626
2627 PAD_UNLOCK (aggpad);
2628 }
2629
2630 if (self->priv->first_buffer) {
2631 GstClockTime start_time;
2632 GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
2633
2634 switch (self->priv->start_time_selection) {
2635 case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
2636 default:
2637 start_time = 0;
2638 break;
2639 case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
2640 GST_OBJECT_LOCK (aggpad);
2641 if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
2642 start_time = buf_pts;
2643 if (start_time != -1) {
2644 start_time = MAX (start_time, aggpad->priv->head_segment.start);
2645 start_time =
2646 gst_segment_to_running_time (&aggpad->priv->head_segment,
2647 GST_FORMAT_TIME, start_time);
2648 }
2649 } else {
2650 start_time = 0;
2651 GST_WARNING_OBJECT (aggpad,
2652 "Ignoring request of selecting the first start time "
2653 "as the segment is a %s segment instead of a time segment",
2654 gst_format_get_name (aggpad->segment.format));
2655 }
2656 GST_OBJECT_UNLOCK (aggpad);
2657 break;
2658 case GST_AGGREGATOR_START_TIME_SELECTION_SET:
2659 start_time = self->priv->start_time;
2660 if (start_time == -1)
2661 start_time = 0;
2662 break;
2663 }
2664
2665 if (start_time != -1) {
2666 if (srcpad->segment.position == -1)
2667 srcpad->segment.position = start_time;
2668 else
2669 srcpad->segment.position = MIN (start_time, srcpad->segment.position);
2670
2671 GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
2672 GST_TIME_ARGS (start_time));
2673 }
2674 }
2675
2676 PAD_UNLOCK (aggpad);
2677 GST_OBJECT_UNLOCK (self);
2678 SRC_UNLOCK (self);
2679
2680 GST_DEBUG_OBJECT (aggpad, "Done chaining");
2681
2682 return flow_return;
2683
2684 flushing:
2685 PAD_UNLOCK (aggpad);
2686
2687 GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
2688 gst_flow_get_name (flow_return));
2689 if (buffer)
2690 gst_buffer_unref (buffer);
2691
2692 return flow_return;
2693 }
2694
2695 static GstFlowReturn
gst_aggregator_pad_chain(GstPad * pad,GstObject * object,GstBuffer * buffer)2696 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
2697 {
2698 GstFlowReturn ret;
2699 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2700
2701 PAD_FLUSH_LOCK (aggpad);
2702
2703 ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
2704 aggpad, buffer, TRUE);
2705
2706 PAD_FLUSH_UNLOCK (aggpad);
2707
2708 return ret;
2709 }
2710
2711 static gboolean
gst_aggregator_pad_query_func(GstPad * pad,GstObject * parent,GstQuery * query)2712 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
2713 GstQuery * query)
2714 {
2715 GstAggregator *self = GST_AGGREGATOR (parent);
2716 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2717
2718 if (GST_QUERY_IS_SERIALIZED (query)) {
2719 GstStructure *s;
2720 gboolean ret = FALSE;
2721
2722 SRC_LOCK (self);
2723 PAD_LOCK (aggpad);
2724
2725 if (aggpad->priv->flow_return != GST_FLOW_OK) {
2726 SRC_UNLOCK (self);
2727 goto flushing;
2728 }
2729
2730 g_queue_push_head (&aggpad->priv->data, query);
2731 SRC_BROADCAST (self);
2732 SRC_UNLOCK (self);
2733
2734 while (!gst_aggregator_pad_queue_is_empty (aggpad)
2735 && aggpad->priv->flow_return == GST_FLOW_OK) {
2736 GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2737 PAD_WAIT_EVENT (aggpad);
2738 }
2739
2740 s = gst_query_writable_structure (query);
2741 if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
2742 gst_structure_remove_field (s, "gst-aggregator-retval");
2743 else
2744 g_queue_remove (&aggpad->priv->data, query);
2745
2746 if (aggpad->priv->flow_return != GST_FLOW_OK)
2747 goto flushing;
2748
2749 PAD_UNLOCK (aggpad);
2750
2751 return ret;
2752 } else {
2753 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2754
2755 return klass->sink_query (self, aggpad, query);
2756 }
2757
2758 flushing:
2759 GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2760 gst_flow_get_name (aggpad->priv->flow_return));
2761 PAD_UNLOCK (aggpad);
2762
2763 return FALSE;
2764 }
2765
2766 /* Queue serialized events and let the others go through directly.
2767 * The queued events with be handled from the src-pad task in
2768 * gst_aggregator_do_events_and_queries().
2769 */
2770 static GstFlowReturn
gst_aggregator_pad_event_func(GstPad * pad,GstObject * parent,GstEvent * event)2771 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
2772 GstEvent * event)
2773 {
2774 GstFlowReturn ret = GST_FLOW_OK;
2775 GstAggregator *self = GST_AGGREGATOR (parent);
2776 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2777
2778 if (GST_EVENT_IS_SERIALIZED (event)
2779 && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2780 SRC_LOCK (self);
2781 PAD_LOCK (aggpad);
2782
2783 if (aggpad->priv->flow_return != GST_FLOW_OK)
2784 goto flushing;
2785
2786 if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
2787 GST_OBJECT_LOCK (aggpad);
2788 gst_event_copy_segment (event, &aggpad->priv->head_segment);
2789 aggpad->priv->head_position = aggpad->priv->head_segment.position;
2790 update_time_level (aggpad, TRUE);
2791 GST_OBJECT_UNLOCK (aggpad);
2792 }
2793
2794 GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
2795 g_queue_push_head (&aggpad->priv->data, event);
2796 SRC_BROADCAST (self);
2797 PAD_UNLOCK (aggpad);
2798 SRC_UNLOCK (self);
2799 } else {
2800 GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2801
2802 if (!klass->sink_event (self, aggpad, event)) {
2803 /* Copied from GstPad to convert boolean to a GstFlowReturn in
2804 * the event handling func */
2805 ret = GST_FLOW_ERROR;
2806 }
2807 }
2808
2809 return ret;
2810
2811 flushing:
2812 GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
2813 gst_flow_get_name (aggpad->priv->flow_return));
2814 PAD_UNLOCK (aggpad);
2815 SRC_UNLOCK (self);
2816 if (GST_EVENT_IS_STICKY (event))
2817 gst_pad_store_sticky_event (pad, event);
2818 gst_event_unref (event);
2819
2820 return aggpad->priv->flow_return;
2821 }
2822
2823 static gboolean
gst_aggregator_pad_activate_mode_func(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)2824 gst_aggregator_pad_activate_mode_func (GstPad * pad,
2825 GstObject * parent, GstPadMode mode, gboolean active)
2826 {
2827 GstAggregator *self = GST_AGGREGATOR (parent);
2828 GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2829
2830 if (active == FALSE) {
2831 SRC_LOCK (self);
2832 gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
2833 SRC_BROADCAST (self);
2834 SRC_UNLOCK (self);
2835 } else {
2836 PAD_LOCK (aggpad);
2837 aggpad->priv->flow_return = GST_FLOW_OK;
2838 PAD_BROADCAST_EVENT (aggpad);
2839 PAD_UNLOCK (aggpad);
2840 }
2841
2842 return TRUE;
2843 }
2844
/***********************************
 * GstAggregatorPad implementation *
 ************************************/
/* Defines the GstAggregatorPad type, derived from GST_TYPE_PAD, with an
 * instance-private area */
G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);

/* Default of the "emit-signals" pad property */
#define DEFAULT_PAD_EMIT_SIGNALS FALSE

/* Property ids of GstAggregatorPad */
enum
{
  PAD_PROP_0,
  PAD_PROP_EMIT_SIGNALS,
};

/* Indices into gst_aggregator_pad_signals[] */
enum
{
  PAD_SIGNAL_BUFFER_CONSUMED,
  PAD_LAST_SIGNAL,
};

/* Signal ids, filled in by gst_aggregator_pad_class_init() */
static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 };
2865
2866 static void
gst_aggregator_pad_constructed(GObject * object)2867 gst_aggregator_pad_constructed (GObject * object)
2868 {
2869 GstPad *pad = GST_PAD (object);
2870
2871 if (GST_PAD_IS_SINK (pad)) {
2872 gst_pad_set_chain_function (pad,
2873 GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
2874 gst_pad_set_event_full_function_full (pad,
2875 GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
2876 gst_pad_set_query_function (pad,
2877 GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
2878 gst_pad_set_activatemode_function (pad,
2879 GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
2880 }
2881 }
2882
2883 static void
gst_aggregator_pad_finalize(GObject * object)2884 gst_aggregator_pad_finalize (GObject * object)
2885 {
2886 GstAggregatorPad *pad = (GstAggregatorPad *) object;
2887
2888 g_cond_clear (&pad->priv->event_cond);
2889 g_mutex_clear (&pad->priv->flush_lock);
2890 g_mutex_clear (&pad->priv->lock);
2891
2892 G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
2893 }
2894
2895 static void
gst_aggregator_pad_dispose(GObject * object)2896 gst_aggregator_pad_dispose (GObject * object)
2897 {
2898 GstAggregatorPad *pad = (GstAggregatorPad *) object;
2899
2900 gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
2901
2902 G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
2903 }
2904
2905 static void
gst_aggregator_pad_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)2906 gst_aggregator_pad_set_property (GObject * object, guint prop_id,
2907 const GValue * value, GParamSpec * pspec)
2908 {
2909 GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
2910
2911 switch (prop_id) {
2912 case PAD_PROP_EMIT_SIGNALS:
2913 pad->priv->emit_signals = g_value_get_boolean (value);
2914 break;
2915 default:
2916 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2917 break;
2918 }
2919 }
2920
2921 static void
gst_aggregator_pad_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)2922 gst_aggregator_pad_get_property (GObject * object, guint prop_id,
2923 GValue * value, GParamSpec * pspec)
2924 {
2925 GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
2926
2927 switch (prop_id) {
2928 case PAD_PROP_EMIT_SIGNALS:
2929 g_value_set_boolean (value, pad->priv->emit_signals);
2930 break;
2931 default:
2932 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2933 break;
2934 }
2935 }
2936
2937 static void
gst_aggregator_pad_class_init(GstAggregatorPadClass * klass)2938 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
2939 {
2940 GObjectClass *gobject_class = (GObjectClass *) klass;
2941
2942 gobject_class->constructed = gst_aggregator_pad_constructed;
2943 gobject_class->finalize = gst_aggregator_pad_finalize;
2944 gobject_class->dispose = gst_aggregator_pad_dispose;
2945 gobject_class->set_property = gst_aggregator_pad_set_property;
2946 gobject_class->get_property = gst_aggregator_pad_get_property;
2947
2948 /**
2949 * GstAggregatorPad:buffer-consumed:
2950 *
2951 * Signals that a buffer was consumed. As aggregator pads store buffers
2952 * in an internal queue, there is no direct match between input and output
2953 * buffers at any given time. This signal can be useful to forward metas
2954 * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time.
2955 *
2956 * Since: 1.16
2957 */
2958 gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] =
2959 g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass),
2960 G_SIGNAL_RUN_FIRST, 0, NULL, NULL, g_cclosure_marshal_generic,
2961 G_TYPE_NONE, 1, GST_TYPE_BUFFER);
2962
2963 /**
2964 * GstAggregatorPad:emit-signals:
2965 *
2966 * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed
2967 *
2968 * Since: 1.16
2969 */
2970 g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS,
2971 g_param_spec_boolean ("emit-signals", "Emit signals",
2972 "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS,
2973 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2974 }
2975
2976 static void
gst_aggregator_pad_init(GstAggregatorPad * pad)2977 gst_aggregator_pad_init (GstAggregatorPad * pad)
2978 {
2979 pad->priv = gst_aggregator_pad_get_instance_private (pad);
2980
2981 g_queue_init (&pad->priv->data);
2982 g_cond_init (&pad->priv->event_cond);
2983
2984 g_mutex_init (&pad->priv->flush_lock);
2985 g_mutex_init (&pad->priv->lock);
2986
2987 gst_aggregator_pad_reset_unlocked (pad);
2988 pad->priv->negotiated = FALSE;
2989 pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS;
2990 }
2991
2992 /* Must be called with the PAD_LOCK held */
2993 static void
gst_aggregator_pad_buffer_consumed(GstAggregatorPad * pad,GstBuffer * buffer)2994 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer)
2995 {
2996 pad->priv->num_buffers--;
2997 GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer);
2998 if (buffer && pad->priv->emit_signals) {
2999 g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
3000 0, buffer);
3001 }
3002 PAD_BROADCAST_EVENT (pad);
3003 }
3004
3005 /* Must be called with the PAD_LOCK held */
3006 static void
gst_aggregator_pad_clip_buffer_unlocked(GstAggregatorPad * pad)3007 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
3008 {
3009 GstAggregator *self = NULL;
3010 GstAggregatorClass *aggclass = NULL;
3011 GstBuffer *buffer = NULL;
3012
3013 while (pad->priv->clipped_buffer == NULL &&
3014 GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
3015 buffer = g_queue_pop_tail (&pad->priv->data);
3016
3017 apply_buffer (pad, buffer, FALSE);
3018
3019 /* We only take the parent here so that it's not taken if the buffer is
3020 * already clipped or if the queue is empty.
3021 */
3022 if (self == NULL) {
3023 self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
3024 if (self == NULL) {
3025 gst_buffer_unref (buffer);
3026 return;
3027 }
3028
3029 aggclass = GST_AGGREGATOR_GET_CLASS (self);
3030 }
3031
3032 if (aggclass->clip) {
3033 GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
3034
3035 buffer = aggclass->clip (self, pad, buffer);
3036
3037 if (buffer == NULL) {
3038 gst_aggregator_pad_buffer_consumed (pad, buffer);
3039 GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
3040 }
3041 }
3042
3043 pad->priv->clipped_buffer = buffer;
3044 }
3045
3046 if (self)
3047 gst_object_unref (self);
3048 }
3049
3050 /**
3051 * gst_aggregator_pad_pop_buffer:
3052 * @pad: the pad to get buffer from
3053 *
3054 * Steal the ref to the buffer currently queued in @pad.
3055 *
3056 * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
3057 * queued. You should unref the buffer after usage.
3058 */
3059 GstBuffer *
gst_aggregator_pad_pop_buffer(GstAggregatorPad * pad)3060 gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
3061 {
3062 GstBuffer *buffer;
3063
3064 PAD_LOCK (pad);
3065
3066 if (pad->priv->flow_return != GST_FLOW_OK) {
3067 PAD_UNLOCK (pad);
3068 return NULL;
3069 }
3070
3071 gst_aggregator_pad_clip_buffer_unlocked (pad);
3072
3073 buffer = pad->priv->clipped_buffer;
3074
3075 if (buffer) {
3076 pad->priv->clipped_buffer = NULL;
3077 gst_aggregator_pad_buffer_consumed (pad, buffer);
3078 GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
3079 }
3080
3081 PAD_UNLOCK (pad);
3082
3083 return buffer;
3084 }
3085
3086 /**
3087 * gst_aggregator_pad_drop_buffer:
3088 * @pad: the pad where to drop any pending buffer
3089 *
3090 * Drop the buffer currently queued in @pad.
3091 *
3092 * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
3093 */
3094 gboolean
gst_aggregator_pad_drop_buffer(GstAggregatorPad * pad)3095 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
3096 {
3097 GstBuffer *buf;
3098
3099 buf = gst_aggregator_pad_pop_buffer (pad);
3100
3101 if (buf == NULL)
3102 return FALSE;
3103
3104 gst_buffer_unref (buf);
3105 return TRUE;
3106 }
3107
3108 /**
3109 * gst_aggregator_pad_peek_buffer:
3110 * @pad: the pad to get buffer from
3111 *
3112 * Returns: (transfer full): A reference to the buffer in @pad or
3113 * NULL if no buffer was queued. You should unref the buffer after
3114 * usage.
3115 */
3116 GstBuffer *
gst_aggregator_pad_peek_buffer(GstAggregatorPad * pad)3117 gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
3118 {
3119 GstBuffer *buffer;
3120
3121 PAD_LOCK (pad);
3122
3123 if (pad->priv->flow_return != GST_FLOW_OK) {
3124 PAD_UNLOCK (pad);
3125 return NULL;
3126 }
3127
3128 gst_aggregator_pad_clip_buffer_unlocked (pad);
3129
3130 if (pad->priv->clipped_buffer) {
3131 buffer = gst_buffer_ref (pad->priv->clipped_buffer);
3132 } else {
3133 buffer = NULL;
3134 }
3135 PAD_UNLOCK (pad);
3136
3137 return buffer;
3138 }
3139
3140 /**
3141 * gst_aggregator_pad_has_buffer:
3142 * @pad: the pad to check the buffer on
3143 *
3144 * This checks if a pad has a buffer available that will be returned by
3145 * a call to gst_aggregator_pad_peek_buffer() or
3146 * gst_aggregator_pad_pop_buffer().
3147 *
3148 * Returns: %TRUE if the pad has a buffer available as the next thing.
3149 *
3150 * Since: 1.14.1
3151 */
3152 gboolean
gst_aggregator_pad_has_buffer(GstAggregatorPad * pad)3153 gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
3154 {
3155 gboolean has_buffer;
3156
3157 PAD_LOCK (pad);
3158 gst_aggregator_pad_clip_buffer_unlocked (pad);
3159 has_buffer = (pad->priv->clipped_buffer != NULL);
3160 PAD_UNLOCK (pad);
3161
3162 return has_buffer;
3163 }
3164
3165 /**
3166 * gst_aggregator_pad_is_eos:
3167 * @pad: an aggregator pad
3168 *
3169 * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
3170 */
3171 gboolean
gst_aggregator_pad_is_eos(GstAggregatorPad * pad)3172 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
3173 {
3174 gboolean is_eos;
3175
3176 PAD_LOCK (pad);
3177 is_eos = pad->priv->eos;
3178 PAD_UNLOCK (pad);
3179
3180 return is_eos;
3181 }
3182
#if 0
/*
 * gst_aggregator_merge_tags:
 * @self: a #GstAggregator
 * @tags: a #GstTagList to merge
 * @mode: the #GstTagMergeMode to use
 *
 * Adds tags to so-called pending tags, which will be processed
 * before pushing out data downstream.
 *
 * Note that this is provided for convenience, and the subclass is
 * not required to use this and can still do tag handling on its own.
 *
 * MT safe.
 *
 * (Currently compiled out.)
 */
void
gst_aggregator_merge_tags (GstAggregator * self,
    const GstTagList * tags, GstTagMergeMode mode)
{
  GstTagList *previous;

  g_return_if_fail (GST_IS_AGGREGATOR (self));
  g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));

  /* FIXME Check if we can use OBJECT lock here! */
  GST_OBJECT_LOCK (self);

  if (tags != NULL)
    GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);

  /* Swap in the merged list, then release the list it replaces. */
  previous = self->priv->tags;
  self->priv->tags = gst_tag_list_merge (previous, tags, mode);
  if (previous != NULL)
    gst_tag_list_unref (previous);

  self->priv->tags_changed = TRUE;

  GST_OBJECT_UNLOCK (self);
}
#endif
3219
3220 /**
3221 * gst_aggregator_set_latency:
3222 * @self: a #GstAggregator
3223 * @min_latency: minimum latency
3224 * @max_latency: maximum latency
3225 *
3226 * Lets #GstAggregator sub-classes tell the baseclass what their internal
3227 * latency is. Will also post a LATENCY message on the bus so the pipeline
3228 * can reconfigure its global latency.
3229 */
3230 void
gst_aggregator_set_latency(GstAggregator * self,GstClockTime min_latency,GstClockTime max_latency)3231 gst_aggregator_set_latency (GstAggregator * self,
3232 GstClockTime min_latency, GstClockTime max_latency)
3233 {
3234 gboolean changed = FALSE;
3235
3236 g_return_if_fail (GST_IS_AGGREGATOR (self));
3237 g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
3238 g_return_if_fail (max_latency >= min_latency);
3239
3240 SRC_LOCK (self);
3241 if (self->priv->sub_latency_min != min_latency) {
3242 self->priv->sub_latency_min = min_latency;
3243 changed = TRUE;
3244 }
3245 if (self->priv->sub_latency_max != max_latency) {
3246 self->priv->sub_latency_max = max_latency;
3247 changed = TRUE;
3248 }
3249
3250 if (changed)
3251 SRC_BROADCAST (self);
3252 SRC_UNLOCK (self);
3253
3254 if (changed) {
3255 gst_element_post_message (GST_ELEMENT_CAST (self),
3256 gst_message_new_latency (GST_OBJECT_CAST (self)));
3257 }
3258 }
3259
3260 /**
3261 * gst_aggregator_get_buffer_pool:
3262 * @self: a #GstAggregator
3263 *
3264 * Returns: (transfer full): the instance of the #GstBufferPool used
3265 * by @trans; free it after use it
3266 */
3267 GstBufferPool *
gst_aggregator_get_buffer_pool(GstAggregator * self)3268 gst_aggregator_get_buffer_pool (GstAggregator * self)
3269 {
3270 GstBufferPool *pool;
3271
3272 g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL);
3273
3274 GST_OBJECT_LOCK (self);
3275 pool = self->priv->pool;
3276 if (pool)
3277 gst_object_ref (pool);
3278 GST_OBJECT_UNLOCK (self);
3279
3280 return pool;
3281 }
3282
3283 /**
3284 * gst_aggregator_get_allocator:
3285 * @self: a #GstAggregator
3286 * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3287 * used
3288 * @params: (out) (allow-none) (transfer full): the
3289 * #GstAllocationParams of @allocator
3290 *
3291 * Lets #GstAggregator sub-classes get the memory @allocator
3292 * acquired by the base class and its @params.
3293 *
3294 * Unref the @allocator after use it.
3295 */
3296 void
gst_aggregator_get_allocator(GstAggregator * self,GstAllocator ** allocator,GstAllocationParams * params)3297 gst_aggregator_get_allocator (GstAggregator * self,
3298 GstAllocator ** allocator, GstAllocationParams * params)
3299 {
3300 g_return_if_fail (GST_IS_AGGREGATOR (self));
3301
3302 if (allocator)
3303 *allocator = self->priv->allocator ?
3304 gst_object_ref (self->priv->allocator) : NULL;
3305
3306 if (params)
3307 *params = self->priv->allocation_params;
3308 }
3309
3310 /**
3311 * gst_aggregator_simple_get_next_time:
3312 * @self: A #GstAggregator
3313 *
3314 * This is a simple #GstAggregator::get_next_time implementation that
3315 * just looks at the #GstSegment on the srcpad of the aggregator and bases
3316 * the next time on the running time there.
3317 *
3318 * This is the desired behaviour in most cases where you have a live source
3319 * and you have a dead line based aggregator subclass.
3320 *
3321 * Returns: The running time based on the position
3322 *
3323 * Since: 1.16
3324 */
3325 GstClockTime
gst_aggregator_simple_get_next_time(GstAggregator * self)3326 gst_aggregator_simple_get_next_time (GstAggregator * self)
3327 {
3328 GstClockTime next_time;
3329 GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
3330 GstSegment *segment = &srcpad->segment;
3331
3332 GST_OBJECT_LOCK (self);
3333 if (segment->position == -1 || segment->position < segment->start)
3334 next_time = segment->start;
3335 else
3336 next_time = segment->position;
3337
3338 if (segment->stop != -1 && next_time > segment->stop)
3339 next_time = segment->stop;
3340
3341 next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
3342 GST_OBJECT_UNLOCK (self);
3343
3344 return next_time;
3345 }
3346