1 /* GStreamer
2  * Copyright (C) <2005> Philippe Khalaf <burger@speedy.org>
3  * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 
21 /**
22  * SECTION:gstrtpbasedepayload
23  * @title: GstRTPBaseDepayload
24  * @short_description: Base class for RTP depayloader
25  *
26  * Provides a base class for RTP depayloaders
27  */
28 #ifdef HAVE_CONFIG_H
29 #include "config.h"
30 #endif
31 
32 #include "gstrtpbasedepayload.h"
33 #include "gstrtpmeta.h"
34 
35 GST_DEBUG_CATEGORY_STATIC (rtpbasedepayload_debug);
36 #define GST_CAT_DEFAULT (rtpbasedepayload_debug)
37 
38 struct _GstRTPBaseDepayloadPrivate
39 {
40   GstClockTime npt_start;
41   GstClockTime npt_stop;
42   gdouble play_speed;
43   gdouble play_scale;
44   guint clock_base;
45 
46   gboolean discont;
47   GstClockTime pts;
48   GstClockTime dts;
49   GstClockTime duration;
50 
51   guint32 last_ssrc;
52   guint32 last_seqnum;
53   guint32 last_rtptime;
54   guint32 next_seqnum;
55 
56   gboolean negotiated;
57 
58   GstCaps *last_caps;
59   GstEvent *segment_event;
60   guint32 segment_seqnum;       /* Note: this is a GstEvent seqnum */
61 
62   gboolean source_info;
63   GstBuffer *input_buffer;
64 };
65 
66 /* Filter signals and args */
67 enum
68 {
69   /* FILL ME */
70   LAST_SIGNAL
71 };
72 
73 #define DEFAULT_SOURCE_INFO FALSE
74 
75 enum
76 {
77   PROP_0,
78   PROP_STATS,
79   PROP_SOURCE_INFO,
80   PROP_LAST
81 };
82 
83 static void gst_rtp_base_depayload_finalize (GObject * object);
84 static void gst_rtp_base_depayload_set_property (GObject * object,
85     guint prop_id, const GValue * value, GParamSpec * pspec);
86 static void gst_rtp_base_depayload_get_property (GObject * object,
87     guint prop_id, GValue * value, GParamSpec * pspec);
88 
89 static GstFlowReturn gst_rtp_base_depayload_chain (GstPad * pad,
90     GstObject * parent, GstBuffer * in);
91 static GstFlowReturn gst_rtp_base_depayload_chain_list (GstPad * pad,
92     GstObject * parent, GstBufferList * list);
93 static gboolean gst_rtp_base_depayload_handle_sink_event (GstPad * pad,
94     GstObject * parent, GstEvent * event);
95 
96 static GstStateChangeReturn gst_rtp_base_depayload_change_state (GstElement *
97     element, GstStateChange transition);
98 
99 static gboolean gst_rtp_base_depayload_packet_lost (GstRTPBaseDepayload *
100     filter, GstEvent * event);
101 static gboolean gst_rtp_base_depayload_handle_event (GstRTPBaseDepayload *
102     filter, GstEvent * event);
103 
104 static GstElementClass *parent_class = NULL;
105 static gint private_offset = 0;
106 
107 static void gst_rtp_base_depayload_class_init (GstRTPBaseDepayloadClass *
108     klass);
109 static void gst_rtp_base_depayload_init (GstRTPBaseDepayload * rtpbasepayload,
110     GstRTPBaseDepayloadClass * klass);
111 static GstEvent *create_segment_event (GstRTPBaseDepayload * filter,
112     guint rtptime, GstClockTime position);
113 
114 GType
gst_rtp_base_depayload_get_type(void)115 gst_rtp_base_depayload_get_type (void)
116 {
117   static GType rtp_base_depayload_type = 0;
118 
119   if (g_once_init_enter ((gsize *) & rtp_base_depayload_type)) {
120     static const GTypeInfo rtp_base_depayload_info = {
121       sizeof (GstRTPBaseDepayloadClass),
122       NULL,
123       NULL,
124       (GClassInitFunc) gst_rtp_base_depayload_class_init,
125       NULL,
126       NULL,
127       sizeof (GstRTPBaseDepayload),
128       0,
129       (GInstanceInitFunc) gst_rtp_base_depayload_init,
130     };
131     GType _type;
132 
133     _type = g_type_register_static (GST_TYPE_ELEMENT, "GstRTPBaseDepayload",
134         &rtp_base_depayload_info, G_TYPE_FLAG_ABSTRACT);
135 
136     private_offset =
137         g_type_add_instance_private (_type,
138         sizeof (GstRTPBaseDepayloadPrivate));
139 
140     g_once_init_leave ((gsize *) & rtp_base_depayload_type, _type);
141   }
142   return rtp_base_depayload_type;
143 }
144 
145 static inline GstRTPBaseDepayloadPrivate *
gst_rtp_base_depayload_get_instance_private(GstRTPBaseDepayload * self)146 gst_rtp_base_depayload_get_instance_private (GstRTPBaseDepayload * self)
147 {
148   return (G_STRUCT_MEMBER_P (self, private_offset));
149 }
150 
151 static void
gst_rtp_base_depayload_class_init(GstRTPBaseDepayloadClass * klass)152 gst_rtp_base_depayload_class_init (GstRTPBaseDepayloadClass * klass)
153 {
154   GObjectClass *gobject_class;
155   GstElementClass *gstelement_class;
156 
157   gobject_class = G_OBJECT_CLASS (klass);
158   gstelement_class = (GstElementClass *) klass;
159   parent_class = g_type_class_peek_parent (klass);
160 
161   if (private_offset != 0)
162     g_type_class_adjust_private_offset (klass, &private_offset);
163 
164   gobject_class->finalize = gst_rtp_base_depayload_finalize;
165   gobject_class->set_property = gst_rtp_base_depayload_set_property;
166   gobject_class->get_property = gst_rtp_base_depayload_get_property;
167 
168 
169   /**
170    * GstRTPBaseDepayload:stats:
171    *
172    * Various depayloader statistics retrieved atomically (and are therefore
173    * synchroized with each other). This property return a GstStructure named
174    * application/x-rtp-depayload-stats containing the following fields relating to
175    * the last processed buffer and current state of the stream being depayloaded:
176    *
177    *   * `clock-rate`: #G_TYPE_UINT, clock-rate of the stream
178    *   * `npt-start`: #G_TYPE_UINT64, time of playback start
179    *   * `npt-stop`: #G_TYPE_UINT64, time of playback stop
180    *   * `play-speed`: #G_TYPE_DOUBLE, the playback speed
181    *   * `play-scale`: #G_TYPE_DOUBLE, the playback scale
182    *   * `running-time-dts`: #G_TYPE_UINT64, the last running-time of the
183    *      last DTS
184    *   * `running-time-pts`: #G_TYPE_UINT64, the last running-time of the
185    *      last PTS
186    *   * `seqnum`: #G_TYPE_UINT, the last seen seqnum
187    *   * `timestamp`: #G_TYPE_UINT, the last seen RTP timestamp
188    **/
189   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_STATS,
190       g_param_spec_boxed ("stats", "Statistics", "Various statistics",
191           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
192 
193   /**
194    * GstRTPBaseDepayload:source-info:
195    *
196    * Add RTP source information found in RTP header as meta to output buffer.
197    *
198    * Since: 1.16
199    **/
200   g_object_class_install_property (gobject_class, PROP_SOURCE_INFO,
201       g_param_spec_boolean ("source-info", "RTP source information",
202           "Add RTP source information as buffer meta",
203           DEFAULT_SOURCE_INFO, G_PARAM_READWRITE));
204 
205   gstelement_class->change_state = gst_rtp_base_depayload_change_state;
206 
207   klass->packet_lost = gst_rtp_base_depayload_packet_lost;
208   klass->handle_event = gst_rtp_base_depayload_handle_event;
209 
210   GST_DEBUG_CATEGORY_INIT (rtpbasedepayload_debug, "rtpbasedepayload", 0,
211       "Base class for RTP Depayloaders");
212 }
213 
214 static void
gst_rtp_base_depayload_init(GstRTPBaseDepayload * filter,GstRTPBaseDepayloadClass * klass)215 gst_rtp_base_depayload_init (GstRTPBaseDepayload * filter,
216     GstRTPBaseDepayloadClass * klass)
217 {
218   GstPadTemplate *pad_template;
219   GstRTPBaseDepayloadPrivate *priv;
220 
221   priv = gst_rtp_base_depayload_get_instance_private (filter);
222 
223   filter->priv = priv;
224 
225   GST_DEBUG_OBJECT (filter, "init");
226 
227   pad_template =
228       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
229   g_return_if_fail (pad_template != NULL);
230   filter->sinkpad = gst_pad_new_from_template (pad_template, "sink");
231   gst_pad_set_chain_function (filter->sinkpad, gst_rtp_base_depayload_chain);
232   gst_pad_set_chain_list_function (filter->sinkpad,
233       gst_rtp_base_depayload_chain_list);
234   gst_pad_set_event_function (filter->sinkpad,
235       gst_rtp_base_depayload_handle_sink_event);
236   gst_element_add_pad (GST_ELEMENT (filter), filter->sinkpad);
237 
238   pad_template =
239       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
240   g_return_if_fail (pad_template != NULL);
241   filter->srcpad = gst_pad_new_from_template (pad_template, "src");
242   gst_pad_use_fixed_caps (filter->srcpad);
243   gst_element_add_pad (GST_ELEMENT (filter), filter->srcpad);
244 
245   priv->npt_start = 0;
246   priv->npt_stop = -1;
247   priv->play_speed = 1.0;
248   priv->play_scale = 1.0;
249   priv->clock_base = -1;
250   priv->dts = -1;
251   priv->pts = -1;
252   priv->duration = -1;
253   priv->source_info = DEFAULT_SOURCE_INFO;
254 
255   gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED);
256 }
257 
258 static void
gst_rtp_base_depayload_finalize(GObject * object)259 gst_rtp_base_depayload_finalize (GObject * object)
260 {
261   G_OBJECT_CLASS (parent_class)->finalize (object);
262 }
263 
264 static gboolean
gst_rtp_base_depayload_setcaps(GstRTPBaseDepayload * filter,GstCaps * caps)265 gst_rtp_base_depayload_setcaps (GstRTPBaseDepayload * filter, GstCaps * caps)
266 {
267   GstRTPBaseDepayloadClass *bclass;
268   GstRTPBaseDepayloadPrivate *priv;
269   gboolean res;
270   GstStructure *caps_struct;
271   const GValue *value;
272 
273   priv = filter->priv;
274 
275   bclass = GST_RTP_BASE_DEPAYLOAD_GET_CLASS (filter);
276 
277   GST_DEBUG_OBJECT (filter, "Set caps %" GST_PTR_FORMAT, caps);
278 
279   if (priv->last_caps) {
280     if (gst_caps_is_equal (priv->last_caps, caps)) {
281       res = TRUE;
282       goto caps_not_changed;
283     } else {
284       gst_caps_unref (priv->last_caps);
285       priv->last_caps = NULL;
286     }
287   }
288 
289   caps_struct = gst_caps_get_structure (caps, 0);
290 
291   /* get other values for newsegment */
292   value = gst_structure_get_value (caps_struct, "npt-start");
293   if (value && G_VALUE_HOLDS_UINT64 (value))
294     priv->npt_start = g_value_get_uint64 (value);
295   else
296     priv->npt_start = 0;
297   GST_DEBUG_OBJECT (filter, "NPT start %" G_GUINT64_FORMAT, priv->npt_start);
298 
299   value = gst_structure_get_value (caps_struct, "npt-stop");
300   if (value && G_VALUE_HOLDS_UINT64 (value))
301     priv->npt_stop = g_value_get_uint64 (value);
302   else
303     priv->npt_stop = -1;
304 
305   GST_DEBUG_OBJECT (filter, "NPT stop %" G_GUINT64_FORMAT, priv->npt_stop);
306 
307   value = gst_structure_get_value (caps_struct, "play-speed");
308   if (value && G_VALUE_HOLDS_DOUBLE (value))
309     priv->play_speed = g_value_get_double (value);
310   else
311     priv->play_speed = 1.0;
312 
313   value = gst_structure_get_value (caps_struct, "play-scale");
314   if (value && G_VALUE_HOLDS_DOUBLE (value))
315     priv->play_scale = g_value_get_double (value);
316   else
317     priv->play_scale = 1.0;
318 
319   value = gst_structure_get_value (caps_struct, "clock-base");
320   if (value && G_VALUE_HOLDS_UINT (value))
321     priv->clock_base = g_value_get_uint (value);
322   else
323     priv->clock_base = -1;
324 
325   if (bclass->set_caps) {
326     res = bclass->set_caps (filter, caps);
327     if (!res) {
328       GST_WARNING_OBJECT (filter, "Subclass rejected caps %" GST_PTR_FORMAT,
329           caps);
330     }
331   } else {
332     res = TRUE;
333   }
334 
335   priv->negotiated = res;
336 
337   if (priv->negotiated)
338     priv->last_caps = gst_caps_ref (caps);
339 
340   return res;
341 
342 caps_not_changed:
343   {
344     GST_DEBUG_OBJECT (filter, "Caps did not change");
345     return res;
346   }
347 }
348 
349 /* takes ownership of the input buffer */
350 static GstFlowReturn
gst_rtp_base_depayload_handle_buffer(GstRTPBaseDepayload * filter,GstRTPBaseDepayloadClass * bclass,GstBuffer * in)351 gst_rtp_base_depayload_handle_buffer (GstRTPBaseDepayload * filter,
352     GstRTPBaseDepayloadClass * bclass, GstBuffer * in)
353 {
354   GstBuffer *(*process_rtp_packet_func) (GstRTPBaseDepayload * base,
355       GstRTPBuffer * rtp_buffer);
356   GstBuffer *(*process_func) (GstRTPBaseDepayload * base, GstBuffer * in);
357   GstRTPBaseDepayloadPrivate *priv;
358   GstFlowReturn ret = GST_FLOW_OK;
359   GstBuffer *out_buf;
360   guint32 ssrc;
361   guint16 seqnum;
362   guint32 rtptime;
363   gboolean discont, buf_discont;
364   gint gap;
365   GstRTPBuffer rtp = { NULL };
366 
367   priv = filter->priv;
368 
369   process_func = bclass->process;
370   process_rtp_packet_func = bclass->process_rtp_packet;
371 
372   /* we must have a setcaps first */
373   if (G_UNLIKELY (!priv->negotiated))
374     goto not_negotiated;
375 
376   if (G_UNLIKELY (!gst_rtp_buffer_map (in, GST_MAP_READ, &rtp)))
377     goto invalid_buffer;
378 
379   buf_discont = GST_BUFFER_IS_DISCONT (in);
380 
381   priv->pts = GST_BUFFER_PTS (in);
382   priv->dts = GST_BUFFER_DTS (in);
383   priv->duration = GST_BUFFER_DURATION (in);
384 
385   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
386   seqnum = gst_rtp_buffer_get_seq (&rtp);
387   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
388 
389   priv->last_seqnum = seqnum;
390   priv->last_rtptime = rtptime;
391 
392   discont = buf_discont;
393 
394   GST_LOG_OBJECT (filter, "discont %d, seqnum %u, rtptime %u, pts %"
395       GST_TIME_FORMAT ", dts %" GST_TIME_FORMAT, buf_discont, seqnum, rtptime,
396       GST_TIME_ARGS (priv->pts), GST_TIME_ARGS (priv->dts));
397 
398   /* Check seqnum. This is a very simple check that makes sure that the seqnums
399    * are strictly increasing, dropping anything that is out of the ordinary. We
400    * can only do this when the next_seqnum is known. */
401   if (G_LIKELY (priv->next_seqnum != -1)) {
402     if (ssrc != priv->last_ssrc) {
403       GST_LOG_OBJECT (filter,
404           "New ssrc %u (current ssrc %u), sender restarted",
405           ssrc, priv->last_ssrc);
406       discont = TRUE;
407     } else {
408       gap = gst_rtp_buffer_compare_seqnum (seqnum, priv->next_seqnum);
409 
410       /* if we have no gap, all is fine */
411       if (G_UNLIKELY (gap != 0)) {
412         GST_LOG_OBJECT (filter, "got packet %u, expected %u, gap %d", seqnum,
413             priv->next_seqnum, gap);
414         if (gap < 0) {
415           /* seqnum > next_seqnum, we are missing some packets, this is always a
416            * DISCONT. */
417           GST_LOG_OBJECT (filter, "%d missing packets", gap);
418           discont = TRUE;
419         } else {
420           /* seqnum < next_seqnum, we have seen this packet before or the sender
421            * could be restarted. If the packet is not too old, we throw it away as
422            * a duplicate, otherwise we mark discont and continue. 100 misordered
423            * packets is a good threshold. See also RFC 4737. */
424           if (gap < 100)
425             goto dropping;
426 
427           GST_LOG_OBJECT (filter,
428               "%d > 100, packet too old, sender likely restarted", gap);
429           discont = TRUE;
430         }
431       }
432     }
433   }
434   priv->next_seqnum = (seqnum + 1) & 0xffff;
435   priv->last_ssrc = ssrc;
436 
437   if (G_UNLIKELY (discont)) {
438     priv->discont = TRUE;
439     if (!buf_discont) {
440       gpointer old_inbuf = in;
441 
442       /* we detected a seqnum discont but the buffer was not flagged with a discont,
443        * set the discont flag so that the subclass can throw away old data. */
444       GST_LOG_OBJECT (filter, "mark DISCONT on input buffer");
445       in = gst_buffer_make_writable (in);
446       GST_BUFFER_FLAG_SET (in, GST_BUFFER_FLAG_DISCONT);
447       /* depayloaders will check flag on rtpbuffer->buffer, so if the input
448        * buffer was not writable already we need to remap to make our
449        * newly-flagged buffer current on the rtpbuffer */
450       if (in != old_inbuf) {
451         gst_rtp_buffer_unmap (&rtp);
452         if (G_UNLIKELY (!gst_rtp_buffer_map (in, GST_MAP_READ, &rtp)))
453           goto invalid_buffer;
454       }
455     }
456   }
457 
458   /* prepare segment event if needed */
459   if (filter->need_newsegment) {
460     priv->segment_event = create_segment_event (filter, rtptime,
461         GST_BUFFER_PTS (in));
462     filter->need_newsegment = FALSE;
463   }
464 
465   priv->input_buffer = in;
466 
467   if (process_rtp_packet_func != NULL) {
468     out_buf = process_rtp_packet_func (filter, &rtp);
469     gst_rtp_buffer_unmap (&rtp);
470   } else if (process_func != NULL) {
471     gst_rtp_buffer_unmap (&rtp);
472     out_buf = process_func (filter, in);
473   } else {
474     goto no_process;
475   }
476 
477   /* let's send it out to processing */
478   if (out_buf) {
479     ret = gst_rtp_base_depayload_push (filter, out_buf);
480   }
481 
482   gst_buffer_unref (in);
483   priv->input_buffer = NULL;
484 
485   return ret;
486 
487   /* ERRORS */
488 not_negotiated:
489   {
490     /* this is not fatal but should be filtered earlier */
491     GST_ELEMENT_ERROR (filter, CORE, NEGOTIATION,
492         ("No RTP format was negotiated."),
493         ("Input buffers need to have RTP caps set on them. This is usually "
494             "achieved by setting the 'caps' property of the upstream source "
495             "element (often udpsrc or appsrc), or by putting a capsfilter "
496             "element before the depayloader and setting the 'caps' property "
497             "on that. Also see http://cgit.freedesktop.org/gstreamer/"
498             "gst-plugins-good/tree/gst/rtp/README"));
499     gst_buffer_unref (in);
500     return GST_FLOW_NOT_NEGOTIATED;
501   }
502 invalid_buffer:
503   {
504     /* this is not fatal but should be filtered earlier */
505     GST_ELEMENT_WARNING (filter, STREAM, DECODE, (NULL),
506         ("Received invalid RTP payload, dropping"));
507     gst_buffer_unref (in);
508     return GST_FLOW_OK;
509   }
510 dropping:
511   {
512     gst_rtp_buffer_unmap (&rtp);
513     GST_WARNING_OBJECT (filter, "%d <= 100, dropping old packet", gap);
514     gst_buffer_unref (in);
515     return GST_FLOW_OK;
516   }
517 no_process:
518   {
519     gst_rtp_buffer_unmap (&rtp);
520     /* this is not fatal but should be filtered earlier */
521     GST_ELEMENT_ERROR (filter, STREAM, NOT_IMPLEMENTED, (NULL),
522         ("The subclass does not have a process or process_rtp_packet method"));
523     gst_buffer_unref (in);
524     return GST_FLOW_ERROR;
525   }
526 }
527 
528 static GstFlowReturn
gst_rtp_base_depayload_chain(GstPad * pad,GstObject * parent,GstBuffer * in)529 gst_rtp_base_depayload_chain (GstPad * pad, GstObject * parent, GstBuffer * in)
530 {
531   GstRTPBaseDepayloadClass *bclass;
532   GstRTPBaseDepayload *basedepay;
533   GstFlowReturn flow_ret;
534 
535   basedepay = GST_RTP_BASE_DEPAYLOAD_CAST (parent);
536 
537   bclass = GST_RTP_BASE_DEPAYLOAD_GET_CLASS (basedepay);
538 
539   flow_ret = gst_rtp_base_depayload_handle_buffer (basedepay, bclass, in);
540 
541   return flow_ret;
542 }
543 
544 static GstFlowReturn
gst_rtp_base_depayload_chain_list(GstPad * pad,GstObject * parent,GstBufferList * list)545 gst_rtp_base_depayload_chain_list (GstPad * pad, GstObject * parent,
546     GstBufferList * list)
547 {
548   GstRTPBaseDepayloadClass *bclass;
549   GstRTPBaseDepayload *basedepay;
550   GstFlowReturn flow_ret;
551   GstBuffer *buffer;
552   guint i, len;
553 
554   basedepay = GST_RTP_BASE_DEPAYLOAD_CAST (parent);
555 
556   bclass = GST_RTP_BASE_DEPAYLOAD_GET_CLASS (basedepay);
557 
558   flow_ret = GST_FLOW_OK;
559 
560   /* chain each buffer in list individually */
561   len = gst_buffer_list_length (list);
562 
563   if (len == 0)
564     goto done;
565 
566   for (i = 0; i < len; i++) {
567     buffer = gst_buffer_list_get (list, i);
568 
569     /* handle_buffer takes ownership of input buffer */
570     /* FIXME: add a way to steal buffers from list as we will unref it anyway */
571     gst_buffer_ref (buffer);
572 
573     /* Should we fix up any missing timestamps for list buffers here
574      * (e.g. set to first or previous timestamp in list) or just assume
575      * the's a jitterbuffer that will have done that for us? */
576     flow_ret = gst_rtp_base_depayload_handle_buffer (basedepay, bclass, buffer);
577     if (flow_ret != GST_FLOW_OK)
578       break;
579   }
580 
581 done:
582 
583   gst_buffer_list_unref (list);
584 
585   return flow_ret;
586 }
587 
588 static gboolean
gst_rtp_base_depayload_handle_event(GstRTPBaseDepayload * filter,GstEvent * event)589 gst_rtp_base_depayload_handle_event (GstRTPBaseDepayload * filter,
590     GstEvent * event)
591 {
592   gboolean res = TRUE;
593   gboolean forward = TRUE;
594 
595   switch (GST_EVENT_TYPE (event)) {
596     case GST_EVENT_FLUSH_STOP:
597       GST_OBJECT_LOCK (filter);
598       gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED);
599       GST_OBJECT_UNLOCK (filter);
600 
601       filter->need_newsegment = TRUE;
602       filter->priv->next_seqnum = -1;
603       gst_event_replace (&filter->priv->segment_event, NULL);
604       break;
605     case GST_EVENT_CAPS:
606     {
607       GstCaps *caps;
608 
609       gst_event_parse_caps (event, &caps);
610 
611       res = gst_rtp_base_depayload_setcaps (filter, caps);
612       forward = FALSE;
613       break;
614     }
615     case GST_EVENT_SEGMENT:
616     {
617       GstSegment segment;
618 
619       GST_OBJECT_LOCK (filter);
620       gst_event_copy_segment (event, &segment);
621 
622       if (segment.format != GST_FORMAT_TIME) {
623         GST_ERROR_OBJECT (filter, "Segment with non-TIME format not supported");
624         res = FALSE;
625       }
626       filter->priv->segment_seqnum = gst_event_get_seqnum (event);
627       filter->segment = segment;
628       GST_OBJECT_UNLOCK (filter);
629 
630       /* don't pass the event downstream, we generate our own segment including
631        * the NTP time and other things we receive in caps */
632       forward = FALSE;
633       break;
634     }
635     case GST_EVENT_CUSTOM_DOWNSTREAM:
636     {
637       GstRTPBaseDepayloadClass *bclass;
638 
639       bclass = GST_RTP_BASE_DEPAYLOAD_GET_CLASS (filter);
640 
641       if (gst_event_has_name (event, "GstRTPPacketLost")) {
642         /* we get this event from the jitterbuffer when it considers a packet as
643          * being lost. We send it to our packet_lost vmethod. The default
644          * implementation will make time progress by pushing out a GAP event.
645          * Subclasses can override and do one of the following:
646          *  - Adjust timestamp/duration to something more accurate before
647          *    calling the parent (default) packet_lost method.
648          *  - do some more advanced error concealing on the already received
649          *    (fragmented) packets.
650          *  - ignore the packet lost.
651          */
652         if (bclass->packet_lost)
653           res = bclass->packet_lost (filter, event);
654         forward = FALSE;
655       }
656       break;
657     }
658     default:
659       break;
660   }
661 
662   if (forward)
663     res = gst_pad_push_event (filter->srcpad, event);
664   else
665     gst_event_unref (event);
666 
667   return res;
668 }
669 
670 static gboolean
gst_rtp_base_depayload_handle_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)671 gst_rtp_base_depayload_handle_sink_event (GstPad * pad, GstObject * parent,
672     GstEvent * event)
673 {
674   gboolean res = FALSE;
675   GstRTPBaseDepayload *filter;
676   GstRTPBaseDepayloadClass *bclass;
677 
678   filter = GST_RTP_BASE_DEPAYLOAD (parent);
679   bclass = GST_RTP_BASE_DEPAYLOAD_GET_CLASS (filter);
680   if (bclass->handle_event)
681     res = bclass->handle_event (filter, event);
682   else
683     gst_event_unref (event);
684 
685   return res;
686 }
687 
688 static GstEvent *
create_segment_event(GstRTPBaseDepayload * filter,guint rtptime,GstClockTime position)689 create_segment_event (GstRTPBaseDepayload * filter, guint rtptime,
690     GstClockTime position)
691 {
692   GstEvent *event;
693   GstClockTime start, stop, running_time;
694   GstRTPBaseDepayloadPrivate *priv;
695   GstSegment segment;
696 
697   priv = filter->priv;
698 
699   /* We don't need the object lock around - the segment
700    * can't change here while we're holding the STREAM_LOCK
701    */
702 
703   /* determining the start of the segment */
704   start = filter->segment.start;
705   if (priv->clock_base != -1 && position != -1) {
706     GstClockTime exttime, gap;
707 
708     exttime = priv->clock_base;
709     gst_rtp_buffer_ext_timestamp (&exttime, rtptime);
710     gap = gst_util_uint64_scale_int (exttime - priv->clock_base,
711         filter->clock_rate, GST_SECOND);
712 
713     /* account for lost packets */
714     if (position > gap) {
715       GST_DEBUG_OBJECT (filter,
716           "Found gap of %" GST_TIME_FORMAT ", adjusting start: %"
717           GST_TIME_FORMAT " = %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
718           GST_TIME_ARGS (gap), GST_TIME_ARGS (position - gap),
719           GST_TIME_ARGS (position), GST_TIME_ARGS (gap));
720       start = position - gap;
721     }
722   }
723 
724   /* determining the stop of the segment */
725   stop = filter->segment.stop;
726   if (priv->npt_stop != -1)
727     stop = start + (priv->npt_stop - priv->npt_start);
728 
729   if (position == -1)
730     position = start;
731 
732   running_time = gst_segment_to_running_time (&filter->segment,
733       GST_FORMAT_TIME, start);
734 
735   gst_segment_init (&segment, GST_FORMAT_TIME);
736   segment.rate = priv->play_speed;
737   segment.applied_rate = priv->play_scale;
738   segment.start = start;
739   segment.stop = stop;
740   segment.time = priv->npt_start;
741   segment.position = position;
742   segment.base = running_time;
743 
744   GST_DEBUG_OBJECT (filter, "Creating segment event %" GST_SEGMENT_FORMAT,
745       &segment);
746   event = gst_event_new_segment (&segment);
747   if (filter->priv->segment_seqnum != GST_SEQNUM_INVALID)
748     gst_event_set_seqnum (event, filter->priv->segment_seqnum);
749 
750   return event;
751 }
752 
753 static void
add_rtp_source_meta(GstBuffer * outbuf,GstBuffer * rtpbuf)754 add_rtp_source_meta (GstBuffer * outbuf, GstBuffer * rtpbuf)
755 {
756   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
757   GstRTPSourceMeta *meta;
758   guint32 ssrc;
759 
760   if (!gst_rtp_buffer_map (rtpbuf, GST_MAP_READ, &rtp))
761     return;
762 
763   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
764   meta = gst_buffer_add_rtp_source_meta (outbuf, &ssrc, NULL, 0);
765   if (meta != NULL) {
766     gint i;
767     gint csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
768     for (i = 0; i < csrc_count; i++) {
769       guint32 csrc = gst_rtp_buffer_get_csrc (&rtp, i);
770       gst_rtp_source_meta_append_csrc (meta, &csrc, 1);
771     }
772   }
773 
774   gst_rtp_buffer_unmap (&rtp);
775 }
776 
777 static gboolean
set_headers(GstBuffer ** buffer,guint idx,GstRTPBaseDepayload * depayload)778 set_headers (GstBuffer ** buffer, guint idx, GstRTPBaseDepayload * depayload)
779 {
780   GstRTPBaseDepayloadPrivate *priv = depayload->priv;
781   GstClockTime pts, dts, duration;
782 
783   *buffer = gst_buffer_make_writable (*buffer);
784 
785   pts = GST_BUFFER_PTS (*buffer);
786   dts = GST_BUFFER_DTS (*buffer);
787   duration = GST_BUFFER_DURATION (*buffer);
788 
789   /* apply last incomming timestamp and duration to outgoing buffer if
790    * not otherwise set. */
791   if (!GST_CLOCK_TIME_IS_VALID (pts))
792     GST_BUFFER_PTS (*buffer) = priv->pts;
793   if (!GST_CLOCK_TIME_IS_VALID (dts))
794     GST_BUFFER_DTS (*buffer) = priv->dts;
795   if (!GST_CLOCK_TIME_IS_VALID (duration))
796     GST_BUFFER_DURATION (*buffer) = priv->duration;
797 
798   if (G_UNLIKELY (depayload->priv->discont)) {
799     GST_LOG_OBJECT (depayload, "Marking DISCONT on output buffer");
800     GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
801     depayload->priv->discont = FALSE;
802   }
803 
804   /* make sure we only set the timestamp on the first packet */
805   priv->pts = GST_CLOCK_TIME_NONE;
806   priv->dts = GST_CLOCK_TIME_NONE;
807   priv->duration = GST_CLOCK_TIME_NONE;
808 
809   if (priv->source_info && priv->input_buffer)
810     add_rtp_source_meta (*buffer, priv->input_buffer);
811 
812   return TRUE;
813 }
814 
815 static GstFlowReturn
gst_rtp_base_depayload_prepare_push(GstRTPBaseDepayload * filter,gboolean is_list,gpointer obj)816 gst_rtp_base_depayload_prepare_push (GstRTPBaseDepayload * filter,
817     gboolean is_list, gpointer obj)
818 {
819   if (is_list) {
820     GstBufferList **blist = obj;
821     gst_buffer_list_foreach (*blist, (GstBufferListFunc) set_headers, filter);
822   } else {
823     GstBuffer **buf = obj;
824     set_headers (buf, 0, filter);
825   }
826 
827   /* if this is the first buffer send a NEWSEGMENT */
828   if (G_UNLIKELY (filter->priv->segment_event)) {
829     gst_pad_push_event (filter->srcpad, filter->priv->segment_event);
830     filter->priv->segment_event = NULL;
831     GST_DEBUG_OBJECT (filter, "Pushed newsegment event on this first buffer");
832   }
833 
834   return GST_FLOW_OK;
835 }
836 
837 /**
838  * gst_rtp_base_depayload_push:
839  * @filter: a #GstRTPBaseDepayload
840  * @out_buf: a #GstBuffer
841  *
842  * Push @out_buf to the peer of @filter. This function takes ownership of
843  * @out_buf.
844  *
845  * This function will by default apply the last incomming timestamp on
846  * the outgoing buffer when it didn't have a timestamp already.
847  *
848  * Returns: a #GstFlowReturn.
849  */
850 GstFlowReturn
gst_rtp_base_depayload_push(GstRTPBaseDepayload * filter,GstBuffer * out_buf)851 gst_rtp_base_depayload_push (GstRTPBaseDepayload * filter, GstBuffer * out_buf)
852 {
853   GstFlowReturn res;
854 
855   res = gst_rtp_base_depayload_prepare_push (filter, FALSE, &out_buf);
856 
857   if (G_LIKELY (res == GST_FLOW_OK))
858     res = gst_pad_push (filter->srcpad, out_buf);
859   else
860     gst_buffer_unref (out_buf);
861 
862   return res;
863 }
864 
865 /**
866  * gst_rtp_base_depayload_push_list:
867  * @filter: a #GstRTPBaseDepayload
868  * @out_list: a #GstBufferList
869  *
870  * Push @out_list to the peer of @filter. This function takes ownership of
871  * @out_list.
872  *
873  * Returns: a #GstFlowReturn.
874  */
875 GstFlowReturn
gst_rtp_base_depayload_push_list(GstRTPBaseDepayload * filter,GstBufferList * out_list)876 gst_rtp_base_depayload_push_list (GstRTPBaseDepayload * filter,
877     GstBufferList * out_list)
878 {
879   GstFlowReturn res;
880 
881   res = gst_rtp_base_depayload_prepare_push (filter, TRUE, &out_list);
882 
883   if (G_LIKELY (res == GST_FLOW_OK))
884     res = gst_pad_push_list (filter->srcpad, out_list);
885   else
886     gst_buffer_list_unref (out_list);
887 
888   return res;
889 }
890 
891 /* convert the PacketLost event from a jitterbuffer to a GAP event.
892  * subclasses can override this.  */
893 static gboolean
gst_rtp_base_depayload_packet_lost(GstRTPBaseDepayload * filter,GstEvent * event)894 gst_rtp_base_depayload_packet_lost (GstRTPBaseDepayload * filter,
895     GstEvent * event)
896 {
897   GstClockTime timestamp, duration;
898   GstEvent *sevent;
899   const GstStructure *s;
900   gboolean might_have_been_fec;
901   gboolean res = TRUE;
902 
903   s = gst_event_get_structure (event);
904 
905   /* first start by parsing the timestamp and duration */
906   timestamp = -1;
907   duration = -1;
908 
909   if (!gst_structure_get_clock_time (s, "timestamp", &timestamp) ||
910       !gst_structure_get_clock_time (s, "duration", &duration)) {
911     GST_ERROR_OBJECT (filter,
912         "Packet loss event without timestamp or duration");
913     return FALSE;
914   }
915 
916   sevent = gst_pad_get_sticky_event (filter->srcpad, GST_EVENT_SEGMENT, 0);
917   if (G_UNLIKELY (!sevent)) {
918     /* Typically happens if lost event arrives before first buffer */
919     GST_DEBUG_OBJECT (filter,
920         "Ignore packet loss because segment event missing");
921     return FALSE;
922   }
923   gst_event_unref (sevent);
924 
925   if (!gst_structure_get_boolean (s, "might-have-been-fec",
926           &might_have_been_fec) || !might_have_been_fec) {
927     /* send GAP event */
928     sevent = gst_event_new_gap (timestamp, duration);
929     res = gst_pad_push_event (filter->srcpad, sevent);
930   }
931 
932   return res;
933 }
934 
935 static GstStateChangeReturn
gst_rtp_base_depayload_change_state(GstElement * element,GstStateChange transition)936 gst_rtp_base_depayload_change_state (GstElement * element,
937     GstStateChange transition)
938 {
939   GstRTPBaseDepayload *filter;
940   GstRTPBaseDepayloadPrivate *priv;
941   GstStateChangeReturn ret;
942 
943   filter = GST_RTP_BASE_DEPAYLOAD (element);
944   priv = filter->priv;
945 
946   switch (transition) {
947     case GST_STATE_CHANGE_NULL_TO_READY:
948       break;
949     case GST_STATE_CHANGE_READY_TO_PAUSED:
950       filter->need_newsegment = TRUE;
951       priv->npt_start = 0;
952       priv->npt_stop = -1;
953       priv->play_speed = 1.0;
954       priv->play_scale = 1.0;
955       priv->clock_base = -1;
956       priv->next_seqnum = -1;
957       priv->negotiated = FALSE;
958       priv->discont = FALSE;
959       priv->segment_seqnum = GST_SEQNUM_INVALID;
960       break;
961     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
962       break;
963     default:
964       break;
965   }
966 
967   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
968 
969   switch (transition) {
970     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
971       break;
972     case GST_STATE_CHANGE_PAUSED_TO_READY:
973       gst_caps_replace (&priv->last_caps, NULL);
974       gst_event_replace (&priv->segment_event, NULL);
975       break;
976     case GST_STATE_CHANGE_READY_TO_NULL:
977       break;
978     default:
979       break;
980   }
981   return ret;
982 }
983 
984 static GstStructure *
gst_rtp_base_depayload_create_stats(GstRTPBaseDepayload * depayload)985 gst_rtp_base_depayload_create_stats (GstRTPBaseDepayload * depayload)
986 {
987   GstRTPBaseDepayloadPrivate *priv;
988   GstStructure *s;
989   GstClockTime pts = GST_CLOCK_TIME_NONE, dts = GST_CLOCK_TIME_NONE;
990 
991   priv = depayload->priv;
992 
993   GST_OBJECT_LOCK (depayload);
994   if (depayload->segment.format != GST_FORMAT_UNDEFINED) {
995     pts = gst_segment_to_running_time (&depayload->segment, GST_FORMAT_TIME,
996         priv->pts);
997     dts = gst_segment_to_running_time (&depayload->segment, GST_FORMAT_TIME,
998         priv->dts);
999   }
1000   GST_OBJECT_UNLOCK (depayload);
1001 
1002   s = gst_structure_new ("application/x-rtp-depayload-stats",
1003       "clock_rate", G_TYPE_UINT, depayload->clock_rate,
1004       "npt-start", G_TYPE_UINT64, priv->npt_start,
1005       "npt-stop", G_TYPE_UINT64, priv->npt_stop,
1006       "play-speed", G_TYPE_DOUBLE, priv->play_speed,
1007       "play-scale", G_TYPE_DOUBLE, priv->play_scale,
1008       "running-time-dts", G_TYPE_UINT64, dts,
1009       "running-time-pts", G_TYPE_UINT64, pts,
1010       "seqnum", G_TYPE_UINT, (guint) priv->last_seqnum,
1011       "timestamp", G_TYPE_UINT, (guint) priv->last_rtptime, NULL);
1012 
1013   return s;
1014 }
1015 
1016 
1017 static void
gst_rtp_base_depayload_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1018 gst_rtp_base_depayload_set_property (GObject * object, guint prop_id,
1019     const GValue * value, GParamSpec * pspec)
1020 {
1021   GstRTPBaseDepayload *depayload;
1022 
1023   depayload = GST_RTP_BASE_DEPAYLOAD (object);
1024 
1025   switch (prop_id) {
1026     case PROP_SOURCE_INFO:
1027       gst_rtp_base_depayload_set_source_info_enabled (depayload,
1028           g_value_get_boolean (value));
1029       break;
1030     default:
1031       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1032       break;
1033   }
1034 }
1035 
1036 static void
gst_rtp_base_depayload_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1037 gst_rtp_base_depayload_get_property (GObject * object, guint prop_id,
1038     GValue * value, GParamSpec * pspec)
1039 {
1040   GstRTPBaseDepayload *depayload;
1041 
1042   depayload = GST_RTP_BASE_DEPAYLOAD (object);
1043 
1044   switch (prop_id) {
1045     case PROP_STATS:
1046       g_value_take_boxed (value,
1047           gst_rtp_base_depayload_create_stats (depayload));
1048       break;
1049     case PROP_SOURCE_INFO:
1050       g_value_set_boolean (value,
1051           gst_rtp_base_depayload_is_source_info_enabled (depayload));
1052       break;
1053     default:
1054       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1055       break;
1056   }
1057 }
1058 
1059 /**
1060  * gst_rtp_base_depayload_set_source_info_enabled:
1061  * @depayload: a #GstRTPBaseDepayload
1062  * @enable: whether to add meta about RTP sources to buffer
1063  *
1064  * Enable or disable adding #GstRTPSourceMeta to depayloaded buffers.
1065  *
1066  * Since: 1.16
1067  **/
1068 void
gst_rtp_base_depayload_set_source_info_enabled(GstRTPBaseDepayload * depayload,gboolean enable)1069 gst_rtp_base_depayload_set_source_info_enabled (GstRTPBaseDepayload * depayload,
1070     gboolean enable)
1071 {
1072   depayload->priv->source_info = enable;
1073 }
1074 
1075 /**
1076  * gst_rtp_base_depayload_is_source_info_enabled:
1077  * @depayload: a #GstRTPBaseDepayload
1078  *
1079  * Queries whether #GstRTPSourceMeta will be added to depayloaded buffers.
1080  *
1081  * Returns: %TRUE if source-info is enabled.
1082  *
1083  * Since: 1.16
1084  **/
1085 gboolean
gst_rtp_base_depayload_is_source_info_enabled(GstRTPBaseDepayload * depayload)1086 gst_rtp_base_depayload_is_source_info_enabled (GstRTPBaseDepayload * depayload)
1087 {
1088   return depayload->priv->source_info;
1089 }
1090