1 /*
2  * Farstream - Farstream RTP TFRC support
3  *
4  * Copyright 2010 Collabora Ltd.
5  *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
6  * Copyright 2010 Nokia Corp.
7  *
8  * fs-rtp-tfrc.c - Rate control for Farstream RTP sessions
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License as published by the Free Software Foundation; either
13  * version 2.1 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Lesser General Public License for more details.
19  *
20  * You should have received a copy of the GNU Lesser General Public
21  * License along with this library; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
23  */
24 
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28 
29 #include "fs-rtp-tfrc.h"
30 
31 #include <string.h>
32 
33 #include "fs-rtp-packet-modder.h"
34 #include "farstream/fs-rtp.h"
35 #include "fs-rtp-codec-negotiation.h"
36 
37 #include <gst/rtp/gstrtpbuffer.h>
38 #include <gst/rtp/gstrtcpbuffer.h>
39 
/* Number of distinct values of a 32-bit counter (0xffffffff + 1), as a
 * guint64. It is added to the 64-bit cycle accumulators whenever a 32-bit
 * timestamp is considered to have wrapped around. */
#define ONE_32BIT_CYCLE ((guint64) (((guint64)0xffffffff) + ((guint64)1)))


/* Debug category used by every GST_*_OBJECT logging call in this file */
GST_DEBUG_CATEGORY_STATIC (fsrtpconference_tfrc);
#define GST_CAT_DEFAULT fsrtpconference_tfrc

/* FsRtpTfrc is a GstObject subclass */
G_DEFINE_TYPE (FsRtpTfrc, fs_rtp_tfrc, GST_TYPE_OBJECT);

/* props */
enum
{
  PROP_0,
  PROP_BITRATE,  /* readable: current send bitrate */
  PROP_SENDING   /* writable: whether the session is sending */
};

/* GObject virtual methods, installed in fs_rtp_tfrc_class_init() */
static void fs_rtp_tfrc_get_property (GObject *object,
    guint prop_id,
    GValue *value,
    GParamSpec *pspec);
static void fs_rtp_tfrc_set_property (GObject *object,
    guint prop_id,
    const GValue *value,
    GParamSpec *pspec);
static void fs_rtp_tfrc_dispose (GObject *object);

/* Forward declarations for helpers that are used before their definition */
static void fs_rtp_tfrc_update_sender_timer_locked (
  FsRtpTfrc *self,
  struct TrackedSource *src,
  guint64 now);

static gboolean feedback_timer_expired (GstClock *clock, GstClockTime time,
    GstClockID id, gpointer user_data);

