1 /* GStreamer
2  * Copyright (C) <2006> Lutz Mueller <lutz at topfrose dot de>
3  *               <2006> Wim Taymans <wim@fluendo.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #  include "config.h"
23 #endif
24 
25 #include <string.h>
26 
27 #include "gstrdtbuffer.h"
28 #include "rdtdepay.h"
29 
/* Debug category used by every GST_*_OBJECT logging call in this file. */
GST_DEBUG_CATEGORY_STATIC (rdtdepay_debug);
#define GST_CAT_DEFAULT rdtdepay_debug

/* Signal identifiers; none are defined yet, LAST_SIGNAL only marks the end. */
enum
{
  /* FILL ME */
  LAST_SIGNAL
};

/* Property identifiers; only the PROP_0 placeholder exists. */
enum
{
  PROP_0,
};
44 
45 static GstStaticPadTemplate gst_rdt_depay_src_template =
46 GST_STATIC_PAD_TEMPLATE ("src",
47     GST_PAD_SRC,
48     GST_PAD_ALWAYS,
49     GST_STATIC_CAPS ("application/vnd.rn-realmedia")
50     );
51 
52 static GstStaticPadTemplate gst_rdt_depay_sink_template =
53 GST_STATIC_PAD_TEMPLATE ("sink",
54     GST_PAD_SINK,
55     GST_PAD_ALWAYS,
56     GST_STATIC_CAPS ("application/x-rdt, "
57         "media = (string) \"application\", "
58         "clock-rate = (int) [1, MAX ], "
59         "encoding-name = (string) \"X-REAL-RDT\""
60         /* All optional parameters
61          *
62          * "config="
63          */
64     )
65     );
66 
67 #define gst_rdt_depay_parent_class parent_class
68 G_DEFINE_TYPE (GstRDTDepay, gst_rdt_depay, GST_TYPE_ELEMENT);
69 
70 static void gst_rdt_depay_finalize (GObject * object);
71 
72 static GstStateChangeReturn gst_rdt_depay_change_state (GstElement *
73     element, GstStateChange transition);
74 
75 static gboolean gst_rdt_depay_sink_event (GstPad * pad, GstObject * parent,
76     GstEvent * event);
77 static GstFlowReturn gst_rdt_depay_chain (GstPad * pad, GstObject * parent,
78     GstBuffer * buf);
79 
80 static void
gst_rdt_depay_class_init(GstRDTDepayClass * klass)81 gst_rdt_depay_class_init (GstRDTDepayClass * klass)
82 {
83   GObjectClass *gobject_class;
84   GstElementClass *gstelement_class;
85 
86   gobject_class = (GObjectClass *) klass;
87   gstelement_class = (GstElementClass *) klass;
88 
89   parent_class = g_type_class_peek_parent (klass);
90 
91   gobject_class->finalize = gst_rdt_depay_finalize;
92 
93   gstelement_class->change_state = gst_rdt_depay_change_state;
94 
95   gst_element_class_add_static_pad_template (gstelement_class,
96       &gst_rdt_depay_src_template);
97   gst_element_class_add_static_pad_template (gstelement_class,
98       &gst_rdt_depay_sink_template);
99 
100   gst_element_class_set_static_metadata (gstelement_class, "RDT packet parser",
101       "Codec/Depayloader/Network",
102       "Extracts RealMedia from RDT packets",
103       "Lutz Mueller <lutz at topfrose dot de>, "
104       "Wim Taymans <wim@fluendo.com>");
105 
106   GST_DEBUG_CATEGORY_INIT (rdtdepay_debug, "rdtdepay",
107       0, "Depayloader for RDT RealMedia packets");
108 }
109 
110 static void
gst_rdt_depay_init(GstRDTDepay * rdtdepay)111 gst_rdt_depay_init (GstRDTDepay * rdtdepay)
112 {
113   rdtdepay->sinkpad =
114       gst_pad_new_from_static_template (&gst_rdt_depay_sink_template, "sink");
115   gst_pad_set_chain_function (rdtdepay->sinkpad, gst_rdt_depay_chain);
116   gst_pad_set_event_function (rdtdepay->sinkpad, gst_rdt_depay_sink_event);
117   gst_element_add_pad (GST_ELEMENT_CAST (rdtdepay), rdtdepay->sinkpad);
118 
119   rdtdepay->srcpad =
120       gst_pad_new_from_static_template (&gst_rdt_depay_src_template, "src");
121   gst_element_add_pad (GST_ELEMENT_CAST (rdtdepay), rdtdepay->srcpad);
122 }
123 
124 static void
gst_rdt_depay_finalize(GObject * object)125 gst_rdt_depay_finalize (GObject * object)
126 {
127   GstRDTDepay *rdtdepay;
128 
129   rdtdepay = GST_RDT_DEPAY (object);
130 
131   if (rdtdepay->header)
132     gst_buffer_unref (rdtdepay->header);
133 
134   G_OBJECT_CLASS (parent_class)->finalize (object);
135 }
136 
137 static gboolean
gst_rdt_depay_setcaps(GstPad * pad,GstCaps * caps)138 gst_rdt_depay_setcaps (GstPad * pad, GstCaps * caps)
139 {
140   GstStructure *structure;
141   GstRDTDepay *rdtdepay;
142   GstCaps *srccaps;
143   gint clock_rate = 1000;       /* default */
144   const GValue *value;
145   GstBuffer *header;
146 
147   rdtdepay = GST_RDT_DEPAY (GST_PAD_PARENT (pad));
148 
149   structure = gst_caps_get_structure (caps, 0);
150 
151   if (gst_structure_has_field (structure, "clock-rate"))
152     gst_structure_get_int (structure, "clock-rate", &clock_rate);
153 
154   /* config contains the RealMedia header as a buffer. */
155   value = gst_structure_get_value (structure, "config");
156   if (!value)
157     goto no_header;
158 
159   header = gst_value_get_buffer (value);
160   if (!header)
161     goto no_header;
162 
163   /* get other values for newsegment */
164   value = gst_structure_get_value (structure, "npt-start");
165   if (value && G_VALUE_HOLDS_UINT64 (value))
166     rdtdepay->npt_start = g_value_get_uint64 (value);
167   else
168     rdtdepay->npt_start = 0;
169   GST_DEBUG_OBJECT (rdtdepay, "NPT start %" G_GUINT64_FORMAT,
170       rdtdepay->npt_start);
171 
172   value = gst_structure_get_value (structure, "npt-stop");
173   if (value && G_VALUE_HOLDS_UINT64 (value))
174     rdtdepay->npt_stop = g_value_get_uint64 (value);
175   else
176     rdtdepay->npt_stop = -1;
177 
178   GST_DEBUG_OBJECT (rdtdepay, "NPT stop %" G_GUINT64_FORMAT,
179       rdtdepay->npt_stop);
180 
181   value = gst_structure_get_value (structure, "play-speed");
182   if (value && G_VALUE_HOLDS_DOUBLE (value))
183     rdtdepay->play_speed = g_value_get_double (value);
184   else
185     rdtdepay->play_speed = 1.0;
186 
187   value = gst_structure_get_value (structure, "play-scale");
188   if (value && G_VALUE_HOLDS_DOUBLE (value))
189     rdtdepay->play_scale = g_value_get_double (value);
190   else
191     rdtdepay->play_scale = 1.0;
192 
193   /* caps seem good, configure element */
194   rdtdepay->clock_rate = clock_rate;
195 
196   /* set caps on pad and on header */
197   srccaps = gst_caps_new_empty_simple ("application/vnd.rn-realmedia");
198   gst_pad_set_caps (rdtdepay->srcpad, srccaps);
199   gst_caps_unref (srccaps);
200 
201   if (rdtdepay->header)
202     gst_buffer_unref (rdtdepay->header);
203   rdtdepay->header = gst_buffer_ref (header);
204 
205   return TRUE;
206 
207   /* ERRORS */
208 no_header:
209   {
210     GST_ERROR_OBJECT (rdtdepay, "no header found in caps, no 'config' field");
211     return FALSE;
212   }
213 }
214 
215 static gboolean
gst_rdt_depay_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)216 gst_rdt_depay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
217 {
218   GstRDTDepay *depay;
219   gboolean res = TRUE;
220 
221   depay = GST_RDT_DEPAY (parent);
222 
223   switch (GST_EVENT_TYPE (event)) {
224     case GST_EVENT_CAPS:
225     {
226       GstCaps *caps;
227 
228       gst_event_parse_caps (event, &caps);
229       res = gst_rdt_depay_setcaps (pad, caps);
230       gst_event_unref (event);
231       break;
232     }
233     case GST_EVENT_FLUSH_STOP:
234       res = gst_pad_push_event (depay->srcpad, event);
235 
236       gst_segment_init (&depay->segment, GST_FORMAT_UNDEFINED);
237       depay->need_newsegment = TRUE;
238       depay->next_seqnum = -1;
239       break;
240     case GST_EVENT_SEGMENT:
241     {
242       gst_event_copy_segment (event, &depay->segment);
243       /* don't pass the event downstream, we generate our own segment
244        * including the NTP time and other things we receive in caps */
245       gst_event_unref (event);
246       break;
247     }
248     default:
249       /* pass other events forward */
250       res = gst_pad_push_event (depay->srcpad, event);
251       break;
252   }
253   return res;
254 }
255 
256 static GstEvent *
create_segment_event(GstRDTDepay * depay,gboolean update,GstClockTime position)257 create_segment_event (GstRDTDepay * depay, gboolean update,
258     GstClockTime position)
259 {
260   GstSegment segment;
261 
262   gst_segment_init (&segment, GST_FORMAT_TIME);
263   segment.rate = depay->play_speed;
264   segment.applied_rate = depay->play_scale;
265   segment.start = position;
266 
267   if (depay->npt_stop != -1)
268     segment.stop = depay->npt_stop - depay->npt_start;
269   else
270     segment.stop = -1;
271 
272   segment.time = position + depay->npt_start;
273 
274   return gst_event_new_segment (&segment);
275 }
276 
277 static GstFlowReturn
gst_rdt_depay_push(GstRDTDepay * rdtdepay,GstBuffer * buffer)278 gst_rdt_depay_push (GstRDTDepay * rdtdepay, GstBuffer * buffer)
279 {
280   GstFlowReturn ret;
281 
282   if (rdtdepay->need_newsegment) {
283     GstEvent *event;
284 
285     event = create_segment_event (rdtdepay, FALSE, 0);
286     gst_pad_push_event (rdtdepay->srcpad, event);
287 
288     rdtdepay->need_newsegment = FALSE;
289   }
290 
291   if (rdtdepay->discont) {
292     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
293     rdtdepay->discont = FALSE;
294   }
295   ret = gst_pad_push (rdtdepay->srcpad, buffer);
296 
297   return ret;
298 }
299 
300 static GstFlowReturn
gst_rdt_depay_handle_data(GstRDTDepay * rdtdepay,GstClockTime outtime,GstRDTPacket * packet)301 gst_rdt_depay_handle_data (GstRDTDepay * rdtdepay, GstClockTime outtime,
302     GstRDTPacket * packet)
303 {
304   GstFlowReturn ret;
305   GstBuffer *outbuf;
306   GstMapInfo outmap;
307   guint8 *data, *outdata;
308   guint size;
309   guint16 stream_id;
310   guint32 timestamp;
311   gint gap;
312   guint16 seqnum;
313   guint8 flags;
314   guint16 outflags;
315 
316   /* get pointers to the packet data */
317   data = gst_rdt_packet_data_map (packet, &size);
318 
319   outbuf = gst_buffer_new_and_alloc (12 + size);
320   GST_BUFFER_TIMESTAMP (outbuf) = outtime;
321 
322   GST_DEBUG_OBJECT (rdtdepay, "have size %u", size);
323 
324   /* copy over some things */
325   stream_id = gst_rdt_packet_data_get_stream_id (packet);
326   timestamp = gst_rdt_packet_data_get_timestamp (packet);
327   flags = gst_rdt_packet_data_get_flags (packet);
328 
329   seqnum = gst_rdt_packet_data_get_seq (packet);
330 
331   GST_DEBUG_OBJECT (rdtdepay, "stream_id %u, timestamp %u, seqnum %d, flags %d",
332       stream_id, timestamp, seqnum, flags);
333 
334   if (rdtdepay->next_seqnum != -1) {
335     gap = gst_rdt_buffer_compare_seqnum (seqnum, rdtdepay->next_seqnum);
336 
337     /* if we have no gap, all is fine */
338     if (G_UNLIKELY (gap != 0)) {
339       GST_LOG_OBJECT (rdtdepay, "got packet %u, expected %u, gap %d", seqnum,
340           rdtdepay->next_seqnum, gap);
341       if (gap < 0) {
342         /* seqnum > next_seqnum, we are missing some packets, this is always a
343          * DISCONT. */
344         GST_LOG_OBJECT (rdtdepay, "%d missing packets", gap);
345         rdtdepay->discont = TRUE;
346       } else {
347         /* seqnum < next_seqnum, we have seen this packet before or the sender
348          * could be restarted. If the packet is not too old, we throw it away as
349          * a duplicate, otherwise we mark discont and continue. 100 misordered
350          * packets is a good threshold. See also RFC 4737. */
351         if (gap < 100)
352           goto dropping;
353 
354         GST_LOG_OBJECT (rdtdepay,
355             "%d > 100, packet too old, sender likely restarted", gap);
356         rdtdepay->discont = TRUE;
357       }
358     }
359   }
360   rdtdepay->next_seqnum = (seqnum + 1);
361   if (rdtdepay->next_seqnum == 0xff00)
362     rdtdepay->next_seqnum = 0;
363 
364   if ((flags & 1) == 0)
365     outflags = 2;
366   else
367     outflags = 0;
368 
369   gst_buffer_map (outbuf, &outmap, GST_MAP_WRITE);
370   outdata = outmap.data;
371   GST_WRITE_UINT16_BE (outdata + 0, 0); /* version   */
372   GST_WRITE_UINT16_BE (outdata + 2, size + 12); /* length    */
373   GST_WRITE_UINT16_BE (outdata + 4, stream_id); /* stream    */
374   GST_WRITE_UINT32_BE (outdata + 6, timestamp); /* timestamp */
375   GST_WRITE_UINT16_BE (outdata + 10, outflags); /* flags     */
376   memcpy (outdata + 12, data, size);
377   gst_buffer_unmap (outbuf, &outmap);
378   gst_buffer_resize (outbuf, 0, 12 + size);
379 
380   gst_rdt_packet_data_unmap (packet);
381 
382   GST_DEBUG_OBJECT (rdtdepay, "Pushing packet, outtime %" GST_TIME_FORMAT,
383       GST_TIME_ARGS (outtime));
384 
385   ret = gst_rdt_depay_push (rdtdepay, outbuf);
386 
387   return ret;
388 
389   /* ERRORS */
390 dropping:
391   {
392     GST_WARNING_OBJECT (rdtdepay, "%d <= 100, dropping old packet", gap);
393     return GST_FLOW_OK;
394   }
395 }
396 
397 static GstFlowReturn
gst_rdt_depay_chain(GstPad * pad,GstObject * parent,GstBuffer * buf)398 gst_rdt_depay_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
399 {
400   GstRDTDepay *rdtdepay;
401   GstFlowReturn ret;
402   GstClockTime timestamp;
403   gboolean more;
404   GstRDTPacket packet;
405 
406   rdtdepay = GST_RDT_DEPAY (parent);
407 
408   if (GST_BUFFER_IS_DISCONT (buf)) {
409     GST_LOG_OBJECT (rdtdepay, "received discont");
410     rdtdepay->discont = TRUE;
411   }
412 
413   if (rdtdepay->header) {
414     GstBuffer *out;
415 
416     out = rdtdepay->header;
417     rdtdepay->header = NULL;
418 
419     /* push header data first */
420     gst_rdt_depay_push (rdtdepay, out);
421   }
422 
423   /* save timestamp */
424   timestamp = GST_BUFFER_TIMESTAMP (buf);
425 
426   ret = GST_FLOW_OK;
427 
428   GST_LOG_OBJECT (rdtdepay, "received buffer timestamp %" GST_TIME_FORMAT,
429       GST_TIME_ARGS (timestamp));
430 
431   /* data is in RDT format. */
432   more = gst_rdt_buffer_get_first_packet (buf, &packet);
433   while (more) {
434     GstRDTType type;
435 
436     type = gst_rdt_packet_get_type (&packet);
437     GST_DEBUG_OBJECT (rdtdepay, "Have packet of type %04x", type);
438 
439     if (GST_RDT_IS_DATA_TYPE (type)) {
440       GST_DEBUG_OBJECT (rdtdepay, "We have a data packet");
441       ret = gst_rdt_depay_handle_data (rdtdepay, timestamp, &packet);
442     } else {
443       switch (type) {
444         default:
445           GST_DEBUG_OBJECT (rdtdepay, "Ignoring packet");
446           break;
447       }
448     }
449     if (ret != GST_FLOW_OK)
450       break;
451 
452     more = gst_rdt_packet_move_to_next (&packet);
453   }
454 
455   gst_buffer_unref (buf);
456 
457   return ret;
458 }
459 
460 static GstStateChangeReturn
gst_rdt_depay_change_state(GstElement * element,GstStateChange transition)461 gst_rdt_depay_change_state (GstElement * element, GstStateChange transition)
462 {
463   GstRDTDepay *rdtdepay;
464   GstStateChangeReturn ret;
465 
466   rdtdepay = GST_RDT_DEPAY (element);
467 
468   switch (transition) {
469     case GST_STATE_CHANGE_NULL_TO_READY:
470       break;
471     case GST_STATE_CHANGE_READY_TO_PAUSED:
472       gst_segment_init (&rdtdepay->segment, GST_FORMAT_UNDEFINED);
473       rdtdepay->next_seqnum = -1;
474       rdtdepay->need_newsegment = TRUE;
475       break;
476     default:
477       break;
478   }
479 
480   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
481 
482   switch (transition) {
483     case GST_STATE_CHANGE_PAUSED_TO_READY:
484       if (rdtdepay->header)
485         gst_buffer_unref (rdtdepay->header);
486       rdtdepay->header = NULL;
487       break;
488     case GST_STATE_CHANGE_READY_TO_NULL:
489       break;
490     default:
491       break;
492   }
493   return ret;
494 }
495 
496 gboolean
gst_rdt_depay_plugin_init(GstPlugin * plugin)497 gst_rdt_depay_plugin_init (GstPlugin * plugin)
498 {
499   return gst_element_register (plugin, "rdtdepay",
500       GST_RANK_MARGINAL, GST_TYPE_RDT_DEPAY);
501 }
502