1 /* RTP Retransmission sender element for GStreamer
2  *
3  * gstrtprtxsend.c:
4  *
5  * Copyright (C) 2013 Collabora Ltd.
6  *   @author Julien Isorce <julien.isorce@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 
24 /**
25  * SECTION:element-rtprtxsend
26  *
27  * See #GstRtpRtxReceive for examples
28  *
 * The purpose of the sender RTX object is to keep a history of RTP packets up
 * to a configurable limit (max-size-time or max-size-packets). It listens
 * for custom upstream retransmission events (GstRTPRetransmissionRequest) that
 * come from downstream (#GstRtpSession). When it receives a request, it
 * looks up the requested seqnum in its list of stored packets. If the packet
 * is available, it creates an RTX packet according to RFC 4588 and sends
 * it as an auxiliary stream. RTX is SSRC-multiplexed.
36  */
37 
38 #ifdef HAVE_CONFIG_H
39 #include "config.h"
40 #endif
41 
42 #include <gst/gst.h>
43 #include <gst/rtp/gstrtpbuffer.h>
44 #include <string.h>
45 #include <stdlib.h>
46 
47 #include "gstrtprtxsend.h"
48 
49 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_send_debug);
50 #define GST_CAT_DEFAULT gst_rtp_rtx_send_debug
51 
52 #define DEFAULT_RTX_PAYLOAD_TYPE 0
53 #define DEFAULT_MAX_SIZE_TIME    0
54 #define DEFAULT_MAX_SIZE_PACKETS 100
55 
/* GObject property identifiers; PROP_0 is the unused zero slot. */
enum
{
  PROP_0,
  PROP_SSRC_MAP,                /* "ssrc-map" (write-only) */
  PROP_PAYLOAD_TYPE_MAP,        /* "payload-type-map" */
  PROP_MAX_SIZE_TIME,           /* "max-size-time", in ms */
  PROP_MAX_SIZE_PACKETS,        /* "max-size-packets" */
  PROP_NUM_RTX_REQUESTS,        /* "num-rtx-requests" (read-only) */
  PROP_NUM_RTX_PACKETS          /* "num-rtx-packets" (read-only) */
};
66 
/* Always-present source pad; carries the original stream and the rtx
 * packets, both as application/x-rtp. */
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("application/x-rtp")
    );
72 
/* Always-present sink pad; the caps must carry a positive clock-rate. */
static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("application/x-rtp, " "clock-rate = (int) [1, MAX]")
    );
