1 /* GStreamer
2  * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23 
24 #include "gststreamsynchronizer.h"
25 
26 GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
27 #define GST_CAT_DEFAULT stream_synchronizer_debug
28 
29 #define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                \
30     GST_TRACE_OBJECT (obj,                                              \
31                     "locking from thread %p",                           \
32                     g_thread_self ());                                  \
33     g_mutex_lock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);            \
34     GST_TRACE_OBJECT (obj,                                              \
35                     "locked from thread %p",                            \
36                     g_thread_self ());                                  \
37 } G_STMT_END
38 
39 #define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {              \
40     GST_TRACE_OBJECT (obj,                                              \
41                     "unlocking from thread %p",                         \
42                     g_thread_self ());                                  \
43     g_mutex_unlock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
44 } G_STMT_END
45 
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
47     GST_PAD_SRC,
48     GST_PAD_SOMETIMES,
49     GST_STATIC_CAPS_ANY);
50 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
51     GST_PAD_SINK,
52     GST_PAD_REQUEST,
53     GST_STATIC_CAPS_ANY);
54 
55 #define gst_stream_synchronizer_parent_class parent_class
56 G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
57     GST_TYPE_ELEMENT);
58 
59 typedef struct
60 {
61   GstStreamSynchronizer *transform;
62   guint stream_number;
63   GstPad *srcpad;
64   GstPad *sinkpad;
65   GstSegment segment;
66 
67   gboolean wait;                /* TRUE if waiting/blocking */
68   gboolean is_eos;              /* TRUE if EOS was received */
69   gboolean eos_sent;            /* when EOS was sent downstream */
70   gboolean flushing;            /* set after flush-start and before flush-stop */
71   gboolean seen_data;
72   gboolean send_gap_event;
73   GstClockTime gap_duration;
74 
75   GstStreamFlags flags;
76 
77   GCond stream_finish_cond;
78 
79   /* seqnum of the previously received STREAM_START
80    * default: G_MAXUINT32 */
81   guint32 stream_start_seqnum;
82   guint32 segment_seqnum;
83   guint group_id;
84 
85   gint refcount;
86 } GstSyncStream;
87 
88 static GstSyncStream *
gst_syncstream_ref(GstSyncStream * stream)89 gst_syncstream_ref (GstSyncStream * stream)
90 {
91   g_return_val_if_fail (stream != NULL, NULL);
92   g_atomic_int_add (&stream->refcount, 1);
93   return stream;
94 }
95 
96 static void
gst_syncstream_unref(GstSyncStream * stream)97 gst_syncstream_unref (GstSyncStream * stream)
98 {
99   g_return_if_fail (stream != NULL);
100   g_return_if_fail (stream->refcount > 0);
101 
102   if (g_atomic_int_dec_and_test (&stream->refcount))
103     g_slice_free (GstSyncStream, stream);
104 }
105 
106 G_BEGIN_DECLS
107 #define GST_TYPE_STREAMSYNC_PAD              (gst_streamsync_pad_get_type ())
108 #define GST_IS_STREAMSYNC_PAD(obj)           (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_STREAMSYNC_PAD))
109 #define GST_IS_STREAMSYNC_PAD_CLASS(klass)   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_STREAMSYNC_PAD))
110 #define GST_STREAMSYNC_PAD(obj)              (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPad))
111 #define GST_STREAMSYNC_PAD_CLASS(klass)      (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPadClass))
112 typedef struct _GstStreamSyncPad GstStreamSyncPad;
113 typedef struct _GstStreamSyncPadClass GstStreamSyncPadClass;
114 
115 struct _GstStreamSyncPad
116 {
117   GstPad parent;
118 
119   GstSyncStream *stream;
120 
121   /* Since we need to access data associated with a pad in this
122    * element, it's important to manage the respective lifetimes of the
123    * stored pad data and the pads themselves. Pad deactivation happens
124    * without mutual exclusion to the use of pad data in this element.
125    *
126    * The approach here is to have the sinkpad (the request pad) hold a
127    * strong reference onto the srcpad (so that it stays alive until
128    * the last pad is destroyed). Similarly the srcpad has a weak
129    * reference to the sinkpad (request pad) to ensure it knows when
130    * the pads are destroyed, since the pad data may be requested from
131    * either the srcpad or the sinkpad. This avoids a nasty set of
132    * potential race conditions.
133    *
134    * The code is arranged so that in the srcpad, the pad pointer is
135    * always NULL (not used) and in the sinkpad, the otherpad is always
136    * NULL. */
137   GstPad *pad;
138   GWeakRef otherpad;
139 };
140 
141 struct _GstStreamSyncPadClass
142 {
143   GstPadClass parent_class;
144 };
145 
146 static GType gst_streamsync_pad_get_type (void);
147 static GstSyncStream *gst_streamsync_pad_get_stream (GstPad * pad);
148 
149 G_END_DECLS
150 #define GST_STREAMSYNC_PAD_CAST(obj)         ((GstStreamSyncPad *)obj)
151   G_DEFINE_TYPE (GstStreamSyncPad, gst_streamsync_pad, GST_TYPE_PAD);
152 
153 static void gst_streamsync_pad_dispose (GObject * object);
154 
155 static void
gst_streamsync_pad_class_init(GstStreamSyncPadClass * klass)156 gst_streamsync_pad_class_init (GstStreamSyncPadClass * klass)
157 {
158   GObjectClass *gobject_class;
159   gobject_class = G_OBJECT_CLASS (klass);
160   gobject_class->dispose = gst_streamsync_pad_dispose;
161 }
162 
163 static void
gst_streamsync_pad_init(GstStreamSyncPad * ppad)164 gst_streamsync_pad_init (GstStreamSyncPad * ppad)
165 {
166 }
167 
168 static void
gst_streamsync_pad_dispose(GObject * object)169 gst_streamsync_pad_dispose (GObject * object)
170 {
171   GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (object);
172 
173   if (GST_PAD_DIRECTION (spad) == GST_PAD_SINK)
174     gst_clear_object (&spad->pad);
175   else
176     g_weak_ref_clear (&spad->otherpad);
177 
178   g_clear_pointer (&spad->stream, gst_syncstream_unref);
179 
180   G_OBJECT_CLASS (gst_streamsync_pad_parent_class)->dispose (object);
181 }
182 
183 static GstPad *
gst_streamsync_pad_new_from_template(GstPadTemplate * templ,const gchar * name)184 gst_streamsync_pad_new_from_template (GstPadTemplate * templ,
185     const gchar * name)
186 {
187   g_return_val_if_fail (GST_IS_PAD_TEMPLATE (templ), NULL);
188 
189   return GST_PAD_CAST (g_object_new (GST_TYPE_STREAMSYNC_PAD,
190           "name", name, "direction", templ->direction, "template", templ,
191           NULL));
192 }
193 
194 static GstPad *
gst_streamsync_pad_new_from_static_template(GstStaticPadTemplate * templ,const gchar * name)195 gst_streamsync_pad_new_from_static_template (GstStaticPadTemplate * templ,
196     const gchar * name)
197 {
198   GstPad *pad;
199   GstPadTemplate *template;
200 
201   template = gst_static_pad_template_get (templ);
202   pad = gst_streamsync_pad_new_from_template (template, name);
203   gst_object_unref (template);
204 
205   return pad;
206 }
207 
208 static GstSyncStream *
gst_streamsync_pad_get_stream(GstPad * pad)209 gst_streamsync_pad_get_stream (GstPad * pad)
210 {
211   GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad);
212   return gst_syncstream_ref (spad->stream);
213 }
214 
215 static GstPad *
gst_stream_get_other_pad_from_pad(GstStreamSynchronizer * self,GstPad * pad)216 gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
217 {
218   GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad);
219   GstPad *opad = NULL;
220 
221   if (GST_PAD_DIRECTION (pad) == GST_PAD_SINK)
222     opad = gst_object_ref (spad->pad);
223   else
224     opad = g_weak_ref_get (&spad->otherpad);
225 
226   if (!opad)
227     GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
228 
229   return opad;
230 }
231 
232 /* Generic pad functions */
233 static GstIterator *
gst_stream_synchronizer_iterate_internal_links(GstPad * pad,GstObject * parent)234 gst_stream_synchronizer_iterate_internal_links (GstPad * pad,
235     GstObject * parent)
236 {
237   GstIterator *it = NULL;
238   GstPad *opad;
239 
240   opad =
241       gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
242   if (opad) {
243     GValue value = { 0, };
244 
245     g_value_init (&value, GST_TYPE_PAD);
246     g_value_set_object (&value, opad);
247     it = gst_iterator_new_single (GST_TYPE_PAD, &value);
248     g_value_unset (&value);
249     gst_object_unref (opad);
250   }
251 
252   return it;
253 }
254 
255 /* srcpad functions */
256 static gboolean
gst_stream_synchronizer_src_event(GstPad * pad,GstObject * parent,GstEvent * event)257 gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
258     GstEvent * event)
259 {
260   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
261   gboolean ret = FALSE;
262 
263   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
264       GST_EVENT_TYPE_NAME (event), event);
265 
266   switch (GST_EVENT_TYPE (event)) {
267     case GST_EVENT_QOS:{
268       gdouble proportion;
269       GstClockTimeDiff diff;
270       GstClockTime timestamp;
271       gint64 running_time_diff = -1;
272       GstSyncStream *stream;
273 
274       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
275       gst_event_unref (event);
276 
277       GST_STREAM_SYNCHRONIZER_LOCK (self);
278       stream = gst_streamsync_pad_get_stream (pad);
279       running_time_diff = stream->segment.base;
280       gst_syncstream_unref (stream);
281       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
282 
283       if (running_time_diff == -1) {
284         GST_WARNING_OBJECT (pad, "QOS event before group start");
285         goto out;
286       }
287       if (timestamp < running_time_diff) {
288         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
289         goto out;
290       }
291 
292       GST_LOG_OBJECT (pad,
293           "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
294           GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
295           GST_TIME_ARGS (running_time_diff),
296           GST_TIME_ARGS (timestamp - running_time_diff));
297 
298       timestamp -= running_time_diff;
299 
300       /* That case is invalid for QoS events */
301       if (diff < 0 && -diff > timestamp) {
302         GST_DEBUG_OBJECT (pad, "QOS event from previous group");
303         ret = TRUE;
304         goto out;
305       }
306 
307       event =
308           gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
309           timestamp);
310       break;
311     }
312     default:
313       break;
314   }
315 
316   ret = gst_pad_event_default (pad, parent, event);
317 
318 out:
319   return ret;
320 }
321 
322 /* must be called with the STREAM_SYNCHRONIZER_LOCK */
323 static gboolean
gst_stream_synchronizer_wait(GstStreamSynchronizer * self,GstPad * pad)324 gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad)
325 {
326   gboolean ret = FALSE;
327   GstSyncStream *stream;
328 
329   stream = gst_streamsync_pad_get_stream (pad);
330 
331   while (!self->eos && !self->flushing) {
332     if (stream->flushing) {
333       GST_DEBUG_OBJECT (pad, "Flushing");
334       break;
335     }
336     if (!stream->wait) {
337       GST_DEBUG_OBJECT (pad, "Stream not waiting anymore");
338       break;
339     }
340 
341     if (stream->send_gap_event) {
342       GstEvent *event;
343 
344       if (!GST_CLOCK_TIME_IS_VALID (stream->segment.position)) {
345         GST_WARNING_OBJECT (pad, "Have no position and can't send GAP event");
346         stream->send_gap_event = FALSE;
347         continue;
348       }
349 
350       event =
351           gst_event_new_gap (stream->segment.position, stream->gap_duration);
352       GST_DEBUG_OBJECT (pad,
353           "Send GAP event, position: %" GST_TIME_FORMAT " duration: %"
354           GST_TIME_FORMAT, GST_TIME_ARGS (stream->segment.position),
355           GST_TIME_ARGS (stream->gap_duration));
356 
357       /* drop lock when sending GAP event, which may block in e.g. preroll */
358       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
359       ret = gst_pad_push_event (pad, event);
360       GST_STREAM_SYNCHRONIZER_LOCK (self);
361       if (!ret) {
362         gst_syncstream_unref (stream);
363         return ret;
364       }
365       stream->send_gap_event = FALSE;
366 
367       /* force a check on the loop conditions as we unlocked a
368        * few lines above and those variables could have changed */
369       continue;
370     }
371 
372     g_cond_wait (&stream->stream_finish_cond, &self->lock);
373   }
374 
375   gst_syncstream_unref (stream);
376   return TRUE;
377 }
378 
379 /* sinkpad functions */
380 static gboolean
gst_stream_synchronizer_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)381 gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
382     GstEvent * event)
383 {
384   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
385   gboolean ret = FALSE;
386 
387   GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
388       GST_EVENT_TYPE_NAME (event), event);
389 
390   switch (GST_EVENT_TYPE (event)) {
391     case GST_EVENT_STREAM_START:
392     {
393       GstSyncStream *stream, *ostream;
394       guint32 seqnum = gst_event_get_seqnum (event);
395       guint group_id;
396       gboolean have_group_id;
397       GList *l;
398       gboolean all_wait = TRUE;
399       gboolean new_stream = TRUE;
400 
401       have_group_id = gst_event_parse_group_id (event, &group_id);
402 
403       GST_STREAM_SYNCHRONIZER_LOCK (self);
404       self->have_group_id &= have_group_id;
405       have_group_id = self->have_group_id;
406 
407       stream = gst_streamsync_pad_get_stream (pad);
408 
409       gst_event_parse_stream_flags (event, &stream->flags);
410 
411       if ((have_group_id && stream->group_id != group_id) || (!have_group_id
412               && stream->stream_start_seqnum != seqnum)) {
413         stream->is_eos = FALSE;
414         stream->eos_sent = FALSE;
415         stream->flushing = FALSE;
416         stream->stream_start_seqnum = seqnum;
417         stream->group_id = group_id;
418 
419         if (!have_group_id) {
420           /* Check if this belongs to a stream that is already there,
421            * e.g. we got the visualizations for an audio stream */
422           for (l = self->streams; l; l = l->next) {
423             ostream = l->data;
424 
425             if (ostream != stream && ostream->stream_start_seqnum == seqnum
426                 && !ostream->wait) {
427               new_stream = FALSE;
428               break;
429             }
430           }
431 
432           if (!new_stream) {
433             GST_DEBUG_OBJECT (pad,
434                 "Stream %d belongs to running stream %d, no waiting",
435                 stream->stream_number, ostream->stream_number);
436             stream->wait = FALSE;
437 
438             GST_STREAM_SYNCHRONIZER_UNLOCK (self);
439             break;
440           }
441         } else if (group_id == self->group_id) {
442           GST_DEBUG_OBJECT (pad, "Stream %d belongs to running group %d, "
443               "no waiting", stream->stream_number, group_id);
444           GST_STREAM_SYNCHRONIZER_UNLOCK (self);
445           break;
446         }
447 
448         GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
449 
450         stream->wait = TRUE;
451 
452         for (l = self->streams; l; l = l->next) {
453           GstSyncStream *ostream = l->data;
454 
455           all_wait = all_wait && ((ostream->flags & GST_STREAM_FLAG_SPARSE)
456               || (ostream->wait && (!have_group_id
457                       || ostream->group_id == group_id)));
458           if (!all_wait)
459             break;
460         }
461 
462         if (all_wait) {
463           gint64 position = 0;
464 
465           if (have_group_id)
466             GST_DEBUG_OBJECT (self,
467                 "All streams have changed to group id %u -- unblocking",
468                 group_id);
469           else
470             GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
471 
472           self->group_id = group_id;
473 
474           for (l = self->streams; l; l = l->next) {
475             GstSyncStream *ostream = l->data;
476             gint64 stop_running_time;
477             gint64 position_running_time;
478 
479             ostream->wait = FALSE;
480 
481             if (ostream->segment.format == GST_FORMAT_TIME) {
482               if (ostream->segment.rate > 0)
483                 stop_running_time =
484                     gst_segment_to_running_time (&ostream->segment,
485                     GST_FORMAT_TIME, ostream->segment.stop);
486               else
487                 stop_running_time =
488                     gst_segment_to_running_time (&ostream->segment,
489                     GST_FORMAT_TIME, ostream->segment.start);
490 
491               position_running_time =
492                   gst_segment_to_running_time (&ostream->segment,
493                   GST_FORMAT_TIME, ostream->segment.position);
494 
495               position_running_time =
496                   MAX (position_running_time, stop_running_time);
497 
498               if (ostream->segment.rate > 0)
499                 position_running_time -=
500                     gst_segment_to_running_time (&ostream->segment,
501                     GST_FORMAT_TIME, ostream->segment.start);
502               else
503                 position_running_time -=
504                     gst_segment_to_running_time (&ostream->segment,
505                     GST_FORMAT_TIME, ostream->segment.stop);
506 
507               position_running_time = MAX (0, position_running_time);
508 
509               position = MAX (position, position_running_time);
510             }
511           }
512 
513           self->group_start_time += position;
514 
515           GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
516               GST_TIME_ARGS (self->group_start_time));
517 
518           for (l = self->streams; l; l = l->next) {
519             GstSyncStream *ostream = l->data;
520             ostream->wait = FALSE;
521             g_cond_broadcast (&ostream->stream_finish_cond);
522           }
523         }
524       }
525 
526       gst_syncstream_unref (stream);
527       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
528       break;
529     }
530     case GST_EVENT_SEGMENT:{
531       GstSyncStream *stream;
532       GstSegment segment;
533 
534       gst_event_copy_segment (event, &segment);
535 
536       GST_STREAM_SYNCHRONIZER_LOCK (self);
537 
538       gst_stream_synchronizer_wait (self, pad);
539 
540       if (self->shutdown) {
541         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
542         gst_event_unref (event);
543         goto done;
544       }
545 
546       stream = gst_streamsync_pad_get_stream (pad);
547       if (segment.format == GST_FORMAT_TIME) {
548         GST_DEBUG_OBJECT (pad,
549             "New stream, updating base from %" GST_TIME_FORMAT " to %"
550             GST_TIME_FORMAT, GST_TIME_ARGS (segment.base),
551             GST_TIME_ARGS (segment.base + self->group_start_time));
552         segment.base += self->group_start_time;
553 
554         GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
555             &stream->segment);
556         gst_segment_copy_into (&segment, &stream->segment);
557         GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
558             &stream->segment);
559         stream->segment_seqnum = gst_event_get_seqnum (event);
560 
561         GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
562             GST_TIME_ARGS (stream->segment.base));
563         {
564           GstEvent *tmpev;
565 
566           tmpev = gst_event_new_segment (&stream->segment);
567           gst_event_set_seqnum (tmpev, stream->segment_seqnum);
568           gst_event_unref (event);
569           event = tmpev;
570         }
571       } else if (stream) {
572         GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
573             gst_format_get_name (segment.format));
574         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
575       }
576       gst_syncstream_unref (stream);
577       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
578       break;
579     }
580     case GST_EVENT_FLUSH_START:{
581       GstSyncStream *stream;
582 
583       GST_STREAM_SYNCHRONIZER_LOCK (self);
584       stream = gst_streamsync_pad_get_stream (pad);
585       self->eos = FALSE;
586       GST_DEBUG_OBJECT (pad, "Flushing streams");
587       stream->flushing = TRUE;
588       g_cond_broadcast (&stream->stream_finish_cond);
589       gst_syncstream_unref (stream);
590       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
591       break;
592     }
593     case GST_EVENT_FLUSH_STOP:{
594       GstSyncStream *stream;
595       GList *l;
596       GstClockTime new_group_start_time = 0;
597 
598       GST_STREAM_SYNCHRONIZER_LOCK (self);
599       stream = gst_streamsync_pad_get_stream (pad);
600       GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
601           stream->stream_number);
602       gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
603 
604       stream->is_eos = FALSE;
605       stream->eos_sent = FALSE;
606       stream->flushing = FALSE;
607       stream->wait = FALSE;
608       g_cond_broadcast (&stream->stream_finish_cond);
609 
610       for (l = self->streams; l; l = l->next) {
611         GstSyncStream *ostream = l->data;
612         GstClockTime start_running_time;
613 
614         if (ostream == stream || ostream->flushing)
615           continue;
616 
617         if (ostream->segment.format == GST_FORMAT_TIME) {
618           if (ostream->segment.rate > 0)
619             start_running_time =
620                 gst_segment_to_running_time (&ostream->segment,
621                 GST_FORMAT_TIME, ostream->segment.start);
622           else
623             start_running_time =
624                 gst_segment_to_running_time (&ostream->segment,
625                 GST_FORMAT_TIME, ostream->segment.stop);
626 
627           new_group_start_time = MAX (new_group_start_time, start_running_time);
628         }
629       }
630 
631       GST_DEBUG_OBJECT (pad,
632           "Updating group start time from %" GST_TIME_FORMAT " to %"
633           GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
634           GST_TIME_ARGS (new_group_start_time));
635       self->group_start_time = new_group_start_time;
636 
637       gst_syncstream_unref (stream);
638       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
639       break;
640     }
641       /* unblocking EOS wait when track switch. */
642     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:{
643       if (gst_event_has_name (event, "playsink-custom-video-flush")
644           || gst_event_has_name (event, "playsink-custom-audio-flush")
645           || gst_event_has_name (event, "playsink-custom-subtitle-flush")) {
646         GstSyncStream *stream;
647 
648         GST_STREAM_SYNCHRONIZER_LOCK (self);
649         stream = gst_streamsync_pad_get_stream (pad);
650         stream->is_eos = FALSE;
651         stream->eos_sent = FALSE;
652         stream->wait = FALSE;
653         g_cond_broadcast (&stream->stream_finish_cond);
654         gst_syncstream_unref (stream);
655         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
656       }
657       break;
658     }
659     case GST_EVENT_EOS:{
660       GstSyncStream *stream;
661       GList *l;
662       gboolean all_eos = TRUE;
663       gboolean seen_data;
664       GSList *pads = NULL;
665       GstPad *srcpad;
666       GstClockTime timestamp;
667       guint32 seqnum;
668 
669       GST_STREAM_SYNCHRONIZER_LOCK (self);
670       stream = gst_streamsync_pad_get_stream (pad);
671 
672       GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
673       stream->is_eos = TRUE;
674 
675       seen_data = stream->seen_data;
676       srcpad = gst_object_ref (stream->srcpad);
677       seqnum = stream->segment_seqnum;
678 
679       if (seen_data && stream->segment.position != -1)
680         timestamp = stream->segment.position;
681       else if (stream->segment.rate < 0.0 || stream->segment.stop == -1)
682         timestamp = stream->segment.start;
683       else
684         timestamp = stream->segment.stop;
685 
686       stream->segment.position = timestamp;
687 
688       for (l = self->streams; l; l = l->next) {
689         GstSyncStream *ostream = l->data;
690 
691         all_eos = all_eos && ostream->is_eos;
692         if (!all_eos)
693           break;
694       }
695 
696       if (all_eos) {
697         GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
698         self->eos = TRUE;
699         for (l = self->streams; l; l = l->next) {
700           GstSyncStream *ostream = l->data;
701           /* local snapshot of current pads */
702           gst_object_ref (ostream->srcpad);
703           pads = g_slist_prepend (pads, ostream->srcpad);
704         }
705       }
706       if (pads) {
707         GstPad *pad;
708         GSList *epad;
709         GstSyncStream *ostream;
710 
711         ret = TRUE;
712         epad = pads;
713         while (epad) {
714           pad = epad->data;
715           ostream = gst_streamsync_pad_get_stream (pad);
716           g_cond_broadcast (&ostream->stream_finish_cond);
717           gst_syncstream_unref (ostream);
718           gst_object_unref (pad);
719           epad = g_slist_next (epad);
720         }
721         g_slist_free (pads);
722       } else {
723         if (seen_data) {
724           stream->send_gap_event = TRUE;
725           stream->gap_duration = GST_CLOCK_TIME_NONE;
726           stream->wait = TRUE;
727           ret = gst_stream_synchronizer_wait (self, srcpad);
728         }
729       }
730 
731       /* send eos if haven't seen data. seen_data will be true if data buffer
732        * of the track have received in anytime. sink is ready if seen_data is
733        * true, so can send GAP event. Will send EOS if sink isn't ready. The
734        * scenario for the case is one track haven't any media data and then
735        * send EOS. Or no any valid media data in one track, so decoder can't
736        * get valid CAPS for the track. sink can't ready without received CAPS.*/
737       if (!seen_data || self->eos) {
738         GstEvent *topush;
739         GST_DEBUG_OBJECT (pad, "send EOS event");
740         /* drop lock when sending eos, which may block in e.g. preroll */
741         topush = gst_event_new_eos ();
742         gst_event_set_seqnum (topush, seqnum);
743         GST_STREAM_SYNCHRONIZER_UNLOCK (self);
744         ret = gst_pad_push_event (srcpad, topush);
745         GST_STREAM_SYNCHRONIZER_LOCK (self);
746         stream = gst_streamsync_pad_get_stream (pad);
747         stream->eos_sent = TRUE;
748         gst_syncstream_unref (stream);
749       }
750 
751       gst_object_unref (srcpad);
752       gst_event_unref (event);
753       gst_syncstream_unref (stream);
754       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
755       goto done;
756     }
757     default:
758       break;
759   }
760 
761   ret = gst_pad_event_default (pad, parent, event);
762 
763 done:
764 
765   return ret;
766 }
767 
768 static GstFlowReturn
gst_stream_synchronizer_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)769 gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
770     GstBuffer * buffer)
771 {
772   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
773   GstPad *opad;
774   GstFlowReturn ret = GST_FLOW_ERROR;
775   GstSyncStream *stream;
776   GstClockTime duration = GST_CLOCK_TIME_NONE;
777   GstClockTime timestamp = GST_CLOCK_TIME_NONE;
778   GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
779 
780   GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
781       ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
782       " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
783       buffer, gst_buffer_get_size (buffer),
784       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
785       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
786       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
787 
788   timestamp = GST_BUFFER_TIMESTAMP (buffer);
789   duration = GST_BUFFER_DURATION (buffer);
790   if (GST_CLOCK_TIME_IS_VALID (timestamp)
791       && GST_CLOCK_TIME_IS_VALID (duration))
792     timestamp_end = timestamp + duration;
793 
794   GST_STREAM_SYNCHRONIZER_LOCK (self);
795   stream = gst_streamsync_pad_get_stream (pad);
796 
797   stream->seen_data = TRUE;
798   if (stream->segment.format == GST_FORMAT_TIME
799       && GST_CLOCK_TIME_IS_VALID (timestamp)) {
800     GST_LOG_OBJECT (pad,
801         "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
802         GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
803     if (stream->segment.rate > 0.0)
804       stream->segment.position = timestamp;
805     else
806       stream->segment.position = timestamp_end;
807   }
808 
809   gst_syncstream_unref (stream);
810   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
811 
812   opad = gst_stream_get_other_pad_from_pad (self, pad);
813   if (opad) {
814     ret = gst_pad_push (opad, buffer);
815     gst_object_unref (opad);
816   }
817 
818   GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
819   if (ret == GST_FLOW_OK) {
820     GList *l;
821 
822     GST_STREAM_SYNCHRONIZER_LOCK (self);
823     stream = gst_streamsync_pad_get_stream (pad);
824     if (stream->segment.format == GST_FORMAT_TIME) {
825       GstClockTime position;
826 
827       if (stream->segment.rate > 0.0)
828         position = timestamp_end;
829       else
830         position = timestamp;
831 
832       if (GST_CLOCK_TIME_IS_VALID (position)) {
833         GST_LOG_OBJECT (pad,
834             "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
835             GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (position));
836         stream->segment.position = position;
837       }
838     }
839 
840     /* Advance EOS streams if necessary. For non-EOS
841      * streams the demuxers should already do this! */
842     if (!GST_CLOCK_TIME_IS_VALID (timestamp_end) &&
843         GST_CLOCK_TIME_IS_VALID (timestamp)) {
844       timestamp_end = timestamp + GST_SECOND;
845     }
846 
847     for (l = self->streams; l; l = l->next) {
848       GstSyncStream *ostream = l->data;
849       gint64 position;
850 
851       if (!ostream->is_eos || ostream->eos_sent ||
852           ostream->segment.format != GST_FORMAT_TIME)
853         continue;
854 
855       if (ostream->segment.position != -1)
856         position = ostream->segment.position;
857       else
858         position = ostream->segment.start;
859 
860       /* Is there a 1 second lag? */
861       if (position != -1 && GST_CLOCK_TIME_IS_VALID (timestamp_end) &&
862           position + GST_SECOND < timestamp_end) {
863         gint64 new_start;
864 
865         new_start = timestamp_end - GST_SECOND;
866 
867         GST_DEBUG_OBJECT (ostream->sinkpad,
868             "Advancing stream %u from %" GST_TIME_FORMAT " to %"
869             GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
870             GST_TIME_ARGS (new_start));
871 
872         ostream->segment.position = new_start;
873 
874         ostream->send_gap_event = TRUE;
875         ostream->gap_duration = new_start - position;
876         g_cond_broadcast (&ostream->stream_finish_cond);
877       }
878     }
879     gst_syncstream_unref (stream);
880     GST_STREAM_SYNCHRONIZER_UNLOCK (self);
881   }
882 
883   return ret;
884 }
885 
886 /* Must be called with lock! */
887 static GstPad *
gst_stream_synchronizer_new_pad(GstStreamSynchronizer * sync)888 gst_stream_synchronizer_new_pad (GstStreamSynchronizer * sync)
889 {
890   GstSyncStream *stream = NULL;
891   GstStreamSyncPad *sinkpad, *srcpad;
892   gchar *tmp;
893 
894   stream = g_slice_new0 (GstSyncStream);
895   stream->transform = sync;
896   stream->stream_number = sync->current_stream_number;
897   g_cond_init (&stream->stream_finish_cond);
898   stream->stream_start_seqnum = G_MAXUINT32;
899   stream->segment_seqnum = G_MAXUINT32;
900   stream->group_id = G_MAXUINT;
901   stream->seen_data = FALSE;
902   stream->send_gap_event = FALSE;
903   stream->refcount = 1;
904 
905   tmp = g_strdup_printf ("sink_%u", sync->current_stream_number);
906   stream->sinkpad =
907       gst_streamsync_pad_new_from_static_template (&sinktemplate, tmp);
908   g_free (tmp);
909 
910   GST_STREAMSYNC_PAD_CAST (stream->sinkpad)->stream =
911       gst_syncstream_ref (stream);
912 
913   gst_pad_set_iterate_internal_links_function (stream->sinkpad,
914       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
915   gst_pad_set_event_function (stream->sinkpad,
916       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
917   gst_pad_set_chain_function (stream->sinkpad,
918       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
919   GST_PAD_SET_PROXY_CAPS (stream->sinkpad);
920   GST_PAD_SET_PROXY_ALLOCATION (stream->sinkpad);
921   GST_PAD_SET_PROXY_SCHEDULING (stream->sinkpad);
922 
923   tmp = g_strdup_printf ("src_%u", sync->current_stream_number);
924   stream->srcpad =
925       gst_streamsync_pad_new_from_static_template (&srctemplate, tmp);
926   g_free (tmp);
927 
928   GST_STREAMSYNC_PAD_CAST (stream->srcpad)->stream =
929       gst_syncstream_ref (stream);
930 
931   sinkpad = GST_STREAMSYNC_PAD_CAST (stream->sinkpad);
932   srcpad = GST_STREAMSYNC_PAD_CAST (stream->srcpad);
933   /* Hold a strong reference from the sink (request pad) to the src to
934    * ensure a predicatable destruction order */
935   sinkpad->pad = gst_object_ref (srcpad);
936   /* And a weak reference from the src to the sink, to know when pad
937    * release is occuring, and to ensure we do not try and take
938    * references to inactive / destructing streams. */
939   g_weak_ref_init (&srcpad->otherpad, stream->sinkpad);
940 
941   gst_pad_set_iterate_internal_links_function (stream->srcpad,
942       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
943   gst_pad_set_event_function (stream->srcpad,
944       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
945   GST_PAD_SET_PROXY_CAPS (stream->srcpad);
946   GST_PAD_SET_PROXY_ALLOCATION (stream->srcpad);
947   GST_PAD_SET_PROXY_SCHEDULING (stream->srcpad);
948 
949   gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
950 
951   GST_STREAM_SYNCHRONIZER_UNLOCK (sync);
952 
953   /* Add pads and activate unless we're going to NULL */
954   g_rec_mutex_lock (GST_STATE_GET_LOCK (sync));
955   if (GST_STATE_TARGET (sync) != GST_STATE_NULL) {
956     gst_pad_set_active (stream->srcpad, TRUE);
957     gst_pad_set_active (stream->sinkpad, TRUE);
958   }
959   gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->srcpad);
960   gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->sinkpad);
961   g_rec_mutex_unlock (GST_STATE_GET_LOCK (sync));
962 
963   GST_STREAM_SYNCHRONIZER_LOCK (sync);
964 
965   sync->streams = g_list_prepend (sync->streams, g_steal_pointer (&stream));
966   sync->current_stream_number++;
967 
968   return GST_PAD_CAST (sinkpad);
969 }
970 
971 /* GstElement vfuncs */
972 static GstPad *
gst_stream_synchronizer_request_new_pad(GstElement * element,GstPadTemplate * temp,const gchar * name,const GstCaps * caps)973 gst_stream_synchronizer_request_new_pad (GstElement * element,
974     GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
975 {
976   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
977   GstPad *request_pad;
978 
979   GST_STREAM_SYNCHRONIZER_LOCK (self);
980   GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
981       self->current_stream_number);
982 
983   request_pad = gst_stream_synchronizer_new_pad (self);
984 
985   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
986 
987   return request_pad;
988 }
989 
990 /* Must be called with lock! */
991 static void
gst_stream_synchronizer_release_stream(GstStreamSynchronizer * self,GstSyncStream * stream)992 gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
993     GstSyncStream * stream)
994 {
995   GList *l;
996 
997   GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
998 
999   for (l = self->streams; l; l = l->next) {
1000     if (l->data == stream) {
1001       self->streams = g_list_delete_link (self->streams, l);
1002       break;
1003     }
1004   }
1005   g_assert (l != NULL);
1006   if (self->streams == NULL) {
1007     self->have_group_id = TRUE;
1008     self->group_id = G_MAXUINT;
1009   }
1010 
1011   /* we can drop the lock, since stream exists now only local.
1012    * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
1013    * (due to reverse lock order) when deactivating pads */
1014   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1015 
1016   gst_pad_set_active (stream->srcpad, FALSE);
1017   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
1018   gst_pad_set_active (stream->sinkpad, FALSE);
1019   gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
1020 
1021   g_cond_clear (&stream->stream_finish_cond);
1022 
1023   /* Release the ref maintaining validity in the streams list */
1024   gst_syncstream_unref (stream);
1025 
1026   /* NOTE: In theory we have to check here if all streams
1027    * are EOS but the one that was removed wasn't and then
1028    * send EOS downstream. But due to the way how playsink
1029    * works this is not necessary and will only cause problems
1030    * for gapless playback. playsink will only add/remove pads
1031    * when it's reconfigured, which happens when the streams
1032    * change
1033    */
1034 
1035   /* lock for good measure, since the caller had it */
1036   GST_STREAM_SYNCHRONIZER_LOCK (self);
1037 }
1038 
1039 static void
gst_stream_synchronizer_release_pad(GstElement * element,GstPad * pad)1040 gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
1041 {
1042   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
1043   GstSyncStream *stream;
1044 
1045   GST_STREAM_SYNCHRONIZER_LOCK (self);
1046   stream = gst_streamsync_pad_get_stream (pad);
1047   g_assert (stream->sinkpad == pad);
1048 
1049   gst_stream_synchronizer_release_stream (self, stream);
1050 
1051   gst_syncstream_unref (stream);
1052   GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1053 }
1054 
1055 static GstStateChangeReturn
gst_stream_synchronizer_change_state(GstElement * element,GstStateChange transition)1056 gst_stream_synchronizer_change_state (GstElement * element,
1057     GstStateChange transition)
1058 {
1059   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
1060   GstStateChangeReturn ret;
1061 
1062   switch (transition) {
1063     case GST_STATE_CHANGE_NULL_TO_READY:
1064       GST_DEBUG_OBJECT (self, "State change NULL->READY");
1065       self->shutdown = FALSE;
1066       break;
1067     case GST_STATE_CHANGE_READY_TO_PAUSED:
1068       GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
1069       self->group_start_time = 0;
1070       self->have_group_id = TRUE;
1071       self->group_id = G_MAXUINT;
1072       self->shutdown = FALSE;
1073       self->flushing = FALSE;
1074       self->eos = FALSE;
1075       break;
1076     case GST_STATE_CHANGE_PAUSED_TO_READY:{
1077       GList *l;
1078 
1079       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
1080 
1081       GST_STREAM_SYNCHRONIZER_LOCK (self);
1082       self->flushing = TRUE;
1083       self->shutdown = TRUE;
1084       for (l = self->streams; l; l = l->next) {
1085         GstSyncStream *ostream = l->data;
1086         g_cond_broadcast (&ostream->stream_finish_cond);
1087       }
1088       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1089     }
1090     default:
1091       break;
1092   }
1093 
1094   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1095   GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", ret);
1096   if (G_UNLIKELY (ret != GST_STATE_CHANGE_SUCCESS))
1097     return ret;
1098 
1099   switch (transition) {
1100     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:{
1101       GList *l;
1102 
1103       GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
1104 
1105       GST_STREAM_SYNCHRONIZER_LOCK (self);
1106       for (l = self->streams; l; l = l->next) {
1107         GstSyncStream *stream = l->data;
1108         /* send GAP event to sink to finished pre-roll. The reason is function
1109          * chain () will be blocked on pad_push (), so can't trigger the track
1110          * which reach EOS to send GAP event. */
1111         if (stream->is_eos && !stream->eos_sent) {
1112           stream->send_gap_event = TRUE;
1113           stream->gap_duration = GST_CLOCK_TIME_NONE;
1114           g_cond_broadcast (&stream->stream_finish_cond);
1115         }
1116       }
1117       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1118       break;
1119     }
1120     case GST_STATE_CHANGE_PAUSED_TO_READY:{
1121       GList *l;
1122 
1123       GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
1124       self->group_start_time = 0;
1125 
1126       GST_STREAM_SYNCHRONIZER_LOCK (self);
1127       for (l = self->streams; l; l = l->next) {
1128         GstSyncStream *stream = l->data;
1129 
1130         gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
1131         stream->gap_duration = GST_CLOCK_TIME_NONE;
1132         stream->wait = FALSE;
1133         stream->is_eos = FALSE;
1134         stream->eos_sent = FALSE;
1135         stream->flushing = FALSE;
1136         stream->send_gap_event = FALSE;
1137       }
1138       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1139       break;
1140     }
1141     case GST_STATE_CHANGE_READY_TO_NULL:{
1142       GST_DEBUG_OBJECT (self, "State change READY->NULL");
1143 
1144       GST_STREAM_SYNCHRONIZER_LOCK (self);
1145       self->current_stream_number = 0;
1146       GST_STREAM_SYNCHRONIZER_UNLOCK (self);
1147       break;
1148     }
1149     default:
1150       break;
1151   }
1152 
1153   return ret;
1154 }
1155 
1156 /* GObject vfuncs */
1157 static void
gst_stream_synchronizer_finalize(GObject * object)1158 gst_stream_synchronizer_finalize (GObject * object)
1159 {
1160   GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
1161 
1162   g_mutex_clear (&self->lock);
1163 
1164   G_OBJECT_CLASS (parent_class)->finalize (object);
1165 }
1166 
1167 /* GObject type initialization */
1168 static void
gst_stream_synchronizer_init(GstStreamSynchronizer * self)1169 gst_stream_synchronizer_init (GstStreamSynchronizer * self)
1170 {
1171   g_mutex_init (&self->lock);
1172 }
1173 
1174 static void
gst_stream_synchronizer_class_init(GstStreamSynchronizerClass * klass)1175 gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
1176 {
1177   GObjectClass *gobject_class = (GObjectClass *) klass;
1178   GstElementClass *element_class = (GstElementClass *) klass;
1179 
1180   gobject_class->finalize = gst_stream_synchronizer_finalize;
1181 
1182   gst_element_class_add_static_pad_template (element_class, &srctemplate);
1183   gst_element_class_add_static_pad_template (element_class, &sinktemplate);
1184 
1185   gst_element_class_set_static_metadata (element_class,
1186       "Stream Synchronizer", "Generic",
1187       "Synchronizes a group of streams to have equal durations and starting points",
1188       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
1189 
1190   element_class->change_state =
1191       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
1192   element_class->request_new_pad =
1193       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
1194   element_class->release_pad =
1195       GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
1196 }
1197 
1198 gboolean
gst_stream_synchronizer_plugin_init(GstPlugin * plugin)1199 gst_stream_synchronizer_plugin_init (GstPlugin * plugin)
1200 {
1201   GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
1202       "streamsynchronizer", 0, "Stream Synchronizer");
1203 
1204   return gst_element_register (plugin, "streamsynchronizer", GST_RANK_NONE,
1205       GST_TYPE_STREAM_SYNCHRONIZER);
1206 }
1207