static void fs_rtp_tfrc_clear_sender (FsRtpTfrc *self);
75 
76 static void
fs_rtp_tfrc_class_init(FsRtpTfrcClass * klass)77 fs_rtp_tfrc_class_init (FsRtpTfrcClass *klass)
78 {
79   GObjectClass *gobject_class;
80 
81   gobject_class = (GObjectClass *) klass;
82 
83   gobject_class->get_property = fs_rtp_tfrc_get_property;
84   gobject_class->set_property = fs_rtp_tfrc_set_property;
85   gobject_class->dispose = fs_rtp_tfrc_dispose;
86 
87   g_object_class_install_property (gobject_class,
88       PROP_BITRATE,
89       g_param_spec_uint ("bitrate",
90           "The bitrate at which data should be sent",
91           "The bitrate that the session should try to send at in bits/sec",
92           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
93 
94  g_object_class_install_property (gobject_class,
95       PROP_SENDING,
96       g_param_spec_boolean ("sending",
97           "The bitrate at which data should be sent",
98           "The bitrate that the session should try to send at in bits/sec",
99           FALSE, G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
100 }
101 
102 
103 static struct TrackedSource *
tracked_src_new(FsRtpTfrc * self)104 tracked_src_new (FsRtpTfrc *self)
105 {
106   struct TrackedSource *src;
107 
108   src = g_slice_new0 (struct TrackedSource);
109   src->self = self;
110   src->next_feedback_timer = G_MAXUINT64;
111 
112   return src;
113 }
114 
115 static void
tracked_src_free(struct TrackedSource * src)116 tracked_src_free (struct TrackedSource *src)
117 {
118   if (src->sender_id)
119   {
120     gst_clock_id_unschedule (src->sender_id);
121     gst_clock_id_unref (src->sender_id);
122   }
123 
124   if (src->receiver_id)
125   {
126     gst_clock_id_unschedule (src->receiver_id);
127     gst_clock_id_unref (src->receiver_id);
128   }
129 
130   if (src->rtpsource)
131     g_object_unref (src->rtpsource);
132 
133   if (src->sender)
134     tfrc_sender_free (src->sender);
135   if (src->receiver)
136     tfrc_receiver_free (src->receiver);
137 
138   if (src->idl)
139     tfrc_is_data_limited_free (src->idl);
140 
141   g_slice_free (struct TrackedSource, src);
142 }
143 
144 static void
fs_rtp_tfrc_init(FsRtpTfrc * self)145 fs_rtp_tfrc_init (FsRtpTfrc *self)
146 {
147   GST_DEBUG_CATEGORY_INIT (fsrtpconference_tfrc,
148       "fsrtpconference_tfrc", 0,
149       "Farstream RTP Conference Element Rate Control logic");
150 
151   /* member init */
152 
153   self->tfrc_sources = g_hash_table_new_full (g_direct_hash,
154       g_direct_equal, NULL, (GDestroyNotify) tracked_src_free);
155 
156   fs_rtp_tfrc_clear_sender (self);
157   self->send_bitrate = tfrc_sender_get_send_rate (NULL)  * 8;
158 
159   self->extension_type = EXTENSION_NONE;
160   self->extension_id = 0;
161   memset (self->pts, 0, 128 * sizeof (gboolean));
162 
163   self->systemclock = gst_system_clock_obtain ();
164 }
165 
166 void
fs_rtp_tfrc_destroy(FsRtpTfrc * self)167 fs_rtp_tfrc_destroy (FsRtpTfrc *self)
168 {
169   GST_OBJECT_LOCK (self);
170 
171   if (self->modder_check_probe_id)
172     gst_pad_remove_probe (self->in_rtp_pad, self->modder_check_probe_id);
173   self->modder_check_probe_id = 0;
174 
175   if (self->in_rtp_probe_id)
176     gst_pad_remove_probe (self->in_rtp_pad, self->in_rtp_probe_id);
177   self->in_rtp_probe_id = 0;
178   if (self->in_rtcp_probe_id)
179     gst_pad_remove_probe (self->in_rtcp_pad, self->in_rtcp_probe_id);
180   self->in_rtcp_probe_id = 0;
181 
182 
183   if (self->on_ssrc_validated_id)
184     g_signal_handler_disconnect (self->rtpsession, self->on_ssrc_validated_id);
185   self->on_ssrc_validated_id = 0;
186   if (self->on_sending_rtcp_id)
187     g_signal_handler_disconnect (self->rtpsession, self->on_sending_rtcp_id);
188   self->on_sending_rtcp_id = 0;
189 
190   g_hash_table_destroy (g_hash_table_ref (self->tfrc_sources));
191 
192   self->fsrtpsession = NULL;
193 
194   GST_OBJECT_UNLOCK (self);
195 }
196 
197 static void
fs_rtp_tfrc_dispose(GObject * object)198 fs_rtp_tfrc_dispose (GObject *object)
199 {
200   FsRtpTfrc *self = FS_RTP_TFRC (object);
201 
202   GST_OBJECT_LOCK (self);
203 
204   if (self->tfrc_sources)
205     g_hash_table_destroy (self->tfrc_sources);
206   self->tfrc_sources = NULL;
207   self->last_src = NULL;
208 
209   if (self->initial_src)
210     tracked_src_free (self->initial_src);
211   self->initial_src = NULL;
212 
213   if (self->packet_modder)
214   {
215     gst_bin_remove (self->parent_bin, self->packet_modder);
216     gst_element_set_state (self->packet_modder, GST_STATE_NULL);
217     g_object_unref (self->packet_modder);
218   }
219 
220   if (self->rtpsession)
221       g_object_unref (self->rtpsession);
222   if (self->in_rtp_pad)
223     g_object_unref (self->in_rtp_pad);
224   if (self->in_rtcp_pad)
225     g_object_unref (self->in_rtcp_pad);
226   if (self->out_rtp_pad)
227     g_object_unref (self->out_rtp_pad);
228 
229   if (self->parent_bin)
230     gst_object_unref (self->parent_bin);
231 
232   gst_object_unref (self->systemclock);
233   self->systemclock = NULL;
234 
235   GST_OBJECT_UNLOCK (self);
236 
237   if (G_OBJECT_CLASS (fs_rtp_tfrc_parent_class)->dispose)
238     G_OBJECT_CLASS (fs_rtp_tfrc_parent_class)->dispose (object);
239 }
240 
241 
242 static void
fs_rtp_tfrc_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)243 fs_rtp_tfrc_get_property (GObject *object,
244     guint prop_id,
245     GValue *value,
246     GParamSpec *pspec)
247 {
248   FsRtpTfrc *self = FS_RTP_TFRC (object);
249 
250   switch (prop_id)
251   {
252     case PROP_BITRATE:
253       GST_OBJECT_LOCK (self);
254       g_value_set_uint (value, self->send_bitrate);
255       GST_OBJECT_UNLOCK (self);
256       break;
257     default:
258       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
259       break;
260   }
261 }
262 
263 gboolean
clear_sender(gpointer key,gpointer value,gpointer user_data)264 clear_sender (gpointer key, gpointer value, gpointer user_data)
265 {
266   FsRtpTfrc *self = FS_RTP_TFRC (user_data);
267   struct TrackedSource *src = value;
268 
269   src->send_ts_base = 0;
270   src->send_ts_cycles = 0;
271   src->fb_last_ts = 0;
272   src->fb_ts_cycles = 0;
273 
274   if (src->sender_id)
275   {
276     gst_clock_id_unschedule (src->sender_id);
277     gst_clock_id_unref (src->sender_id);
278     src->sender_id = 0;
279   }
280 
281   if (src->sender)
282     tfrc_sender_free (src->sender);
283   src->sender = NULL;
284 
285   if (src->idl)
286   {
287     tfrc_is_data_limited_free (src->idl);
288     src->idl = NULL;
289   }
290 
291   if (self->last_src == src)
292     self->last_src = NULL;
293 
294   if (src->receiver)
295     return FALSE;
296   else
297     return TRUE;
298 }
299 
300 static void
fs_rtp_tfrc_clear_sender(FsRtpTfrc * self)301 fs_rtp_tfrc_clear_sender (FsRtpTfrc *self)
302 {
303   g_hash_table_foreach_remove (self->tfrc_sources, clear_sender, self);
304   if (self->initial_src)
305     if (clear_sender (NULL, self->initial_src, self))
306       self->initial_src = NULL;
307 
308   self->last_sent_ts = GST_CLOCK_TIME_NONE;
309   self->byte_reservoir = 1500; /* About one packet */
310 }
311 
312 static void
fs_rtp_tfrc_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)313 fs_rtp_tfrc_set_property (GObject *object,
314     guint prop_id,
315     const GValue *value,
316     GParamSpec *pspec)
317 {
318   FsRtpTfrc *self = FS_RTP_TFRC (object);
319 
320   switch (prop_id)
321   {
322     case PROP_SENDING:
323       GST_OBJECT_LOCK (self);
324       self->sending = g_value_get_boolean (value);
325       if (!self->sending)
326         fs_rtp_tfrc_clear_sender (self);
327       GST_OBJECT_UNLOCK (self);
328       break;
329     default:
330       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
331       break;
332   }
333 }
334 
335 
336 static gboolean
fs_rtp_tfrc_update_bitrate_locked(FsRtpTfrc * self,const gchar * source)337 fs_rtp_tfrc_update_bitrate_locked (FsRtpTfrc *self, const gchar *source)
338 {
339   guint byterate;
340   guint new_bitrate;
341   gboolean ret;
342 
343   if (self->last_src && self->last_src->sender)
344     byterate = tfrc_sender_get_send_rate (self->last_src->sender);
345   else
346     byterate = tfrc_sender_get_send_rate (NULL);
347 
348   if (G_LIKELY (byterate < G_MAXUINT / 8))
349     new_bitrate = byterate * 8;
350   else
351     new_bitrate = G_MAXUINT;
352 
353   ret = self->send_bitrate != new_bitrate;
354 
355   if (ret)
356     GST_DEBUG_OBJECT (self, "Send rate changed (%s): %u -> %u", source,
357         self->send_bitrate, new_bitrate);
358 
359   self->send_bitrate = new_bitrate;
360 
361   return ret;
362 }
363 
364 static guint64
fs_rtp_tfrc_get_now(FsRtpTfrc * self)365 fs_rtp_tfrc_get_now (FsRtpTfrc *self)
366 {
367   return GST_TIME_AS_USECONDS (gst_clock_get_time (self->systemclock));
368 }
369 
370 
371 static struct TrackedSource *
fs_rtp_tfrc_get_remote_ssrc_locked(FsRtpTfrc * self,guint ssrc,GObject * rtpsource)372 fs_rtp_tfrc_get_remote_ssrc_locked (FsRtpTfrc *self, guint ssrc,
373   GObject *rtpsource)
374 {
375   struct TrackedSource *src;
376 
377   src = g_hash_table_lookup (self->tfrc_sources, GUINT_TO_POINTER (ssrc));
378 
379   if (G_LIKELY (src))
380   {
381     if (G_UNLIKELY (rtpsource && !src->rtpsource))
382       src->rtpsource = g_object_ref (rtpsource);
383 
384     return src;
385   }
386 
387   if (self->initial_src)
388   {
389     src = self->initial_src;
390     self->initial_src = NULL;
391     src->ssrc = ssrc;
392     if (rtpsource && !src->rtpsource)
393       src->rtpsource = g_object_ref (rtpsource);
394     g_hash_table_insert (self->tfrc_sources, GUINT_TO_POINTER (ssrc), src);
395     return src;
396   }
397 
398   src = tracked_src_new (self);
399   src->ssrc = ssrc;
400   if (rtpsource)
401     src->rtpsource = g_object_ref (rtpsource);
402 
403   if (!self->last_src)
404     self->last_src = src;
405 
406   g_hash_table_insert (self->tfrc_sources, GUINT_TO_POINTER (ssrc), src);
407 
408   return src;
409 }
410 
411 static void
rtpsession_on_ssrc_validated(GObject * rtpsession,GObject * rtpsource,FsRtpTfrc * self)412 rtpsession_on_ssrc_validated (GObject *rtpsession, GObject *rtpsource,
413     FsRtpTfrc *self)
414 {
415   guint32 ssrc;
416 
417   g_object_get (rtpsource, "ssrc", &ssrc, NULL);
418 
419   GST_DEBUG_OBJECT (self, "ssrc validate: %X", ssrc);
420 
421   GST_OBJECT_LOCK (self);
422   fs_rtp_tfrc_get_remote_ssrc_locked (self, ssrc, rtpsource);
423   GST_OBJECT_UNLOCK (self);
424 }
425 
/* User data handed to the asynchronous clock callbacks. The source is
 * identified by its ssrc and looked up again when the timer fires. */
struct TimerData
{
  FsRtpTfrc *self;  /* referenced in build_timer_data() */
  guint ssrc;       /* key into self->tfrc_sources */
};
431 
432 static struct TimerData *
build_timer_data(FsRtpTfrc * self,guint ssrc)433 build_timer_data (FsRtpTfrc *self, guint ssrc)
434 {
435   struct TimerData *td = g_slice_new0 (struct TimerData);
436 
437   td->self = g_object_ref (self);
438   td->ssrc = ssrc;
439 
440   return td;
441 }
442 
443 static void
free_timer_data(gpointer data)444 free_timer_data (gpointer data)
445 {
446   struct TimerData *td = data;
447   g_object_unref (td->self);
448   g_slice_free (struct TimerData, td);
449 }
450 
451 static void
fs_rtp_tfrc_set_receiver_timer_locked(FsRtpTfrc * self,struct TrackedSource * src,guint64 now)452 fs_rtp_tfrc_set_receiver_timer_locked (FsRtpTfrc *self,
453     struct TrackedSource *src, guint64 now)
454 {
455   guint64 expiry = tfrc_receiver_get_feedback_timer_expiry (src->receiver);
456   GstClockReturn cret;
457 
458   if (expiry == 0)
459     return;
460 
461   if (src->receiver_id)
462   {
463     if (src->next_feedback_timer <= expiry)
464       return;
465 
466     gst_clock_id_unschedule (src->receiver_id);
467     gst_clock_id_unref (src->receiver_id);
468     src->receiver_id = NULL;
469   }
470   src->next_feedback_timer = expiry;
471 
472   g_assert (expiry != now);
473 
474   src->receiver_id = gst_clock_new_single_shot_id (self->systemclock,
475       expiry * GST_USECOND);
476 
477   cret = gst_clock_id_wait_async (src->receiver_id, feedback_timer_expired,
478       build_timer_data (self, src->ssrc), free_timer_data);
479   if (cret != GST_CLOCK_OK)
480     GST_ERROR_OBJECT (self,
481         "Could not schedule feedback time for %" G_GUINT64_FORMAT
482         " (now %" G_GUINT64_FORMAT ") error: %d", expiry, now, cret);
483 }
484 
485 static void
fs_rtp_tfrc_receiver_timer_func_locked(FsRtpTfrc * self,struct TrackedSource * src,guint64 now)486 fs_rtp_tfrc_receiver_timer_func_locked (FsRtpTfrc *self,
487     struct TrackedSource *src, guint64 now)
488 {
489   guint64 expiry;
490 
491   if (src->receiver_id)
492   {
493     gst_clock_id_unschedule (src->receiver_id);
494     gst_clock_id_unref (src->receiver_id);
495     src->receiver_id = NULL;
496   }
497 
498   expiry = tfrc_receiver_get_feedback_timer_expiry (src->receiver);
499 
500   if (expiry <= now &&
501       tfrc_receiver_feedback_timer_expired (src->receiver, now))
502   {
503     src->send_feedback = TRUE;
504     g_signal_emit_by_name (self->rtpsession, "send-rtcp", (guint64) 0);
505   }
506   else
507   {
508     fs_rtp_tfrc_set_receiver_timer_locked (self, src, now);
509   }
510 }
511 
512 static gboolean
feedback_timer_expired(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)513 feedback_timer_expired (GstClock *clock, GstClockTime time, GstClockID id,
514   gpointer user_data)
515 {
516   struct TimerData *td = user_data;
517   struct TrackedSource *src;
518   guint64 now;
519 
520   if (time == GST_CLOCK_TIME_NONE)
521     return FALSE;
522 
523   GST_OBJECT_LOCK (td->self);
524 
525   src = g_hash_table_lookup (td->self->tfrc_sources,
526       GUINT_TO_POINTER (td->ssrc));
527 
528   now = fs_rtp_tfrc_get_now (td->self);
529 
530   if (G_LIKELY (src && src->receiver_id == id))
531     fs_rtp_tfrc_receiver_timer_func_locked (td->self, src, now);
532 
533   GST_OBJECT_UNLOCK (td->self);
534 
535   return FALSE;
536 }
537 
538 
/* State shared between rtpsession_sending_rtcp() and the per-source
 * callback tfrc_sources_process(). The field order matters: the structure
 * is initialised positionally. */
struct SendingRtcpData {
  FsRtpTfrc *self;
  GstRTCPBuffer rtcpbuffer;  /* mapped RTCP buffer being filled */
  gboolean ret;              /* TRUE once a feedback packet was added */
  guint32 ssrc;              /* cached "internal-ssrc" of the rtpsession */
  gboolean have_ssrc;        /* TRUE once ssrc has been fetched */
};
546 
547 static void
tfrc_sources_process(gpointer key,gpointer value,gpointer user_data)548 tfrc_sources_process (gpointer key, gpointer value, gpointer user_data)
549 {
550   struct SendingRtcpData *data = user_data;
551   struct TrackedSource *src = value;
552   GstRTCPPacket packet;
553   guint8 *pdata;
554   guint64 now;
555   gdouble loss_event_rate;
556   guint receive_rate;
557 
558   if (!src->receiver)
559     return;
560 
561   if (src->got_nohdr_pkt)
562     return;
563 
564   now = fs_rtp_tfrc_get_now (data->self);
565 
566   if (!src->send_feedback)
567     goto done;
568 
569   if (!gst_rtcp_buffer_add_packet (&data->rtcpbuffer, GST_RTCP_TYPE_RTPFB,
570           &packet))
571     goto done;
572 
573   if (!gst_rtcp_packet_fb_set_fci_length (&packet, 4))
574   {
575     gst_rtcp_packet_remove (&packet);
576     goto done;
577   }
578 
579   if (!tfrc_receiver_send_feedback (src->receiver, now, &loss_event_rate,
580           &receive_rate))
581   {
582     gst_rtcp_packet_remove (&packet);
583     goto done;
584   }
585 
586   if (!data->have_ssrc)
587     g_object_get (data->self->rtpsession, "internal-ssrc", &data->ssrc, NULL);
588   data->have_ssrc = TRUE;
589 
590   /* draft-ietf-avt-tfrc-profile-10 defines the type as 2 */
591   gst_rtcp_packet_fb_set_type (&packet, 2);
592   gst_rtcp_packet_fb_set_sender_ssrc (&packet, data->ssrc);
593   gst_rtcp_packet_fb_set_media_ssrc (&packet, src->ssrc);
594   pdata = gst_rtcp_packet_fb_get_fci (&packet);
595 
596   GST_WRITE_UINT32_BE (pdata, src->last_ts);
597   GST_WRITE_UINT32_BE (pdata + 4, now - src->last_now);
598   GST_WRITE_UINT32_BE (pdata + 8, receive_rate);
599   GST_WRITE_UINT32_BE (pdata + 12, loss_event_rate * G_MAXUINT);
600 
601   GST_LOG_OBJECT (data->self, "Sending RTCP report last_ts: %d delay: %"
602       G_GINT64_FORMAT", x_recv: %d, rate: %f",
603       src->last_ts, now - src->last_now, receive_rate, loss_event_rate);
604 
605   src->send_feedback = FALSE;
606 
607   data->ret = TRUE;
608 
609 done:
610   fs_rtp_tfrc_set_receiver_timer_locked (data->self, src, now);
611 }
612 
613 static gboolean
rtpsession_sending_rtcp(GObject * rtpsession,GstBuffer * buffer,gboolean is_early,FsRtpTfrc * self)614 rtpsession_sending_rtcp (GObject *rtpsession, GstBuffer *buffer,
615     gboolean is_early, FsRtpTfrc *self)
616 {
617   struct SendingRtcpData data = {NULL, GST_RTCP_BUFFER_INIT};
618 
619   gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &data.rtcpbuffer);
620 
621   data.self = self;
622   data.ret = FALSE;
623   data.have_ssrc = FALSE;
624 
625 
626   GST_OBJECT_LOCK (self);
627   g_hash_table_foreach (self->tfrc_sources, tfrc_sources_process, &data);
628   GST_OBJECT_UNLOCK (self);
629 
630   gst_rtcp_buffer_unmap (&data.rtcpbuffer);
631 
632   /* Return TRUE if something was added */
633   return data.ret;
634 }
635 
636 static GstPadProbeReturn
incoming_rtp_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)637 incoming_rtp_probe (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
638 {
639   FsRtpTfrc *self = FS_RTP_TFRC (user_data);
640   GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
641   guint32 ssrc;
642   guint8 *data;
643   guint size;
644   gboolean got_header = FALSE;
645   struct TrackedSource *src = NULL;
646   guint32 rtt, seq;
647   gint64 ts_delta;
648   guint64 ts;
649   gboolean send_rtcp = FALSE;
650   guint64 now;
651   guint8 pt;
652   gint seq_delta;
653   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
654 
655   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
656     return GST_PAD_PROBE_OK;
657 
658   GST_OBJECT_LOCK (self);
659 
660   if (!self->fsrtpsession)
661     goto out_no_header_unmap;
662 
663   ssrc = gst_rtp_buffer_get_ssrc (&rtpbuffer);
664   pt = gst_rtp_buffer_get_payload_type (&rtpbuffer);
665   seq = gst_rtp_buffer_get_seq (&rtpbuffer);
666 
667   if (pt >= 128 || !self->pts[pt])
668     goto out_no_header_unmap;
669 
670   if (self->extension_type == EXTENSION_NONE)
671     goto out_no_header_unmap;
672   else if (self->extension_type == EXTENSION_ONE_BYTE)
673     got_header = gst_rtp_buffer_get_extension_onebyte_header (&rtpbuffer,
674         self->extension_id, 0, (gpointer *) &data, &size);
675   else if (self->extension_type == EXTENSION_TWO_BYTES)
676     got_header = gst_rtp_buffer_get_extension_twobytes_header (&rtpbuffer,
677         NULL, self->extension_id, 0, (gpointer *) &data, &size);
678 
679   gst_rtp_buffer_unmap (&rtpbuffer);
680 
681   src = fs_rtp_tfrc_get_remote_ssrc_locked (self, ssrc, NULL);
682 
683   if (src->rtpsource == NULL)
684   {
685     GST_WARNING_OBJECT (self, "Got packet from unconfirmed source %X ?", ssrc);
686     goto out;
687   }
688 
689   if (!got_header || size != 7)
690     goto out_no_header;
691 
692   src->got_nohdr_pkt = FALSE;
693 
694   now =  fs_rtp_tfrc_get_now (self);
695 
696   rtt = GST_READ_UINT24_BE (data);
697   ts = GST_READ_UINT32_BE (data + 3);
698 
699 
700   if (!src->receiver)
701   {
702     src->receiver = tfrc_receiver_new (now);
703   }
704   else if (rtt == 0 && src->last_rtt != 0)
705   {
706     /* Detect sender reset */
707     src->seq_cycles = 0;
708     src->last_seq = 0;
709     src->ts_cycles = 0;
710     src->last_now = 0;
711     src->last_rtt = 0;
712     tfrc_receiver_free (src->receiver);
713     src->receiver = tfrc_receiver_new (now);
714     if (src->receiver_id)
715     {
716       gst_clock_id_unschedule (src->receiver_id);
717       gst_clock_id_unref (src->receiver_id);
718       src->receiver_id = NULL;
719     }
720   }
721   seq_delta = seq - src->last_seq;
722 
723   if (seq < src->last_seq && seq_delta < -3000)
724     src->seq_cycles += 1 << 16;
725   src->last_seq = seq;
726   seq += src->seq_cycles;
727 
728   ts_delta = ts - src->last_ts;
729   /* We declare there has been a cycle if the difference is more than
730    * 5 minutes
731    */
732   if (ts < src->last_ts && ts_delta < -(5 * 60 * 1000 * 1000))
733     src->ts_cycles += ONE_32BIT_CYCLE;
734   src->last_ts = ts;
735   ts += src->ts_cycles;
736 
737   send_rtcp = tfrc_receiver_got_packet (src->receiver, ts, now, seq, rtt,
738       gst_rtp_buffer_get_packet_len (&rtpbuffer));
739 
740   GST_LOG_OBJECT (self, "Got RTP packet");
741 
742   if (rtt && src->last_rtt == 0)
743     fs_rtp_tfrc_receiver_timer_func_locked (self, src, now);
744 
745   src->last_now = now;
746   src->last_rtt = rtt;
747 
748 out:
749 
750   if (send_rtcp)
751   {
752     src->send_feedback = TRUE;
753     GST_OBJECT_UNLOCK (self);
754     g_signal_emit_by_name (src->self->rtpsession, "send-rtcp", (guint64) 0);
755   }
756   else
757   {
758     GST_OBJECT_UNLOCK (self);
759   }
760 
761   return GST_PAD_PROBE_OK;
762 
763 out_no_header_unmap:
764 
765   gst_rtp_buffer_unmap (&rtpbuffer);
766 
767 out_no_header:
768   if (src)
769     src->got_nohdr_pkt = TRUE;
770   goto out;
771 }
772 
773 static gboolean
no_feedback_timer_expired(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)774 no_feedback_timer_expired (GstClock *clock, GstClockTime time, GstClockID id,
775   gpointer user_data)
776 {
777   struct TimerData *td = user_data;
778   struct TrackedSource *src;
779   guint64 now;
780   gboolean notify = FALSE;
781 
782   if (time == GST_CLOCK_TIME_NONE)
783     return FALSE;
784 
785   GST_OBJECT_LOCK (td->self);
786 
787   if (!td->self->sending)
788     goto out;
789 
790   src = g_hash_table_lookup (td->self->tfrc_sources,
791       GUINT_TO_POINTER (td->ssrc));
792 
793   if (!src)
794     goto out;
795 
796   if (src->sender_id != id)
797     goto out;
798 
799   now = fs_rtp_tfrc_get_now (td->self);
800 
801   fs_rtp_tfrc_update_sender_timer_locked (td->self, src, now);
802 
803   if (fs_rtp_tfrc_update_bitrate_locked (td->self, "tm"))
804     notify = TRUE;
805 
806 out:
807 
808   GST_OBJECT_UNLOCK (td->self);
809 
810   if (notify)
811     g_object_notify (G_OBJECT (td->self), "bitrate");
812 
813   return FALSE;
814 }
815 
816 static void
fs_rtp_tfrc_update_sender_timer_locked(FsRtpTfrc * self,struct TrackedSource * src,guint64 now)817 fs_rtp_tfrc_update_sender_timer_locked (FsRtpTfrc *self,
818     struct TrackedSource *src, guint64 now)
819 {
820   guint64 expiry;
821   GstClockReturn cret;
822 
823   if (src->sender_id)
824   {
825     gst_clock_id_unschedule (src->sender_id);
826     gst_clock_id_unref (src->sender_id);
827     src->sender_id = NULL;
828   }
829 
830   if (src->sender == NULL)
831     return;
832 
833   expiry = tfrc_sender_get_no_feedback_timer_expiry (src->sender);
834 
835   if (expiry <= now)
836   {
837     tfrc_sender_no_feedback_timer_expired (src->sender, now);
838     expiry = tfrc_sender_get_no_feedback_timer_expiry (src->sender);
839   }
840 
841   src->sender_id = gst_clock_new_single_shot_id (self->systemclock,
842       expiry * GST_USECOND);
843 
844   cret = gst_clock_id_wait_async (src->sender_id,
845       no_feedback_timer_expired, build_timer_data (self, src->ssrc),
846       free_timer_data);
847   if (cret != GST_CLOCK_OK)
848     GST_ERROR_OBJECT (self,
849         "Could not schedule feedback time for %" G_GUINT64_FORMAT
850         " (now %" G_GUINT64_FORMAT ") error: %d",
851         expiry, now, cret);
852 }
853 
854 static void
tracked_src_add_sender(struct TrackedSource * src,guint64 now,guint initial_rate)855 tracked_src_add_sender (struct TrackedSource *src, guint64 now,
856   guint initial_rate)
857 {
858   src->sender = tfrc_sender_new (1460, now, initial_rate);
859   src->idl = tfrc_is_data_limited_new (now);
860   src->send_ts_base = now;
861 }
862 
863 static GstPadProbeReturn
incoming_rtcp_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)864 incoming_rtcp_probe (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
865 {
866   FsRtpTfrc *self = FS_RTP_TFRC (user_data);
867   GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
868   GstRTCPBuffer rtcpbuffer = GST_RTCP_BUFFER_INIT;
869   GstRTCPPacket packet;
870   gboolean notify = FALSE;
871 
872   if (!gst_rtcp_buffer_validate (buffer))
873     return GST_PAD_PROBE_OK;
874 
875   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcpbuffer);
876 
877   if (!gst_rtcp_buffer_get_first_packet (&rtcpbuffer, &packet))
878     goto out;
879 
880   do {
881     if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_RTPFB &&
882         gst_rtcp_packet_fb_get_type (&packet) == 2 &&
883         gst_rtcp_packet_get_length (&packet) == 6)
884     {
885       /* We have a TFRC packet */
886       guint32 media_ssrc;
887       guint32 sender_ssrc;
888       guint64 ts;
889       guint32 delay;
890       guint32 x_recv;
891       gdouble loss_event_rate;
892       guint8 *buf = rtcpbuffer.map.data + packet.offset;
893       struct TrackedSource *src;
894       guint64 now;
895       guint64 rtt;
896       guint32 local_ssrc;
897       gboolean is_data_limited;
898 
899       media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (&packet);
900 
901       g_object_get (self->rtpsession, "internal-ssrc", &local_ssrc, NULL);
902 
903       if (media_ssrc != local_ssrc)
904         continue;
905 
906       sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (&packet);
907 
908       buf += 4 * 3; /* skip the header, ssrc of sender and media sender */
909 
910       ts = GST_READ_UINT32_BE (buf);
911       buf += 4;
912       delay = GST_READ_UINT32_BE (buf);
913       buf += 4;
914       x_recv = GST_READ_UINT32_BE (buf);
915       buf += 4;
916       loss_event_rate = (gdouble) GST_READ_UINT32_BE (buf) / (gdouble) G_MAXUINT;
917       GST_LOG_OBJECT (self, "Got RTCP TFRC packet last_sent_ts: %"
918           G_GUINT64_FORMAT " delay: %u x_recv: %u loss_event_rate: %f", ts,
919           delay, x_recv, loss_event_rate);
920 
921       GST_OBJECT_LOCK (self);
922 
923       if (!self->fsrtpsession || !self->sending)
924         goto done;
925 
926       src = fs_rtp_tfrc_get_remote_ssrc_locked (self, sender_ssrc,
927           NULL);
928 
929       now = fs_rtp_tfrc_get_now (self);
930 
931       if (G_UNLIKELY (!src->sender))
932         tracked_src_add_sender (src, now, self->send_bitrate);
933 
934       /* Make sure we only use the RTT from the most recent packets from
935        * the remote side, ignore anything that got delayed in between.
936        */
937       if (ts < src->fb_last_ts)
938       {
939         if (src->fb_ts_cycles + ONE_32BIT_CYCLE == src->send_ts_cycles)
940         {
941           src->fb_ts_cycles = src->send_ts_cycles;
942         }
943         else
944         {
945           GST_DEBUG_OBJECT (self, "Ignoring packet because the timestamp is "
946               "older than one that has already been received,"
947               " probably reordered.");
948           goto done;
949         }
950       }
951 
952       src->fb_last_ts = ts;
953       ts += src->fb_ts_cycles + src->send_ts_base;
954 
955       if (ts > now || now - ts < delay)
956       {
957         GST_ERROR_OBJECT (self, "Ignoring packet because ts > now ||"
958             " now - ts < delay (ts: %" G_GUINT64_FORMAT
959             " now: %" G_GUINT64_FORMAT " delay:%u",
960             ts, now, delay);
961         goto done;
962       }
963 
964       rtt = now - ts - delay;
965 
966       if (rtt == 0)
967         rtt = 1;
968 
969       if (rtt > 10 * 1000 * 1000)
970       {
971         GST_WARNING_OBJECT (self, "Impossible RTT %" G_GUINT64_FORMAT
972             " ms, ignoring", rtt);
973         goto done;
974       }
975 
976       GST_LOG_OBJECT (self, "rtt: %" G_GUINT64_FORMAT
977           " = now %" G_GUINT64_FORMAT
978           " - ts %"G_GUINT64_FORMAT" - delay %u",
979           rtt, now, ts, delay);
980 
981       if (G_UNLIKELY (tfrc_sender_get_averaged_rtt (src->sender) == 0))
982         tfrc_sender_on_first_rtt (src->sender, now);
983 
984       is_data_limited =
985           tfrc_is_data_limited_received_feedback (src->idl, now, ts,
986               tfrc_sender_get_averaged_rtt (src->sender));
987 
988       tfrc_sender_on_feedback_packet (src->sender, now, rtt, x_recv,
989           loss_event_rate, is_data_limited);
990 
991       fs_rtp_tfrc_update_sender_timer_locked (self, src, now);
992 
993       self->last_src = src;
994 
995       if (fs_rtp_tfrc_update_bitrate_locked (self, "fb"))
996         notify = TRUE;
997 
998     done:
999       GST_OBJECT_UNLOCK (self);
1000     }
1001   } while (gst_rtcp_packet_move_to_next (&packet));
1002 
1003   if (notify)
1004     g_object_notify (G_OBJECT (self), "bitrate");
1005 
1006 out:
1007 
1008   gst_rtcp_buffer_unmap (&rtcpbuffer);
1009 
1010   return GST_PAD_PROBE_OK;
1011 }
1012 
1013 static GstClockTime
fs_rtp_tfrc_get_sync_time(FsRtpPacketModder * modder,GstBuffer * buffer,gpointer user_data)1014 fs_rtp_tfrc_get_sync_time (FsRtpPacketModder *modder,
1015     GstBuffer *buffer, gpointer user_data)
1016 {
1017   FsRtpTfrc *self = FS_RTP_TFRC (user_data);
1018   GstClockTime sync_time = GST_BUFFER_TIMESTAMP (buffer);
1019   gint bytes_for_one_rtt = 0;
1020   guint size = 0;
1021   guint send_rate;
1022 
1023   GST_OBJECT_LOCK (self);
1024 
1025   if (self->extension_type == EXTENSION_NONE || !self->sending)
1026   {
1027     GST_OBJECT_UNLOCK (self);
1028     return GST_CLOCK_TIME_NONE;
1029   }
1030 
1031   if (self->last_src && self->last_src->sender)
1032   {
1033     send_rate = tfrc_sender_get_send_rate (self->last_src->sender);
1034     bytes_for_one_rtt = send_rate *
1035         tfrc_sender_get_averaged_rtt (self->last_src->sender);
1036   }
1037   else
1038   {
1039     send_rate = tfrc_sender_get_send_rate (NULL);
1040     bytes_for_one_rtt = 0;
1041   }
1042 
1043   size = gst_buffer_get_size (buffer) + 10;
1044 
1045   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer))
1046   {
1047     if (GST_CLOCK_TIME_IS_VALID (self->last_sent_ts) &&
1048         self->last_sent_ts < GST_BUFFER_TIMESTAMP (buffer))
1049       self->byte_reservoir +=
1050         gst_util_uint64_scale (
1051             (GST_BUFFER_TIMESTAMP (buffer) - self->last_sent_ts),
1052             send_rate,
1053             GST_SECOND);
1054     self->last_sent_ts = GST_BUFFER_TIMESTAMP (buffer);
1055 
1056     if (bytes_for_one_rtt &&
1057         self->byte_reservoir > bytes_for_one_rtt)
1058       self->byte_reservoir = bytes_for_one_rtt;
1059   }
1060 
1061   self->byte_reservoir -= size;
1062 
1063   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
1064       self->byte_reservoir < 0)
1065   {
1066     GstClockTimeDiff diff = 0;
1067 
1068     diff = gst_util_uint64_scale_int (GST_SECOND,
1069         -self->byte_reservoir, send_rate);
1070     g_assert (diff > 0);
1071 
1072 
1073     GST_LOG_OBJECT (self, "Delaying packet by %"GST_TIME_FORMAT
1074         " = 1sec * bytes %d / rate %u",
1075         GST_TIME_ARGS (diff), self->byte_reservoir,
1076         send_rate);
1077 
1078     GST_BUFFER_TIMESTAMP (buffer) += diff;
1079   }
1080 
1081   GST_OBJECT_UNLOCK (self);
1082 
1083 
1084   return sync_time;
1085 }
1086 
1087 
1088 static GstBuffer *
fs_rtp_tfrc_outgoing_packets(FsRtpPacketModder * modder,GstBuffer * buffer,GstClockTime buffer_ts,gpointer user_data)1089 fs_rtp_tfrc_outgoing_packets (FsRtpPacketModder *modder,
1090     GstBuffer *buffer, GstClockTime buffer_ts, gpointer user_data)
1091 {
1092   FsRtpTfrc *self = FS_RTP_TFRC (user_data);
1093   gchar data[7];
1094   guint64 now;
1095   GstBuffer *headerbuf;
1096   GstBuffer *newbuf;
1097   gboolean is_data_limited;
1098   gsize header_size;
1099   gsize new_header_size;
1100   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
1101 
1102   if (!GST_CLOCK_TIME_IS_VALID (buffer_ts))
1103     return buffer;
1104 
1105   GST_OBJECT_LOCK (self);
1106 
1107   if (!self->fsrtpsession || self->extension_type == EXTENSION_NONE ||
1108       !self->sending)
1109   {
1110     GST_OBJECT_UNLOCK (self);
1111     return buffer;
1112   }
1113 
1114   now = fs_rtp_tfrc_get_now (self);
1115 
1116   if (G_UNLIKELY (self->last_src == NULL))
1117     self->initial_src = self->last_src = tracked_src_new (self);
1118 
1119   if (G_UNLIKELY (self->last_src->sender == NULL))
1120   {
1121     tracked_src_add_sender (self->last_src, now, self->send_bitrate);
1122     fs_rtp_tfrc_update_sender_timer_locked (self, self->last_src, now);
1123   }
1124 
1125   GST_WRITE_UINT24_BE (data,
1126       tfrc_sender_get_averaged_rtt (self->last_src->sender));
1127   GST_WRITE_UINT32_BE (data+3, now - self->last_src->send_ts_base);
1128 
1129   if (now - self->last_src->send_ts_base > self->last_src->send_ts_cycles +
1130       ONE_32BIT_CYCLE)
1131     self->last_src->send_ts_cycles += ONE_32BIT_CYCLE;
1132 
1133   is_data_limited = (GST_BUFFER_PTS (buffer) == buffer_ts);
1134 
1135   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer);
1136   header_size = gst_rtp_buffer_get_header_len (&rtpbuffer);
1137   gst_rtp_buffer_unmap (&rtpbuffer);
1138 
1139   headerbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, 0,
1140       header_size);
1141   headerbuf = gst_buffer_make_writable (headerbuf);
1142   gst_buffer_set_size (headerbuf, header_size + 16);
1143 
1144   gst_rtp_buffer_map (headerbuf, GST_MAP_READWRITE, &rtpbuffer);
1145 
1146   if (self->extension_type == EXTENSION_ONE_BYTE)
1147   {
1148     if (!gst_rtp_buffer_add_extension_onebyte_header (&rtpbuffer,
1149             self->extension_id, data, 7))
1150       GST_WARNING_OBJECT (self,
1151           "Could not add extension to RTP header buf %p", headerbuf);
1152   }
1153   else if (self->extension_type == EXTENSION_TWO_BYTES)
1154   {
1155     if (!gst_rtp_buffer_add_extension_twobytes_header (&rtpbuffer, 0,
1156             self->extension_id, data, 7))
1157       GST_WARNING_OBJECT (self,
1158           "Could not add extension to RTP header in list %p", headerbuf);
1159   }
1160 
1161   /* FIXME:
1162    * This will break if any padding is applied
1163    */
1164   new_header_size = gst_rtp_buffer_get_header_len (&rtpbuffer);
1165 
1166   gst_rtp_buffer_unmap (&rtpbuffer);
1167   gst_buffer_set_size (headerbuf, new_header_size);
1168 
1169   /* append_region eats a ref */
1170   gst_buffer_ref (buffer);
1171   newbuf = gst_buffer_append_region (headerbuf, buffer, header_size, -1);
1172 
1173   GST_LOG_OBJECT (self, "Sending RTP");
1174 
1175   if (g_hash_table_size (self->tfrc_sources))
1176   {
1177     GHashTableIter ht_iter;
1178     struct TrackedSource *src;
1179 
1180     g_hash_table_iter_init (&ht_iter, self->tfrc_sources);
1181 
1182     while (g_hash_table_iter_next (&ht_iter, NULL,
1183             (gpointer *) &src))
1184     {
1185       if (src->sender)
1186       {
1187         if (!is_data_limited)
1188           tfrc_is_data_limited_not_limited_now (src->idl, now);
1189         tfrc_sender_sending_packet (src->sender, gst_buffer_get_size (newbuf));
1190       }
1191     }
1192   }
1193   if (self->initial_src)
1194   {
1195     if (!is_data_limited)
1196       tfrc_is_data_limited_not_limited_now (self->initial_src->idl, now);
1197     tfrc_sender_sending_packet (self->initial_src->sender,
1198         gst_buffer_get_size (newbuf));
1199   }
1200 
1201 
1202   GST_OBJECT_UNLOCK (self);
1203 
1204   gst_buffer_unref (buffer);
1205 
1206   return newbuf;
1207 }
1208 
1209 static GstPadProbeReturn
send_rtp_pad_blocked(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)1210 send_rtp_pad_blocked (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1211 {
1212   FsRtpTfrc *self = user_data;
1213   gboolean need_modder;
1214   GstPad *peer = NULL;
1215 
1216   GST_OBJECT_LOCK (self);
1217   self->modder_check_probe_id = 0;
1218   need_modder = self->extension_type != EXTENSION_NONE;
1219 
1220   if (!self->fsrtpsession || !!self->packet_modder == need_modder)
1221     goto out;
1222 
1223   GST_DEBUG ("Pad blocked to possibly %s the tfrc packet modder",
1224       need_modder ? "add" : "remove");
1225 
1226   if (need_modder)
1227   {
1228     GstPadLinkReturn linkret;
1229     GstPad *modder_pad;
1230 
1231     self->packet_modder = GST_ELEMENT (fs_rtp_packet_modder_new (
1232           fs_rtp_tfrc_outgoing_packets, fs_rtp_tfrc_get_sync_time, self));
1233     g_object_ref (self->packet_modder);
1234 
1235     if (!gst_bin_add (self->parent_bin, self->packet_modder))
1236     {
1237       fs_session_emit_error (FS_SESSION (self->fsrtpsession),
1238           FS_ERROR_CONSTRUCTION,
1239           "Could not add tfrc packet modder to the pipeline");
1240       goto adding_failed;
1241     }
1242 
1243     peer = gst_pad_get_peer (pad);
1244     gst_pad_unlink (pad, peer);
1245 
1246     modder_pad = gst_element_get_static_pad (self->packet_modder, "src");
1247     linkret = gst_pad_link (modder_pad, peer);
1248     gst_object_unref (modder_pad);
1249     if (GST_PAD_LINK_FAILED (linkret))
1250     {
1251       fs_session_emit_error (FS_SESSION (self->fsrtpsession),
1252           FS_ERROR_CONSTRUCTION,
1253           "Could not link tfrc packet modder to rtp muxer");
1254       goto linking_failed;
1255     }
1256 
1257     modder_pad = gst_element_get_static_pad (self->packet_modder, "sink");
1258     linkret = gst_pad_link (pad, modder_pad);
1259     gst_object_unref (modder_pad);
1260     if (GST_PAD_LINK_FAILED (linkret))
1261     {
1262       fs_session_emit_error (FS_SESSION (self->fsrtpsession),
1263           FS_ERROR_CONSTRUCTION,
1264           "Could not link tfrc packet modder to the rtpbin");
1265       goto linking_failed;
1266     }
1267 
1268     if (gst_element_set_state (self->packet_modder, GST_STATE_PLAYING) ==
1269         GST_STATE_CHANGE_FAILURE)
1270     {
1271       fs_session_emit_error (FS_SESSION (self->fsrtpsession),
1272           FS_ERROR_CONSTRUCTION,
1273           "Could not set the TFRC packet modder to playing");
1274       goto linking_failed;
1275     }
1276   }
1277   else
1278   {
1279     GstPadLinkReturn linkret;
1280     GstPad *modder_src_pad;
1281 
1282     modder_src_pad = gst_element_get_static_pad (self->packet_modder, "src");
1283     peer = gst_pad_get_peer (modder_src_pad);
1284     gst_object_unref (modder_src_pad);
1285 
1286     gst_bin_remove (self->parent_bin, self->packet_modder);
1287     gst_element_set_state (self->packet_modder, GST_STATE_NULL);
1288     gst_object_unref (self->packet_modder);
1289     self->packet_modder = NULL;
1290 
1291     linkret = gst_pad_link (pad, peer);
1292     if (GST_PAD_LINK_FAILED (linkret))
1293       fs_session_emit_error (FS_SESSION (self->fsrtpsession),
1294           FS_ERROR_CONSTRUCTION,
1295           "Could not re-link after removing tfrc packet modder");
1296   }
1297 
1298 out:
1299   gst_object_unref (peer);
1300   GST_OBJECT_UNLOCK (self);
1301 
1302   return GST_PAD_PROBE_REMOVE;
1303 
1304 linking_failed:
1305   gst_bin_remove (self->parent_bin, self->packet_modder);
1306   gst_pad_link (pad, peer);
1307 adding_failed:
1308   gst_object_unref (self->packet_modder);
1309   self->packet_modder = NULL;
1310   goto out;
1311 }
1312 
1313 static void
fs_rtp_tfrc_check_modder_locked(FsRtpTfrc * self)1314 fs_rtp_tfrc_check_modder_locked (FsRtpTfrc *self)
1315 {
1316   gboolean need_modder;
1317 
1318   need_modder = self->extension_type != EXTENSION_NONE;
1319 
1320   if (!!self->packet_modder == need_modder)
1321     return;
1322 
1323   if (self->modder_check_probe_id != 0)
1324     return;
1325 
1326   self->modder_check_probe_id =
1327       gst_pad_add_probe (self->out_rtp_pad,
1328           GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
1329           send_rtp_pad_blocked,
1330           g_object_ref (self), (GDestroyNotify) g_object_unref);
1331 }
1332 
1333 
1334 FsRtpTfrc *
fs_rtp_tfrc_new(FsRtpSession * fsrtpsession)1335 fs_rtp_tfrc_new (FsRtpSession *fsrtpsession)
1336 {
1337   FsRtpTfrc *self;
1338   GstElement *rtpmuxer;
1339 
1340   g_return_val_if_fail (fsrtpsession, NULL);
1341 
1342   self = g_object_new (FS_TYPE_RTP_TFRC, NULL);
1343 
1344   self->fsrtpsession = fsrtpsession;
1345   self->sending = FALSE;
1346 
1347   self->rtpsession = fs_rtp_session_get_rtpbin_internal_session (fsrtpsession);
1348   self->parent_bin = GST_BIN (fs_rtp_session_get_conference (fsrtpsession));
1349   self->in_rtp_pad = fs_rtp_session_get_rtpbin_recv_rtp_sink (fsrtpsession);;
1350   self->in_rtcp_pad = fs_rtp_session_get_rtpbin_recv_rtcp_sink (fsrtpsession);;
1351 
1352   rtpmuxer = fs_rtp_session_get_rtpmuxer (fsrtpsession);
1353   self->out_rtp_pad = gst_element_get_static_pad (rtpmuxer, "src");
1354   gst_object_unref (rtpmuxer);
1355 
1356   self->in_rtp_probe_id = gst_pad_add_probe (self->in_rtp_pad,
1357       GST_PAD_PROBE_TYPE_BUFFER, incoming_rtp_probe,
1358       g_object_ref (self), (GDestroyNotify) g_object_unref);
1359   self->in_rtcp_probe_id = gst_pad_add_probe (self->in_rtcp_pad,
1360       GST_PAD_PROBE_TYPE_BUFFER, incoming_rtcp_probe,
1361       g_object_ref (self), (GDestroyNotify) g_object_unref);
1362 
1363 
1364   self->on_ssrc_validated_id = g_signal_connect_object (self->rtpsession,
1365       "on-ssrc-validated", G_CALLBACK (rtpsession_on_ssrc_validated), self, 0);
1366   self->on_sending_rtcp_id = g_signal_connect_object (self->rtpsession,
1367       "on-sending-rtcp", G_CALLBACK (rtpsession_sending_rtcp), self, 0);
1368 
1369   return self;
1370 }
1371 
1372 gboolean
validate_ca_for_tfrc(CodecAssociation * ca,gpointer user_data)1373 validate_ca_for_tfrc (CodecAssociation *ca, gpointer user_data)
1374 {
1375   return codec_association_is_valid_for_sending (ca, TRUE) &&
1376       fs_codec_get_feedback_parameter (ca->codec, "tfrc", "",  "");
1377 }
1378 
1379 void
fs_rtp_tfrc_filter_codecs(GList ** codec_associations,GList ** header_extensions)1380 fs_rtp_tfrc_filter_codecs (GList **codec_associations,
1381     GList **header_extensions)
1382 {
1383   gboolean has_header_ext = FALSE;
1384   gboolean has_codec_rtcpfb = FALSE;
1385   GList *item;
1386 
1387   has_codec_rtcpfb = !!lookup_codec_association_custom (*codec_associations,
1388       validate_ca_for_tfrc, NULL);
1389 
1390   for (item = *header_extensions; item;)
1391   {
1392     FsRtpHeaderExtension *hdrext = item->data;
1393     GList *next = item->next;
1394 
1395     if (!strcmp (hdrext->uri, "urn:ietf:params:rtp-hdrext:rtt-sendts"))
1396     {
1397       if (has_header_ext || !has_codec_rtcpfb)
1398       {
1399         GST_WARNING ("Removing rtt-sendts hdrext because matching tfrc"
1400             " feedback parameter not found or because rtp-hdrext"
1401             " is duplicated");
1402         fs_rtp_header_extension_destroy (item->data);
1403         *header_extensions = g_list_remove_link (*header_extensions, item);
1404       }
1405       else if (hdrext->direction == FS_DIRECTION_BOTH)
1406       {
1407         has_header_ext = TRUE;
1408       }
1409     }
1410     item = next;
1411   }
1412 
1413   if (!has_codec_rtcpfb || has_header_ext)
1414     return;
1415 
1416   for (item = *codec_associations; item; item = item->next)
1417   {
1418     CodecAssociation *ca = item->data;
1419     GList *item2;
1420 
1421     for (item2 = ca->codec->feedback_params; item2;)
1422     {
1423       GList *next2 = item2->next;
1424       FsFeedbackParameter *p = item2->data;
1425 
1426       if (!g_ascii_strcasecmp (p->type, "tfrc"))
1427       {
1428         GST_WARNING ("Removing tfrc from codec because no hdrext:rtt-sendts: "
1429             FS_CODEC_FORMAT, FS_CODEC_ARGS (ca->codec));
1430         fs_codec_remove_feedback_parameter (ca->codec, item2);
1431       }
1432 
1433       item2 = next2;
1434     }
1435   }
1436 
1437 }
1438 
1439 void
fs_rtp_tfrc_codecs_updated(FsRtpTfrc * self,GList * codec_associations,GList * header_extensions)1440 fs_rtp_tfrc_codecs_updated (FsRtpTfrc *self,
1441     GList *codec_associations,
1442     GList *header_extensions)
1443 {
1444   GList *item;
1445   FsRtpHeaderExtension *hdrext;
1446 
1447   GST_OBJECT_LOCK (self);
1448 
1449   memset (self->pts, 0, 128 * sizeof (gboolean));
1450   for (item = codec_associations; item; item = item->next)
1451   {
1452     CodecAssociation *ca = item->data;
1453 
1454     /* Also require nack/pli for tfrc to work, we really need to disable
1455      * automatic keyframes
1456      */
1457 
1458     if (fs_codec_get_feedback_parameter (ca->codec, "tfrc", NULL, NULL) &&
1459         fs_rtp_keyunit_manager_has_key_request_feedback (ca->codec))
1460     self->pts[ca->codec->id] = TRUE;
1461   }
1462 
1463   for (item = header_extensions; item; item = item->next)
1464   {
1465     hdrext = item->data;
1466     if (!strcmp (hdrext->uri, "urn:ietf:params:rtp-hdrext:rtt-sendts") &&
1467         hdrext->direction == FS_DIRECTION_BOTH)
1468       break;
1469   }
1470 
1471   if (!item)
1472   {
1473     self->extension_type = EXTENSION_NONE;
1474     goto out;
1475   }
1476 
1477   if (hdrext->id > 15)
1478     self->extension_type = EXTENSION_TWO_BYTES;
1479   else
1480     self->extension_type = EXTENSION_ONE_BYTE;
1481 
1482   self->extension_id = hdrext->id;
1483 
1484 out:
1485   fs_rtp_tfrc_check_modder_locked (self);
1486 
1487   GST_OBJECT_UNLOCK (self);
1488 }
1489 
1490 
1491 gboolean
fs_rtp_tfrc_is_enabled(FsRtpTfrc * self,guint pt)1492 fs_rtp_tfrc_is_enabled (FsRtpTfrc *self, guint pt)
1493 {
1494   gboolean is_enabled;
1495 
1496   g_return_val_if_fail (pt < 128, FALSE);
1497 
1498   GST_OBJECT_LOCK (self);
1499   is_enabled = (self->extension_type != EXTENSION_NONE) &&
1500       self->pts[pt];
1501   GST_OBJECT_UNLOCK (self);
1502 
1503   return is_enabled;
1504 }
1505