78 
79 static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
80     guint visible, guint bytes, guint64 time, gpointer checkdata);
81 
82 static gboolean gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent,
83     GstEvent * event);
84 static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent,
85     GstEvent * event);
86 static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent,
87     GstBuffer * buffer);
88 static GstFlowReturn gst_rtp_rtx_send_chain_list (GstPad * pad,
89     GstObject * parent, GstBufferList * list);
90 
91 static void gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx);
92 static gboolean gst_rtp_rtx_send_activate_mode (GstPad * pad,
93     GstObject * parent, GstPadMode mode, gboolean active);
94 
95 static GstStateChangeReturn gst_rtp_rtx_send_change_state (GstElement *
96     element, GstStateChange transition);
97 
98 static void gst_rtp_rtx_send_set_property (GObject * object, guint prop_id,
99     const GValue * value, GParamSpec * pspec);
100 static void gst_rtp_rtx_send_get_property (GObject * object, guint prop_id,
101     GValue * value, GParamSpec * pspec);
102 static void gst_rtp_rtx_send_finalize (GObject * object);
103 
104 G_DEFINE_TYPE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT);
105 
/* One entry of a per-SSRC packet history. */
typedef struct
{
  guint16 seqnum;               /* RTP sequence number of the stored packet */
  guint32 timestamp;            /* RTP timestamp of the stored packet */
  GstBuffer *buffer;            /* reference held on the stored packet */
} BufferQueueItem;
112 
113 static void
buffer_queue_item_free(BufferQueueItem * item)114 buffer_queue_item_free (BufferQueueItem * item)
115 {
116   gst_buffer_unref (item->buffer);
117   g_slice_free (BufferQueueItem, item);
118 }
119 
/* Retransmission state kept for one master SSRC. */
typedef struct
{
  guint32 rtx_ssrc;             /* SSRC used for the retransmission stream */
  guint16 seqnum_base, next_seqnum;     /* first / next rtx sequence number */
  gint clock_rate;              /* RTP clock rate taken from the sink caps */

  /* history of rtp packets */
  GSequence *queue;
} SSRCRtxData;
129 
130 static SSRCRtxData *
ssrc_rtx_data_new(guint32 rtx_ssrc)131 ssrc_rtx_data_new (guint32 rtx_ssrc)
132 {
133   SSRCRtxData *data = g_slice_new0 (SSRCRtxData);
134 
135   data->rtx_ssrc = rtx_ssrc;
136   data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16);
137   data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
138 
139   return data;
140 }
141 
142 static void
ssrc_rtx_data_free(SSRCRtxData * data)143 ssrc_rtx_data_free (SSRCRtxData * data)
144 {
145   g_sequence_free (data->queue);
146   g_slice_free (SSRCRtxData, data);
147 }
148 
149 static void
gst_rtp_rtx_send_class_init(GstRtpRtxSendClass * klass)150 gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
151 {
152   GObjectClass *gobject_class;
153   GstElementClass *gstelement_class;
154 
155   gobject_class = (GObjectClass *) klass;
156   gstelement_class = (GstElementClass *) klass;
157 
158   gobject_class->get_property = gst_rtp_rtx_send_get_property;
159   gobject_class->set_property = gst_rtp_rtx_send_set_property;
160   gobject_class->finalize = gst_rtp_rtx_send_finalize;
161 
162   g_object_class_install_property (gobject_class, PROP_SSRC_MAP,
163       g_param_spec_boxed ("ssrc-map", "SSRC Map",
164           "Map of SSRCs to their retransmission SSRCs for SSRC-multiplexed mode"
165           " (default = random)", GST_TYPE_STRUCTURE,
166           G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
167 
168   g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
169       g_param_spec_boxed ("payload-type-map", "Payload Type Map",
170           "Map of original payload types to their retransmission payload types",
171           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172 
173   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
174       g_param_spec_uint ("max-size-time", "Max Size Time",
175           "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
176           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177 
178   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
179       g_param_spec_uint ("max-size-packets", "Max Size Packets",
180           "Amount of packets to queue (0 = unlimited)", 0, G_MAXINT16,
181           DEFAULT_MAX_SIZE_PACKETS,
182           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
183 
184   g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
185       g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
186           "Number of retransmission events received", 0, G_MAXUINT,
187           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
188 
189   g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
190       g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
191           " Number of retransmission packets sent", 0, G_MAXUINT,
192           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
193 
194   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
195   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
196 
197   gst_element_class_set_static_metadata (gstelement_class,
198       "RTP Retransmission Sender", "Codec",
199       "Retransmit RTP packets when needed, according to RFC4588",
200       "Julien Isorce <julien.isorce@collabora.co.uk>");
201 
202   gstelement_class->change_state =
203       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_change_state);
204 }
205 
206 static void
gst_rtp_rtx_send_reset(GstRtpRtxSend * rtx)207 gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx)
208 {
209   GST_OBJECT_LOCK (rtx);
210   gst_data_queue_flush (rtx->queue);
211   g_hash_table_remove_all (rtx->ssrc_data);
212   g_hash_table_remove_all (rtx->rtx_ssrcs);
213   rtx->num_rtx_requests = 0;
214   rtx->num_rtx_packets = 0;
215   GST_OBJECT_UNLOCK (rtx);
216 }
217 
218 static void
gst_rtp_rtx_send_finalize(GObject * object)219 gst_rtp_rtx_send_finalize (GObject * object)
220 {
221   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
222 
223   g_hash_table_unref (rtx->ssrc_data);
224   g_hash_table_unref (rtx->rtx_ssrcs);
225   if (rtx->external_ssrc_map)
226     gst_structure_free (rtx->external_ssrc_map);
227   g_hash_table_unref (rtx->rtx_pt_map);
228   if (rtx->rtx_pt_map_structure)
229     gst_structure_free (rtx->rtx_pt_map_structure);
230   g_object_unref (rtx->queue);
231 
232   G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
233 }
234 
235 static void
gst_rtp_rtx_send_init(GstRtpRtxSend * rtx)236 gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
237 {
238   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
239 
240   rtx->srcpad =
241       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
242           "src"), "src");
243   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
244   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
245   gst_pad_set_event_function (rtx->srcpad,
246       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_src_event));
247   gst_pad_set_activatemode_function (rtx->srcpad,
248       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_activate_mode));
249   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
250 
251   rtx->sinkpad =
252       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
253           "sink"), "sink");
254   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
255   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
256   gst_pad_set_event_function (rtx->sinkpad,
257       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_sink_event));
258   gst_pad_set_chain_function (rtx->sinkpad,
259       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain));
260   gst_pad_set_chain_list_function (rtx->sinkpad,
261       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain_list));
262   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
263 
264   rtx->queue = gst_data_queue_new (gst_rtp_rtx_send_queue_check_full, NULL,
265       NULL, rtx);
266   rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal,
267       NULL, (GDestroyNotify) ssrc_rtx_data_free);
268   rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
269   rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
270 
271   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
272   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
273 }
274 
275 static void
gst_rtp_rtx_send_set_flushing(GstRtpRtxSend * rtx,gboolean flush)276 gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
277 {
278   GST_OBJECT_LOCK (rtx);
279   gst_data_queue_set_flushing (rtx->queue, flush);
280   gst_data_queue_flush (rtx->queue);
281   GST_OBJECT_UNLOCK (rtx);
282 }
283 
284 static gboolean
gst_rtp_rtx_send_queue_check_full(GstDataQueue * queue,guint visible,guint bytes,guint64 time,gpointer checkdata)285 gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
286     guint visible, guint bytes, guint64 time, gpointer checkdata)
287 {
288   return FALSE;
289 }
290 
291 static void
gst_rtp_rtx_data_queue_item_free(gpointer item)292 gst_rtp_rtx_data_queue_item_free (gpointer item)
293 {
294   GstDataQueueItem *data = item;
295   if (data->object)
296     gst_mini_object_unref (data->object);
297   g_slice_free (GstDataQueueItem, data);
298 }
299 
300 static gboolean
gst_rtp_rtx_send_push_out(GstRtpRtxSend * rtx,gpointer object)301 gst_rtp_rtx_send_push_out (GstRtpRtxSend * rtx, gpointer object)
302 {
303   GstDataQueueItem *data;
304   gboolean success;
305 
306   data = g_slice_new0 (GstDataQueueItem);
307   data->object = GST_MINI_OBJECT (object);
308   data->size = 1;
309   data->duration = 1;
310   data->visible = TRUE;
311   data->destroy = gst_rtp_rtx_data_queue_item_free;
312 
313   success = gst_data_queue_push (rtx->queue, data);
314 
315   if (!success)
316     data->destroy (data);
317 
318   return success;
319 }
320 
321 static guint32
gst_rtp_rtx_send_choose_ssrc(GstRtpRtxSend * rtx,guint32 choice,gboolean consider_choice)322 gst_rtp_rtx_send_choose_ssrc (GstRtpRtxSend * rtx, guint32 choice,
323     gboolean consider_choice)
324 {
325   guint32 ssrc = consider_choice ? choice : g_random_int ();
326 
327   /* make sure to be different than any other */
328   while (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc)) ||
329       g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
330     ssrc = g_random_int ();
331   }
332 
333   return ssrc;
334 }
335 
336 static SSRCRtxData *
gst_rtp_rtx_send_get_ssrc_data(GstRtpRtxSend * rtx,guint32 ssrc)337 gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc)
338 {
339   SSRCRtxData *data;
340   guint32 rtx_ssrc = 0;
341   gboolean consider = FALSE;
342 
343   if (G_UNLIKELY (!g_hash_table_contains (rtx->ssrc_data,
344               GUINT_TO_POINTER (ssrc)))) {
345     if (rtx->external_ssrc_map) {
346       gchar *ssrc_str;
347       ssrc_str = g_strdup_printf ("%" G_GUINT32_FORMAT, ssrc);
348       consider = gst_structure_get_uint (rtx->external_ssrc_map, ssrc_str,
349           &rtx_ssrc);
350       g_free (ssrc_str);
351     }
352     rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, rtx_ssrc, consider);
353     data = ssrc_rtx_data_new (rtx_ssrc);
354     g_hash_table_insert (rtx->ssrc_data, GUINT_TO_POINTER (ssrc), data);
355     g_hash_table_insert (rtx->rtx_ssrcs, GUINT_TO_POINTER (rtx_ssrc),
356         GUINT_TO_POINTER (ssrc));
357   } else {
358     data = g_hash_table_lookup (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
359   }
360   return data;
361 }
362 
363 /* Copy fixed header and extension. Add OSN before to copy payload
364  * Copy memory to avoid to manually copy each rtp buffer field.
365  */
366 static GstBuffer *
gst_rtp_rtx_buffer_new(GstRtpRtxSend * rtx,GstBuffer * buffer)367 gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
368 {
369   GstMemory *mem = NULL;
370   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
371   GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
372   GstBuffer *new_buffer = gst_buffer_new ();
373   GstMapInfo map;
374   guint payload_len = 0;
375   SSRCRtxData *data;
376   guint32 ssrc;
377   guint16 seqnum;
378   guint8 fmtp;
379 
380   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
381 
382   /* get needed data from GstRtpRtxSend */
383   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
384   data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
385   ssrc = data->rtx_ssrc;
386   seqnum = data->next_seqnum++;
387   fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
388           GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
389 
390   GST_DEBUG_OBJECT (rtx, "creating rtx buffer, orig seqnum: %u, "
391       "rtx seqnum: %u, rtx ssrc: %X", gst_rtp_buffer_get_seq (&rtp),
392       seqnum, ssrc);
393 
394   /* gst_rtp_buffer_map does not map the payload so do it now */
395   gst_rtp_buffer_get_payload (&rtp);
396 
397   /* copy fixed header */
398   mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
399   gst_buffer_append_memory (new_buffer, mem);
400 
401   /* copy extension if any */
402   if (rtp.size[1]) {
403     mem = gst_allocator_alloc (NULL, rtp.size[1], NULL);
404     gst_memory_map (mem, &map, GST_MAP_WRITE);
405     memcpy (map.data, rtp.data[1], rtp.size[1]);
406     gst_memory_unmap (mem, &map);
407     gst_buffer_append_memory (new_buffer, mem);
408   }
409 
410   /* copy payload and add OSN just before */
411   payload_len = 2 + rtp.size[2];
412   mem = gst_allocator_alloc (NULL, payload_len, NULL);
413 
414   gst_memory_map (mem, &map, GST_MAP_WRITE);
415   GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
416   if (rtp.size[2])
417     memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
418   gst_memory_unmap (mem, &map);
419   gst_buffer_append_memory (new_buffer, mem);
420 
421   /* everything needed is copied */
422   gst_rtp_buffer_unmap (&rtp);
423 
424   /* set ssrc, seqnum and fmtp */
425   gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
426   gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
427   gst_rtp_buffer_set_seq (&new_rtp, seqnum);
428   gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
429   /* RFC 4588: let other elements do the padding, as normal */
430   gst_rtp_buffer_set_padding (&new_rtp, FALSE);
431   gst_rtp_buffer_unmap (&new_rtp);
432 
433   /* Copy over timestamps */
434   gst_buffer_copy_into (new_buffer, buffer, GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
435 
436   return new_buffer;
437 }
438 
439 static gint
buffer_queue_items_cmp(BufferQueueItem * a,BufferQueueItem * b,gpointer user_data)440 buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
441     gpointer user_data)
442 {
443   /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want,
444    * it returns negative when seqnum1 > seqnum2 and we want negative
445    * when b > a, i.e. a is smaller, so it comes first in the sequence */
446   return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum);
447 }
448 
449 static gboolean
gst_rtp_rtx_send_src_event(GstPad * pad,GstObject * parent,GstEvent * event)450 gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
451 {
452   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
453   gboolean res;
454 
455   switch (GST_EVENT_TYPE (event)) {
456     case GST_EVENT_CUSTOM_UPSTREAM:
457     {
458       const GstStructure *s = gst_event_get_structure (event);
459 
460       /* This event usually comes from the downstream gstrtpsession */
461       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
462         guint seqnum = 0;
463         guint ssrc = 0;
464         GstBuffer *rtx_buf = NULL;
465 
466         /* retrieve seqnum of the packet that need to be retransmitted */
467         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
468           seqnum = -1;
469 
470         /* retrieve ssrc of the packet that need to be retransmitted */
471         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
472           ssrc = -1;
473 
474         GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
475             seqnum, ssrc);
476 
477         GST_OBJECT_LOCK (rtx);
478         /* check if request is for us */
479         if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
480           SSRCRtxData *data;
481           GSequenceIter *iter;
482           BufferQueueItem search_item;
483 
484           /* update statistics */
485           ++rtx->num_rtx_requests;
486 
487           data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
488 
489           search_item.seqnum = seqnum;
490           iter = g_sequence_lookup (data->queue, &search_item,
491               (GCompareDataFunc) buffer_queue_items_cmp, NULL);
492           if (iter) {
493             BufferQueueItem *item = g_sequence_get (iter);
494             GST_LOG_OBJECT (rtx, "found %u", item->seqnum);
495             rtx_buf = gst_rtp_rtx_buffer_new (rtx, item->buffer);
496           }
497 #ifndef GST_DISABLE_DEBUG
498           else {
499             BufferQueueItem *item = NULL;
500 
501             iter = g_sequence_get_begin_iter (data->queue);
502             if (!g_sequence_iter_is_end (iter))
503               item = g_sequence_get (iter);
504 
505             if (item && seqnum < item->seqnum) {
506               GST_DEBUG_OBJECT (rtx, "requested seqnum %u has already been "
507                   "removed from the rtx queue; the first available is %u",
508                   seqnum, item->seqnum);
509             } else {
510               GST_WARNING_OBJECT (rtx, "requested seqnum %u has not been "
511                   "transmitted yet in the original stream; either the remote end "
512                   "is not configured correctly, or the source is too slow",
513                   seqnum);
514             }
515           }
516 #endif
517         }
518         GST_OBJECT_UNLOCK (rtx);
519 
520         if (rtx_buf)
521           gst_rtp_rtx_send_push_out (rtx, rtx_buf);
522 
523         gst_event_unref (event);
524         res = TRUE;
525 
526         /* This event usually comes from the downstream gstrtpsession */
527       } else if (gst_structure_has_name (s, "GstRTPCollision")) {
528         guint ssrc = 0;
529 
530         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
531           ssrc = -1;
532 
533         GST_DEBUG_OBJECT (rtx, "got ssrc collision, ssrc: %X", ssrc);
534 
535         GST_OBJECT_LOCK (rtx);
536 
537         /* choose another ssrc for our retransmited stream */
538         if (g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
539           guint master_ssrc;
540           SSRCRtxData *data;
541 
542           master_ssrc = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_ssrcs,
543                   GUINT_TO_POINTER (ssrc)));
544           data = gst_rtp_rtx_send_get_ssrc_data (rtx, master_ssrc);
545 
546           /* change rtx_ssrc and update the reverse map */
547           data->rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, 0, FALSE);
548           g_hash_table_remove (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc));
549           g_hash_table_insert (rtx->rtx_ssrcs,
550               GUINT_TO_POINTER (data->rtx_ssrc),
551               GUINT_TO_POINTER (master_ssrc));
552 
553           GST_OBJECT_UNLOCK (rtx);
554 
555           /* no need to forward to payloader because we make sure to have
556            * a different ssrc
557            */
558           gst_event_unref (event);
559           res = TRUE;
560         } else {
561           /* if master ssrc has collided, remove it from our data, as it
562            * is not going to be used any longer */
563           if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
564             SSRCRtxData *data;
565             data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
566             g_hash_table_remove (rtx->rtx_ssrcs,
567                 GUINT_TO_POINTER (data->rtx_ssrc));
568             g_hash_table_remove (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
569           }
570 
571           GST_OBJECT_UNLOCK (rtx);
572 
573           /* forward event to payloader in case collided ssrc is
574            * master stream */
575           res = gst_pad_event_default (pad, parent, event);
576         }
577       } else {
578         res = gst_pad_event_default (pad, parent, event);
579       }
580       break;
581     }
582     default:
583       res = gst_pad_event_default (pad, parent, event);
584       break;
585   }
586   return res;
587 }
588 
589 static gboolean
gst_rtp_rtx_send_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)590 gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
591 {
592   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
593 
594   switch (GST_EVENT_TYPE (event)) {
595     case GST_EVENT_FLUSH_START:
596       gst_pad_push_event (rtx->srcpad, event);
597       gst_rtp_rtx_send_set_flushing (rtx, TRUE);
598       gst_pad_pause_task (rtx->srcpad);
599       return TRUE;
600     case GST_EVENT_FLUSH_STOP:
601       gst_pad_push_event (rtx->srcpad, event);
602       gst_rtp_rtx_send_set_flushing (rtx, FALSE);
603       gst_pad_start_task (rtx->srcpad,
604           (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
605       return TRUE;
606     case GST_EVENT_EOS:
607       GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
608       gst_rtp_rtx_send_push_out (rtx, event);
609       return TRUE;
610     case GST_EVENT_CAPS:
611     {
612       GstCaps *caps;
613       GstStructure *s;
614       guint ssrc;
615       gint payload;
616       gpointer rtx_payload;
617       SSRCRtxData *data;
618 
619       gst_event_parse_caps (event, &caps);
620 
621       s = gst_caps_get_structure (caps, 0);
622       if (!gst_structure_get_uint (s, "ssrc", &ssrc))
623         ssrc = -1;
624       if (!gst_structure_get_int (s, "payload", &payload))
625         payload = -1;
626 
627       if (payload == -1)
628         GST_WARNING_OBJECT (rtx, "No payload in caps");
629 
630       GST_OBJECT_LOCK (rtx);
631       data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
632       if (!g_hash_table_lookup_extended (rtx->rtx_pt_map,
633               GUINT_TO_POINTER (payload), NULL, &rtx_payload))
634         rtx_payload = GINT_TO_POINTER (-1);
635 
636       if (GPOINTER_TO_INT (rtx_payload) == -1 && payload != -1)
637         GST_WARNING_OBJECT (rtx, "Payload %d not in rtx-pt-map", payload);
638 
639       GST_DEBUG_OBJECT (rtx,
640           "got caps for payload: %d->%d, ssrc: %u->%u : %" GST_PTR_FORMAT,
641           payload, GPOINTER_TO_INT (rtx_payload), ssrc, data->rtx_ssrc, caps);
642 
643       gst_structure_get_int (s, "clock-rate", &data->clock_rate);
644 
645       /* The session might need to know the RTX ssrc */
646       caps = gst_caps_copy (caps);
647       gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc,
648           "rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL);
649 
650       if (GPOINTER_TO_INT (rtx_payload) != -1)
651         gst_caps_set_simple (caps, "rtx-payload", G_TYPE_INT,
652             GPOINTER_TO_INT (rtx_payload), NULL);
653 
654       GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u",
655           data->clock_rate, ssrc);
656       GST_OBJECT_UNLOCK (rtx);
657 
658       gst_event_unref (event);
659       event = gst_event_new_caps (caps);
660       gst_caps_unref (caps);
661       break;
662     }
663     default:
664       break;
665   }
666   return gst_pad_event_default (pad, parent, event);
667 }
668 
669 /* like rtp_jitter_buffer_get_ts_diff() */
670 static guint32
gst_rtp_rtx_send_get_ts_diff(SSRCRtxData * data)671 gst_rtp_rtx_send_get_ts_diff (SSRCRtxData * data)
672 {
673   guint64 high_ts, low_ts;
674   BufferQueueItem *high_buf, *low_buf;
675   guint32 result;
676 
677   high_buf =
678       g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter
679           (data->queue)));
680   low_buf = g_sequence_get (g_sequence_get_begin_iter (data->queue));
681 
682   if (!high_buf || !low_buf || high_buf == low_buf)
683     return 0;
684 
685   high_ts = high_buf->timestamp;
686   low_ts = low_buf->timestamp;
687 
688   /* it needs to work if ts wraps */
689   if (high_ts >= low_ts) {
690     result = (guint32) (high_ts - low_ts);
691   } else {
692     result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts);
693   }
694 
695   /* return value in ms instead of clock ticks */
696   return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
697 }
698 
699 /* Must be called with lock */
700 static void
process_buffer(GstRtpRtxSend * rtx,GstBuffer * buffer)701 process_buffer (GstRtpRtxSend * rtx, GstBuffer * buffer)
702 {
703   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
704   BufferQueueItem *item;
705   SSRCRtxData *data;
706   guint16 seqnum;
707   guint8 payload_type;
708   guint32 ssrc, rtptime;
709 
710   /* read the information we want from the buffer */
711   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
712   seqnum = gst_rtp_buffer_get_seq (&rtp);
713   payload_type = gst_rtp_buffer_get_payload_type (&rtp);
714   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
715   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
716   gst_rtp_buffer_unmap (&rtp);
717 
718   GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum,
719       ssrc);
720 
721   /* do not store the buffer if it's payload type is unknown */
722   if (g_hash_table_contains (rtx->rtx_pt_map, GUINT_TO_POINTER (payload_type))) {
723     data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
724 
725     /* add current rtp buffer to queue history */
726     item = g_slice_new0 (BufferQueueItem);
727     item->seqnum = seqnum;
728     item->timestamp = rtptime;
729     item->buffer = gst_buffer_ref (buffer);
730     g_sequence_append (data->queue, item);
731 
732     /* remove oldest packets from history if they are too many */
733     if (rtx->max_size_packets) {
734       while (g_sequence_get_length (data->queue) > rtx->max_size_packets)
735         g_sequence_remove (g_sequence_get_begin_iter (data->queue));
736     }
737     if (rtx->max_size_time) {
738       while (gst_rtp_rtx_send_get_ts_diff (data) > rtx->max_size_time)
739         g_sequence_remove (g_sequence_get_begin_iter (data->queue));
740     }
741   }
742 }
743 
744 static GstFlowReturn
gst_rtp_rtx_send_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)745 gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
746 {
747   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
748   GstFlowReturn ret;
749 
750   GST_OBJECT_LOCK (rtx);
751   process_buffer (rtx, buffer);
752   GST_OBJECT_UNLOCK (rtx);
753   ret = gst_pad_push (rtx->srcpad, buffer);
754 
755   return ret;
756 }
757 
758 static gboolean
process_buffer_from_list(GstBuffer ** buffer,guint idx,gpointer user_data)759 process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
760 {
761   process_buffer (user_data, *buffer);
762   return TRUE;
763 }
764 
765 static GstFlowReturn
gst_rtp_rtx_send_chain_list(GstPad * pad,GstObject * parent,GstBufferList * list)766 gst_rtp_rtx_send_chain_list (GstPad * pad, GstObject * parent,
767     GstBufferList * list)
768 {
769   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
770   GstFlowReturn ret;
771 
772   GST_OBJECT_LOCK (rtx);
773   gst_buffer_list_foreach (list, process_buffer_from_list, rtx);
774   GST_OBJECT_UNLOCK (rtx);
775 
776   ret = gst_pad_push_list (rtx->srcpad, list);
777 
778   return ret;
779 }
780 
781 static void
gst_rtp_rtx_send_src_loop(GstRtpRtxSend * rtx)782 gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
783 {
784   GstDataQueueItem *data;
785 
786   if (gst_data_queue_pop (rtx->queue, &data)) {
787     GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
788 
789     if (G_LIKELY (GST_IS_BUFFER (data->object))) {
790       GST_OBJECT_LOCK (rtx);
791       /* Update statistics just before pushing. */
792       rtx->num_rtx_packets++;
793       GST_OBJECT_UNLOCK (rtx);
794 
795       gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
796     } else if (GST_IS_EVENT (data->object)) {
797       gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object));
798 
799       /* after EOS, we should not send any more buffers,
800        * even if there are more requests coming in */
801       if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) {
802         gst_rtp_rtx_send_set_flushing (rtx, TRUE);
803       }
804     } else {
805       g_assert_not_reached ();
806     }
807 
808     data->object = NULL;        /* we no longer own that object */
809     data->destroy (data);
810   } else {
811     GST_LOG_OBJECT (rtx, "flushing");
812     gst_pad_pause_task (rtx->srcpad);
813   }
814 }
815 
816 static gboolean
gst_rtp_rtx_send_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)817 gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
818     GstPadMode mode, gboolean active)
819 {
820   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
821   gboolean ret = FALSE;
822 
823   switch (mode) {
824     case GST_PAD_MODE_PUSH:
825       if (active) {
826         gst_rtp_rtx_send_set_flushing (rtx, FALSE);
827         ret = gst_pad_start_task (rtx->srcpad,
828             (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
829       } else {
830         gst_rtp_rtx_send_set_flushing (rtx, TRUE);
831         ret = gst_pad_stop_task (rtx->srcpad);
832       }
833       GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
834       break;
835     default:
836       break;
837   }
838   return ret;
839 }
840 
841 static void
gst_rtp_rtx_send_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)842 gst_rtp_rtx_send_get_property (GObject * object,
843     guint prop_id, GValue * value, GParamSpec * pspec)
844 {
845   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
846 
847   switch (prop_id) {
848     case PROP_PAYLOAD_TYPE_MAP:
849       GST_OBJECT_LOCK (rtx);
850       g_value_set_boxed (value, rtx->rtx_pt_map_structure);
851       GST_OBJECT_UNLOCK (rtx);
852       break;
853     case PROP_MAX_SIZE_TIME:
854       GST_OBJECT_LOCK (rtx);
855       g_value_set_uint (value, rtx->max_size_time);
856       GST_OBJECT_UNLOCK (rtx);
857       break;
858     case PROP_MAX_SIZE_PACKETS:
859       GST_OBJECT_LOCK (rtx);
860       g_value_set_uint (value, rtx->max_size_packets);
861       GST_OBJECT_UNLOCK (rtx);
862       break;
863     case PROP_NUM_RTX_REQUESTS:
864       GST_OBJECT_LOCK (rtx);
865       g_value_set_uint (value, rtx->num_rtx_requests);
866       GST_OBJECT_UNLOCK (rtx);
867       break;
868     case PROP_NUM_RTX_PACKETS:
869       GST_OBJECT_LOCK (rtx);
870       g_value_set_uint (value, rtx->num_rtx_packets);
871       GST_OBJECT_UNLOCK (rtx);
872       break;
873     default:
874       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
875       break;
876   }
877 }
878 
879 static gboolean
structure_to_hash_table(GQuark field_id,const GValue * value,gpointer hash)880 structure_to_hash_table (GQuark field_id, const GValue * value, gpointer hash)
881 {
882   const gchar *field_str;
883   guint field_uint;
884   guint value_uint;
885 
886   field_str = g_quark_to_string (field_id);
887   field_uint = atoi (field_str);
888   value_uint = g_value_get_uint (value);
889   g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (field_uint),
890       GUINT_TO_POINTER (value_uint));
891 
892   return TRUE;
893 }
894 
895 static void
gst_rtp_rtx_send_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)896 gst_rtp_rtx_send_set_property (GObject * object,
897     guint prop_id, const GValue * value, GParamSpec * pspec)
898 {
899   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
900 
901   switch (prop_id) {
902     case PROP_SSRC_MAP:
903       GST_OBJECT_LOCK (rtx);
904       if (rtx->external_ssrc_map)
905         gst_structure_free (rtx->external_ssrc_map);
906       rtx->external_ssrc_map = g_value_dup_boxed (value);
907       GST_OBJECT_UNLOCK (rtx);
908       break;
909     case PROP_PAYLOAD_TYPE_MAP:
910       GST_OBJECT_LOCK (rtx);
911       if (rtx->rtx_pt_map_structure)
912         gst_structure_free (rtx->rtx_pt_map_structure);
913       rtx->rtx_pt_map_structure = g_value_dup_boxed (value);
914       g_hash_table_remove_all (rtx->rtx_pt_map);
915       gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table,
916           rtx->rtx_pt_map);
917       GST_OBJECT_UNLOCK (rtx);
918       break;
919     case PROP_MAX_SIZE_TIME:
920       GST_OBJECT_LOCK (rtx);
921       rtx->max_size_time = g_value_get_uint (value);
922       GST_OBJECT_UNLOCK (rtx);
923       break;
924     case PROP_MAX_SIZE_PACKETS:
925       GST_OBJECT_LOCK (rtx);
926       rtx->max_size_packets = g_value_get_uint (value);
927       GST_OBJECT_UNLOCK (rtx);
928       break;
929     default:
930       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
931       break;
932   }
933 }
934 
935 static GstStateChangeReturn
gst_rtp_rtx_send_change_state(GstElement * element,GstStateChange transition)936 gst_rtp_rtx_send_change_state (GstElement * element, GstStateChange transition)
937 {
938   GstStateChangeReturn ret;
939   GstRtpRtxSend *rtx;
940 
941   rtx = GST_RTP_RTX_SEND (element);
942 
943   switch (transition) {
944     default:
945       break;
946   }
947 
948   ret =
949       GST_ELEMENT_CLASS (gst_rtp_rtx_send_parent_class)->change_state (element,
950       transition);
951 
952   switch (transition) {
953     case GST_STATE_CHANGE_PAUSED_TO_READY:
954       gst_rtp_rtx_send_reset (rtx);
955       break;
956     default:
957       break;
958   }
959 
960   return ret;
961 }
962 
963 gboolean
gst_rtp_rtx_send_plugin_init(GstPlugin * plugin)964 gst_rtp_rtx_send_plugin_init (GstPlugin * plugin)
965 {
966   GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_send_debug, "rtprtxsend", 0,
967       "rtp retransmission sender");
968 
969   return gst_element_register (plugin, "rtprtxsend", GST_RANK_NONE,
970       GST_TYPE_RTP_RTX_SEND);
971 }
972