1 /*
2 * Farstream - Farstream RTP TFRC support
3 *
4 * Copyright 2010 Collabora Ltd.
5 * @author: Olivier Crete <olivier.crete@collabora.co.uk>
6 * Copyright 2010 Nokia Corp.
7 *
8 * fs-rtp-tfrc.c - Rate control for Farstream RTP sessions
9 *
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License as published by the Free Software Foundation; either
13 * version 2.1 of the License, or (at your option) any later version.
14 *
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with this library; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28
29 #include "fs-rtp-tfrc.h"
30
31 #include <string.h>
32
33 #include "fs-rtp-packet-modder.h"
34 #include "farstream/fs-rtp.h"
35 #include "fs-rtp-codec-negotiation.h"
36
37 #include <gst/rtp/gstrtpbuffer.h>
38 #include <gst/rtp/gstrtcpbuffer.h>
39
40 #define ONE_32BIT_CYCLE ((guint64) (((guint64)0xffffffff) + ((guint64)1)))
41
42
43 GST_DEBUG_CATEGORY_STATIC (fsrtpconference_tfrc);
44 #define GST_CAT_DEFAULT fsrtpconference_tfrc
45
46 G_DEFINE_TYPE (FsRtpTfrc, fs_rtp_tfrc, GST_TYPE_OBJECT);
47
48 /* props */
49 enum
50 {
51 PROP_0,
52 PROP_BITRATE,
53 PROP_SENDING
54 };
55
56 static void fs_rtp_tfrc_get_property (GObject *object,
57 guint prop_id,
58 GValue *value,
59 GParamSpec *pspec);
60 static void fs_rtp_tfrc_set_property (GObject *object,
61 guint prop_id,
62 const GValue *value,
63 GParamSpec *pspec);
64 static void fs_rtp_tfrc_dispose (GObject *object);
65
66 static void fs_rtp_tfrc_update_sender_timer_locked (
67 FsRtpTfrc *self,
68 struct TrackedSource *src,
69 guint64 now);
70
71 static gboolean feedback_timer_expired (GstClock *clock, GstClockTime time,
72 GstClockID id, gpointer user_data);
73
74 static void fs_rtp_tfrc_clear_sender (FsRtpTfrc *self);
75
76 static void
fs_rtp_tfrc_class_init(FsRtpTfrcClass * klass)77 fs_rtp_tfrc_class_init (FsRtpTfrcClass *klass)
78 {
79 GObjectClass *gobject_class;
80
81 gobject_class = (GObjectClass *) klass;
82
83 gobject_class->get_property = fs_rtp_tfrc_get_property;
84 gobject_class->set_property = fs_rtp_tfrc_set_property;
85 gobject_class->dispose = fs_rtp_tfrc_dispose;
86
87 g_object_class_install_property (gobject_class,
88 PROP_BITRATE,
89 g_param_spec_uint ("bitrate",
90 "The bitrate at which data should be sent",
91 "The bitrate that the session should try to send at in bits/sec",
92 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
93
94 g_object_class_install_property (gobject_class,
95 PROP_SENDING,
96 g_param_spec_boolean ("sending",
97 "The bitrate at which data should be sent",
98 "The bitrate that the session should try to send at in bits/sec",
99 FALSE, G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
100 }
101
102
103 static struct TrackedSource *
tracked_src_new(FsRtpTfrc * self)104 tracked_src_new (FsRtpTfrc *self)
105 {
106 struct TrackedSource *src;
107
108 src = g_slice_new0 (struct TrackedSource);
109 src->self = self;
110 src->next_feedback_timer = G_MAXUINT64;
111
112 return src;
113 }
114
115 static void
tracked_src_free(struct TrackedSource * src)116 tracked_src_free (struct TrackedSource *src)
117 {
118 if (src->sender_id)
119 {
120 gst_clock_id_unschedule (src->sender_id);
121 gst_clock_id_unref (src->sender_id);
122 }
123
124 if (src->receiver_id)
125 {
126 gst_clock_id_unschedule (src->receiver_id);
127 gst_clock_id_unref (src->receiver_id);
128 }
129
130 if (src->rtpsource)
131 g_object_unref (src->rtpsource);
132
133 if (src->sender)
134 tfrc_sender_free (src->sender);
135 if (src->receiver)
136 tfrc_receiver_free (src->receiver);
137
138 if (src->idl)
139 tfrc_is_data_limited_free (src->idl);
140
141 g_slice_free (struct TrackedSource, src);
142 }
143
144 static void
fs_rtp_tfrc_init(FsRtpTfrc * self)145 fs_rtp_tfrc_init (FsRtpTfrc *self)
146 {
147 GST_DEBUG_CATEGORY_INIT (fsrtpconference_tfrc,
148 "fsrtpconference_tfrc", 0,
149 "Farstream RTP Conference Element Rate Control logic");
150
151 /* member init */
152
153 self->tfrc_sources = g_hash_table_new_full (g_direct_hash,
154 g_direct_equal, NULL, (GDestroyNotify) tracked_src_free);
155
156 fs_rtp_tfrc_clear_sender (self);
157 self->send_bitrate = tfrc_sender_get_send_rate (NULL) * 8;
158
159 self->extension_type = EXTENSION_NONE;
160 self->extension_id = 0;
161 memset (self->pts, 0, 128 * sizeof (gboolean));
162
163 self->systemclock = gst_system_clock_obtain ();
164 }
165
166 void
fs_rtp_tfrc_destroy(FsRtpTfrc * self)167 fs_rtp_tfrc_destroy (FsRtpTfrc *self)
168 {
169 GST_OBJECT_LOCK (self);
170
171 if (self->modder_check_probe_id)
172 gst_pad_remove_probe (self->in_rtp_pad, self->modder_check_probe_id);
173 self->modder_check_probe_id = 0;
174
175 if (self->in_rtp_probe_id)
176 gst_pad_remove_probe (self->in_rtp_pad, self->in_rtp_probe_id);
177 self->in_rtp_probe_id = 0;
178 if (self->in_rtcp_probe_id)
179 gst_pad_remove_probe (self->in_rtcp_pad, self->in_rtcp_probe_id);
180 self->in_rtcp_probe_id = 0;
181
182
183 if (self->on_ssrc_validated_id)
184 g_signal_handler_disconnect (self->rtpsession, self->on_ssrc_validated_id);
185 self->on_ssrc_validated_id = 0;
186 if (self->on_sending_rtcp_id)
187 g_signal_handler_disconnect (self->rtpsession, self->on_sending_rtcp_id);
188 self->on_sending_rtcp_id = 0;
189
190 g_hash_table_destroy (g_hash_table_ref (self->tfrc_sources));
191
192 self->fsrtpsession = NULL;
193
194 GST_OBJECT_UNLOCK (self);
195 }
196
197 static void
fs_rtp_tfrc_dispose(GObject * object)198 fs_rtp_tfrc_dispose (GObject *object)
199 {
200 FsRtpTfrc *self = FS_RTP_TFRC (object);
201
202 GST_OBJECT_LOCK (self);
203
204 if (self->tfrc_sources)
205 g_hash_table_destroy (self->tfrc_sources);
206 self->tfrc_sources = NULL;
207 self->last_src = NULL;
208
209 if (self->initial_src)
210 tracked_src_free (self->initial_src);
211 self->initial_src = NULL;
212
213 if (self->packet_modder)
214 {
215 gst_bin_remove (self->parent_bin, self->packet_modder);
216 gst_element_set_state (self->packet_modder, GST_STATE_NULL);
217 g_object_unref (self->packet_modder);
218 }
219
220 if (self->rtpsession)
221 g_object_unref (self->rtpsession);
222 if (self->in_rtp_pad)
223 g_object_unref (self->in_rtp_pad);
224 if (self->in_rtcp_pad)
225 g_object_unref (self->in_rtcp_pad);
226 if (self->out_rtp_pad)
227 g_object_unref (self->out_rtp_pad);
228
229 if (self->parent_bin)
230 gst_object_unref (self->parent_bin);
231
232 gst_object_unref (self->systemclock);
233 self->systemclock = NULL;
234
235 GST_OBJECT_UNLOCK (self);
236
237 if (G_OBJECT_CLASS (fs_rtp_tfrc_parent_class)->dispose)
238 G_OBJECT_CLASS (fs_rtp_tfrc_parent_class)->dispose (object);
239 }
240
241
242 static void
fs_rtp_tfrc_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)243 fs_rtp_tfrc_get_property (GObject *object,
244 guint prop_id,
245 GValue *value,
246 GParamSpec *pspec)
247 {
248 FsRtpTfrc *self = FS_RTP_TFRC (object);
249
250 switch (prop_id)
251 {
252 case PROP_BITRATE:
253 GST_OBJECT_LOCK (self);
254 g_value_set_uint (value, self->send_bitrate);
255 GST_OBJECT_UNLOCK (self);
256 break;
257 default:
258 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
259 break;
260 }
261 }
262
263 gboolean
clear_sender(gpointer key,gpointer value,gpointer user_data)264 clear_sender (gpointer key, gpointer value, gpointer user_data)
265 {
266 FsRtpTfrc *self = FS_RTP_TFRC (user_data);
267 struct TrackedSource *src = value;
268
269 src->send_ts_base = 0;
270 src->send_ts_cycles = 0;
271 src->fb_last_ts = 0;
272 src->fb_ts_cycles = 0;
273
274 if (src->sender_id)
275 {
276 gst_clock_id_unschedule (src->sender_id);
277 gst_clock_id_unref (src->sender_id);
278 src->sender_id = 0;
279 }
280
281 if (src->sender)
282 tfrc_sender_free (src->sender);
283 src->sender = NULL;
284
285 if (src->idl)
286 {
287 tfrc_is_data_limited_free (src->idl);
288 src->idl = NULL;
289 }
290
291 if (self->last_src == src)
292 self->last_src = NULL;
293
294 if (src->receiver)
295 return FALSE;
296 else
297 return TRUE;
298 }
299
300 static void
fs_rtp_tfrc_clear_sender(FsRtpTfrc * self)301 fs_rtp_tfrc_clear_sender (FsRtpTfrc *self)
302 {
303 g_hash_table_foreach_remove (self->tfrc_sources, clear_sender, self);
304 if (self->initial_src)
305 if (clear_sender (NULL, self->initial_src, self))
306 self->initial_src = NULL;
307
308 self->last_sent_ts = GST_CLOCK_TIME_NONE;
309 self->byte_reservoir = 1500; /* About one packet */
310 }
311
312 static void
fs_rtp_tfrc_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)313 fs_rtp_tfrc_set_property (GObject *object,
314 guint prop_id,
315 const GValue *value,
316 GParamSpec *pspec)
317 {
318 FsRtpTfrc *self = FS_RTP_TFRC (object);
319
320 switch (prop_id)
321 {
322 case PROP_SENDING:
323 GST_OBJECT_LOCK (self);
324 self->sending = g_value_get_boolean (value);
325 if (!self->sending)
326 fs_rtp_tfrc_clear_sender (self);
327 GST_OBJECT_UNLOCK (self);
328 break;
329 default:
330 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
331 break;
332 }
333 }
334
335
336 static gboolean
fs_rtp_tfrc_update_bitrate_locked(FsRtpTfrc * self,const gchar * source)337 fs_rtp_tfrc_update_bitrate_locked (FsRtpTfrc *self, const gchar *source)
338 {
339 guint byterate;
340 guint new_bitrate;
341 gboolean ret;
342
343 if (self->last_src && self->last_src->sender)
344 byterate = tfrc_sender_get_send_rate (self->last_src->sender);
345 else
346 byterate = tfrc_sender_get_send_rate (NULL);
347
348 if (G_LIKELY (byterate < G_MAXUINT / 8))
349 new_bitrate = byterate * 8;
350 else
351 new_bitrate = G_MAXUINT;
352
353 ret = self->send_bitrate != new_bitrate;
354
355 if (ret)
356 GST_DEBUG_OBJECT (self, "Send rate changed (%s): %u -> %u", source,
357 self->send_bitrate, new_bitrate);
358
359 self->send_bitrate = new_bitrate;
360
361 return ret;
362 }
363
364 static guint64
fs_rtp_tfrc_get_now(FsRtpTfrc * self)365 fs_rtp_tfrc_get_now (FsRtpTfrc *self)
366 {
367 return GST_TIME_AS_USECONDS (gst_clock_get_time (self->systemclock));
368 }
369
370
371 static struct TrackedSource *
fs_rtp_tfrc_get_remote_ssrc_locked(FsRtpTfrc * self,guint ssrc,GObject * rtpsource)372 fs_rtp_tfrc_get_remote_ssrc_locked (FsRtpTfrc *self, guint ssrc,
373 GObject *rtpsource)
374 {
375 struct TrackedSource *src;
376
377 src = g_hash_table_lookup (self->tfrc_sources, GUINT_TO_POINTER (ssrc));
378
379 if (G_LIKELY (src))
380 {
381 if (G_UNLIKELY (rtpsource && !src->rtpsource))
382 src->rtpsource = g_object_ref (rtpsource);
383
384 return src;
385 }
386
387 if (self->initial_src)
388 {
389 src = self->initial_src;
390 self->initial_src = NULL;
391 src->ssrc = ssrc;
392 if (rtpsource && !src->rtpsource)
393 src->rtpsource = g_object_ref (rtpsource);
394 g_hash_table_insert (self->tfrc_sources, GUINT_TO_POINTER (ssrc), src);
395 return src;
396 }
397
398 src = tracked_src_new (self);
399 src->ssrc = ssrc;
400 if (rtpsource)
401 src->rtpsource = g_object_ref (rtpsource);
402
403 if (!self->last_src)
404 self->last_src = src;
405
406 g_hash_table_insert (self->tfrc_sources, GUINT_TO_POINTER (ssrc), src);
407
408 return src;
409 }
410
411 static void
rtpsession_on_ssrc_validated(GObject * rtpsession,GObject * rtpsource,FsRtpTfrc * self)412 rtpsession_on_ssrc_validated (GObject *rtpsession, GObject *rtpsource,
413 FsRtpTfrc *self)
414 {
415 guint32 ssrc;
416
417 g_object_get (rtpsource, "ssrc", &ssrc, NULL);
418
419 GST_DEBUG_OBJECT (self, "ssrc validate: %X", ssrc);
420
421 GST_OBJECT_LOCK (self);
422 fs_rtp_tfrc_get_remote_ssrc_locked (self, ssrc, rtpsource);
423 GST_OBJECT_UNLOCK (self);
424 }
425
426 struct TimerData
427 {
428 FsRtpTfrc *self;
429 guint ssrc;
430 };
431
432 static struct TimerData *
build_timer_data(FsRtpTfrc * self,guint ssrc)433 build_timer_data (FsRtpTfrc *self, guint ssrc)
434 {
435 struct TimerData *td = g_slice_new0 (struct TimerData);
436
437 td->self = g_object_ref (self);
438 td->ssrc = ssrc;
439
440 return td;
441 }
442
443 static void
free_timer_data(gpointer data)444 free_timer_data (gpointer data)
445 {
446 struct TimerData *td = data;
447 g_object_unref (td->self);
448 g_slice_free (struct TimerData, td);
449 }
450
451 static void
fs_rtp_tfrc_set_receiver_timer_locked(FsRtpTfrc * self,struct TrackedSource * src,guint64 now)452 fs_rtp_tfrc_set_receiver_timer_locked (FsRtpTfrc *self,
453 struct TrackedSource *src, guint64 now)
454 {
455 guint64 expiry = tfrc_receiver_get_feedback_timer_expiry (src->receiver);
456 GstClockReturn cret;
457
458 if (expiry == 0)
459 return;
460
461 if (src->receiver_id)
462 {
463 if (src->next_feedback_timer <= expiry)
464 return;
465
466 gst_clock_id_unschedule (src->receiver_id);
467 gst_clock_id_unref (src->receiver_id);
468 src->receiver_id = NULL;
469 }
470 src->next_feedback_timer = expiry;
471
472 g_assert (expiry != now);
473
474 src->receiver_id = gst_clock_new_single_shot_id (self->systemclock,
475 expiry * GST_USECOND);
476
477 cret = gst_clock_id_wait_async (src->receiver_id, feedback_timer_expired,
478 build_timer_data (self, src->ssrc), free_timer_data);
479 if (cret != GST_CLOCK_OK)
480 GST_ERROR_OBJECT (self,
481 "Could not schedule feedback time for %" G_GUINT64_FORMAT
482 " (now %" G_GUINT64_FORMAT ") error: %d", expiry, now, cret);
483 }
484
485 static void
fs_rtp_tfrc_receiver_timer_func_locked(FsRtpTfrc * self,struct TrackedSource * src,guint64 now)486 fs_rtp_tfrc_receiver_timer_func_locked (FsRtpTfrc *self,
487 struct TrackedSource *src, guint64 now)
488 {
489 guint64 expiry;
490
491 if (src->receiver_id)
492 {
493 gst_clock_id_unschedule (src->receiver_id);
494 gst_clock_id_unref (src->receiver_id);
495 src->receiver_id = NULL;
496 }
497
498 expiry = tfrc_receiver_get_feedback_timer_expiry (src->receiver);
499
500 if (expiry <= now &&
501 tfrc_receiver_feedback_timer_expired (src->receiver, now))
502 {
503 src->send_feedback = TRUE;
504 g_signal_emit_by_name (self->rtpsession, "send-rtcp", (guint64) 0);
505 }
506 else
507 {
508 fs_rtp_tfrc_set_receiver_timer_locked (self, src, now);
509 }
510 }
511
512 static gboolean
feedback_timer_expired(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)513 feedback_timer_expired (GstClock *clock, GstClockTime time, GstClockID id,
514 gpointer user_data)
515 {
516 struct TimerData *td = user_data;
517 struct TrackedSource *src;
518 guint64 now;
519
520 if (time == GST_CLOCK_TIME_NONE)
521 return FALSE;
522
523 GST_OBJECT_LOCK (td->self);
524
525 src = g_hash_table_lookup (td->self->tfrc_sources,
526 GUINT_TO_POINTER (td->ssrc));
527
528 now = fs_rtp_tfrc_get_now (td->self);
529
530 if (G_LIKELY (src && src->receiver_id == id))
531 fs_rtp_tfrc_receiver_timer_func_locked (td->self, src, now);
532
533 GST_OBJECT_UNLOCK (td->self);
534
535 return FALSE;
536 }
537
538
539 struct SendingRtcpData {
540 FsRtpTfrc *self;
541 GstRTCPBuffer rtcpbuffer;
542 gboolean ret;
543 guint32 ssrc;
544 gboolean have_ssrc;
545 };
546
547 static void
tfrc_sources_process(gpointer key,gpointer value,gpointer user_data)548 tfrc_sources_process (gpointer key, gpointer value, gpointer user_data)
549 {
550 struct SendingRtcpData *data = user_data;
551 struct TrackedSource *src = value;
552 GstRTCPPacket packet;
553 guint8 *pdata;
554 guint64 now;
555 gdouble loss_event_rate;
556 guint receive_rate;
557
558 if (!src->receiver)
559 return;
560
561 if (src->got_nohdr_pkt)
562 return;
563
564 now = fs_rtp_tfrc_get_now (data->self);
565
566 if (!src->send_feedback)
567 goto done;
568
569 if (!gst_rtcp_buffer_add_packet (&data->rtcpbuffer, GST_RTCP_TYPE_RTPFB,
570 &packet))
571 goto done;
572
573 if (!gst_rtcp_packet_fb_set_fci_length (&packet, 4))
574 {
575 gst_rtcp_packet_remove (&packet);
576 goto done;
577 }
578
579 if (!tfrc_receiver_send_feedback (src->receiver, now, &loss_event_rate,
580 &receive_rate))
581 {
582 gst_rtcp_packet_remove (&packet);
583 goto done;
584 }
585
586 if (!data->have_ssrc)
587 g_object_get (data->self->rtpsession, "internal-ssrc", &data->ssrc, NULL);
588 data->have_ssrc = TRUE;
589
590 /* draft-ietf-avt-tfrc-profile-10 defines the type as 2 */
591 gst_rtcp_packet_fb_set_type (&packet, 2);
592 gst_rtcp_packet_fb_set_sender_ssrc (&packet, data->ssrc);
593 gst_rtcp_packet_fb_set_media_ssrc (&packet, src->ssrc);
594 pdata = gst_rtcp_packet_fb_get_fci (&packet);
595
596 GST_WRITE_UINT32_BE (pdata, src->last_ts);
597 GST_WRITE_UINT32_BE (pdata + 4, now - src->last_now);
598 GST_WRITE_UINT32_BE (pdata + 8, receive_rate);
599 GST_WRITE_UINT32_BE (pdata + 12, loss_event_rate * G_MAXUINT);
600
601 GST_LOG_OBJECT (data->self, "Sending RTCP report last_ts: %d delay: %"
602 G_GINT64_FORMAT", x_recv: %d, rate: %f",
603 src->last_ts, now - src->last_now, receive_rate, loss_event_rate);
604
605 src->send_feedback = FALSE;
606
607 data->ret = TRUE;
608
609 done:
610 fs_rtp_tfrc_set_receiver_timer_locked (data->self, src, now);
611 }
612
613 static gboolean
rtpsession_sending_rtcp(GObject * rtpsession,GstBuffer * buffer,gboolean is_early,FsRtpTfrc * self)614 rtpsession_sending_rtcp (GObject *rtpsession, GstBuffer *buffer,
615 gboolean is_early, FsRtpTfrc *self)
616 {
617 struct SendingRtcpData data = {NULL, GST_RTCP_BUFFER_INIT};
618
619 gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &data.rtcpbuffer);
620
621 data.self = self;
622 data.ret = FALSE;
623 data.have_ssrc = FALSE;
624
625
626 GST_OBJECT_LOCK (self);
627 g_hash_table_foreach (self->tfrc_sources, tfrc_sources_process, &data);
628 GST_OBJECT_UNLOCK (self);
629
630 gst_rtcp_buffer_unmap (&data.rtcpbuffer);
631
632 /* Return TRUE if something was added */
633 return data.ret;
634 }
635
636 static GstPadProbeReturn
incoming_rtp_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)637 incoming_rtp_probe (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
638 {
639 FsRtpTfrc *self = FS_RTP_TFRC (user_data);
640 GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
641 guint32 ssrc;
642 guint8 *data;
643 guint size;
644 gboolean got_header = FALSE;
645 struct TrackedSource *src = NULL;
646 guint32 rtt, seq;
647 gint64 ts_delta;
648 guint64 ts;
649 gboolean send_rtcp = FALSE;
650 guint64 now;
651 guint8 pt;
652 gint seq_delta;
653 GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
654
655 if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
656 return GST_PAD_PROBE_OK;
657
658 GST_OBJECT_LOCK (self);
659
660 if (!self->fsrtpsession)
661 goto out_no_header_unmap;
662
663 ssrc = gst_rtp_buffer_get_ssrc (&rtpbuffer);
664 pt = gst_rtp_buffer_get_payload_type (&rtpbuffer);
665 seq = gst_rtp_buffer_get_seq (&rtpbuffer);
666
667 if (pt >= 128 || !self->pts[pt])
668 goto out_no_header_unmap;
669
670 if (self->extension_type == EXTENSION_NONE)
671 goto out_no_header_unmap;
672 else if (self->extension_type == EXTENSION_ONE_BYTE)
673 got_header = gst_rtp_buffer_get_extension_onebyte_header (&rtpbuffer,
674 self->extension_id, 0, (gpointer *) &data, &size);
675 else if (self->extension_type == EXTENSION_TWO_BYTES)
676 got_header = gst_rtp_buffer_get_extension_twobytes_header (&rtpbuffer,
677 NULL, self->extension_id, 0, (gpointer *) &data, &size);
678
679 gst_rtp_buffer_unmap (&rtpbuffer);
680
681 src = fs_rtp_tfrc_get_remote_ssrc_locked (self, ssrc, NULL);
682
683 if (src->rtpsource == NULL)
684 {
685 GST_WARNING_OBJECT (self, "Got packet from unconfirmed source %X ?", ssrc);
686 goto out;
687 }
688
689 if (!got_header || size != 7)
690 goto out_no_header;
691
692 src->got_nohdr_pkt = FALSE;
693
694 now = fs_rtp_tfrc_get_now (self);
695
696 rtt = GST_READ_UINT24_BE (data);
697 ts = GST_READ_UINT32_BE (data + 3);
698
699
700 if (!src->receiver)
701 {
702 src->receiver = tfrc_receiver_new (now);
703 }
704 else if (rtt == 0 && src->last_rtt != 0)
705 {
706 /* Detect sender reset */
707 src->seq_cycles = 0;
708 src->last_seq = 0;
709 src->ts_cycles = 0;
710 src->last_now = 0;
711 src->last_rtt = 0;
712 tfrc_receiver_free (src->receiver);
713 src->receiver = tfrc_receiver_new (now);
714 if (src->receiver_id)
715 {
716 gst_clock_id_unschedule (src->receiver_id);
717 gst_clock_id_unref (src->receiver_id);
718 src->receiver_id = NULL;
719 }
720 }
721 seq_delta = seq - src->last_seq;
722
723 if (seq < src->last_seq && seq_delta < -3000)
724 src->seq_cycles += 1 << 16;
725 src->last_seq = seq;
726 seq += src->seq_cycles;
727
728 ts_delta = ts - src->last_ts;
729 /* We declare there has been a cycle if the difference is more than
730 * 5 minutes
731 */
732 if (ts < src->last_ts && ts_delta < -(5 * 60 * 1000 * 1000))
733 src->ts_cycles += ONE_32BIT_CYCLE;
734 src->last_ts = ts;
735 ts += src->ts_cycles;
736
737 send_rtcp = tfrc_receiver_got_packet (src->receiver, ts, now, seq, rtt,
738 gst_rtp_buffer_get_packet_len (&rtpbuffer));
739
740 GST_LOG_OBJECT (self, "Got RTP packet");
741
742 if (rtt && src->last_rtt == 0)
743 fs_rtp_tfrc_receiver_timer_func_locked (self, src, now);
744
745 src->last_now = now;
746 src->last_rtt = rtt;
747
748 out:
749
750 if (send_rtcp)
751 {
752 src->send_feedback = TRUE;
753 GST_OBJECT_UNLOCK (self);
754 g_signal_emit_by_name (src->self->rtpsession, "send-rtcp", (guint64) 0);
755 }
756 else
757 {
758 GST_OBJECT_UNLOCK (self);
759 }
760
761 return GST_PAD_PROBE_OK;
762
763 out_no_header_unmap:
764
765 gst_rtp_buffer_unmap (&rtpbuffer);
766
767 out_no_header:
768 if (src)
769 src->got_nohdr_pkt = TRUE;
770 goto out;
771 }
772
773 static gboolean
no_feedback_timer_expired(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)774 no_feedback_timer_expired (GstClock *clock, GstClockTime time, GstClockID id,
775 gpointer user_data)
776 {
777 struct TimerData *td = user_data;
778 struct TrackedSource *src;
779 guint64 now;
780 gboolean notify = FALSE;
781
782 if (time == GST_CLOCK_TIME_NONE)
783 return FALSE;
784
785 GST_OBJECT_LOCK (td->self);
786
787 if (!td->self->sending)
788 goto out;
789
790 src = g_hash_table_lookup (td->self->tfrc_sources,
791 GUINT_TO_POINTER (td->ssrc));
792
793 if (!src)
794 goto out;
795
796 if (src->sender_id != id)
797 goto out;
798
799 now = fs_rtp_tfrc_get_now (td->self);
800
801 fs_rtp_tfrc_update_sender_timer_locked (td->self, src, now);
802
803 if (fs_rtp_tfrc_update_bitrate_locked (td->self, "tm"))
804 notify = TRUE;
805
806 out:
807
808 GST_OBJECT_UNLOCK (td->self);
809
810 if (notify)
811 g_object_notify (G_OBJECT (td->self), "bitrate");
812
813 return FALSE;
814 }
815
816 static void
fs_rtp_tfrc_update_sender_timer_locked(FsRtpTfrc * self,struct TrackedSource * src,guint64 now)817 fs_rtp_tfrc_update_sender_timer_locked (FsRtpTfrc *self,
818 struct TrackedSource *src, guint64 now)
819 {
820 guint64 expiry;
821 GstClockReturn cret;
822
823 if (src->sender_id)
824 {
825 gst_clock_id_unschedule (src->sender_id);
826 gst_clock_id_unref (src->sender_id);
827 src->sender_id = NULL;
828 }
829
830 if (src->sender == NULL)
831 return;
832
833 expiry = tfrc_sender_get_no_feedback_timer_expiry (src->sender);
834
835 if (expiry <= now)
836 {
837 tfrc_sender_no_feedback_timer_expired (src->sender, now);
838 expiry = tfrc_sender_get_no_feedback_timer_expiry (src->sender);
839 }
840
841 src->sender_id = gst_clock_new_single_shot_id (self->systemclock,
842 expiry * GST_USECOND);
843
844 cret = gst_clock_id_wait_async (src->sender_id,
845 no_feedback_timer_expired, build_timer_data (self, src->ssrc),
846 free_timer_data);
847 if (cret != GST_CLOCK_OK)
848 GST_ERROR_OBJECT (self,
849 "Could not schedule feedback time for %" G_GUINT64_FORMAT
850 " (now %" G_GUINT64_FORMAT ") error: %d",
851 expiry, now, cret);
852 }
853
854 static void
tracked_src_add_sender(struct TrackedSource * src,guint64 now,guint initial_rate)855 tracked_src_add_sender (struct TrackedSource *src, guint64 now,
856 guint initial_rate)
857 {
858 src->sender = tfrc_sender_new (1460, now, initial_rate);
859 src->idl = tfrc_is_data_limited_new (now);
860 src->send_ts_base = now;
861 }
862
863 static GstPadProbeReturn
incoming_rtcp_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)864 incoming_rtcp_probe (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
865 {
866 FsRtpTfrc *self = FS_RTP_TFRC (user_data);
867 GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
868 GstRTCPBuffer rtcpbuffer = GST_RTCP_BUFFER_INIT;
869 GstRTCPPacket packet;
870 gboolean notify = FALSE;
871
872 if (!gst_rtcp_buffer_validate (buffer))
873 return GST_PAD_PROBE_OK;
874
875 gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcpbuffer);
876
877 if (!gst_rtcp_buffer_get_first_packet (&rtcpbuffer, &packet))
878 goto out;
879
880 do {
881 if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_RTPFB &&
882 gst_rtcp_packet_fb_get_type (&packet) == 2 &&
883 gst_rtcp_packet_get_length (&packet) == 6)
884 {
885 /* We have a TFRC packet */
886 guint32 media_ssrc;
887 guint32 sender_ssrc;
888 guint64 ts;
889 guint32 delay;
890 guint32 x_recv;
891 gdouble loss_event_rate;
892 guint8 *buf = rtcpbuffer.map.data + packet.offset;
893 struct TrackedSource *src;
894 guint64 now;
895 guint64 rtt;
896 guint32 local_ssrc;
897 gboolean is_data_limited;
898
899 media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (&packet);
900
901 g_object_get (self->rtpsession, "internal-ssrc", &local_ssrc, NULL);
902
903 if (media_ssrc != local_ssrc)
904 continue;
905
906 sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (&packet);
907
908 buf += 4 * 3; /* skip the header, ssrc of sender and media sender */
909
910 ts = GST_READ_UINT32_BE (buf);
911 buf += 4;
912 delay = GST_READ_UINT32_BE (buf);
913 buf += 4;
914 x_recv = GST_READ_UINT32_BE (buf);
915 buf += 4;
916 loss_event_rate = (gdouble) GST_READ_UINT32_BE (buf) / (gdouble) G_MAXUINT;
917 GST_LOG_OBJECT (self, "Got RTCP TFRC packet last_sent_ts: %"
918 G_GUINT64_FORMAT " delay: %u x_recv: %u loss_event_rate: %f", ts,
919 delay, x_recv, loss_event_rate);
920
921 GST_OBJECT_LOCK (self);
922
923 if (!self->fsrtpsession || !self->sending)
924 goto done;
925
926 src = fs_rtp_tfrc_get_remote_ssrc_locked (self, sender_ssrc,
927 NULL);
928
929 now = fs_rtp_tfrc_get_now (self);
930
931 if (G_UNLIKELY (!src->sender))
932 tracked_src_add_sender (src, now, self->send_bitrate);
933
934 /* Make sure we only use the RTT from the most recent packets from
935 * the remote side, ignore anything that got delayed in between.
936 */
937 if (ts < src->fb_last_ts)
938 {
939 if (src->fb_ts_cycles + ONE_32BIT_CYCLE == src->send_ts_cycles)
940 {
941 src->fb_ts_cycles = src->send_ts_cycles;
942 }
943 else
944 {
945 GST_DEBUG_OBJECT (self, "Ignoring packet because the timestamp is "
946 "older than one that has already been received,"
947 " probably reordered.");
948 goto done;
949 }
950 }
951
952 src->fb_last_ts = ts;
953 ts += src->fb_ts_cycles + src->send_ts_base;
954
955 if (ts > now || now - ts < delay)
956 {
957 GST_ERROR_OBJECT (self, "Ignoring packet because ts > now ||"
958 " now - ts < delay (ts: %" G_GUINT64_FORMAT
959 " now: %" G_GUINT64_FORMAT " delay:%u",
960 ts, now, delay);
961 goto done;
962 }
963
964 rtt = now - ts - delay;
965
966 if (rtt == 0)
967 rtt = 1;
968
969 if (rtt > 10 * 1000 * 1000)
970 {
971 GST_WARNING_OBJECT (self, "Impossible RTT %" G_GUINT64_FORMAT
972 " ms, ignoring", rtt);
973 goto done;
974 }
975
976 GST_LOG_OBJECT (self, "rtt: %" G_GUINT64_FORMAT
977 " = now %" G_GUINT64_FORMAT
978 " - ts %"G_GUINT64_FORMAT" - delay %u",
979 rtt, now, ts, delay);
980
981 if (G_UNLIKELY (tfrc_sender_get_averaged_rtt (src->sender) == 0))
982 tfrc_sender_on_first_rtt (src->sender, now);
983
984 is_data_limited =
985 tfrc_is_data_limited_received_feedback (src->idl, now, ts,
986 tfrc_sender_get_averaged_rtt (src->sender));
987
988 tfrc_sender_on_feedback_packet (src->sender, now, rtt, x_recv,
989 loss_event_rate, is_data_limited);
990
991 fs_rtp_tfrc_update_sender_timer_locked (self, src, now);
992
993 self->last_src = src;
994
995 if (fs_rtp_tfrc_update_bitrate_locked (self, "fb"))
996 notify = TRUE;
997
998 done:
999 GST_OBJECT_UNLOCK (self);
1000 }
1001 } while (gst_rtcp_packet_move_to_next (&packet));
1002
1003 if (notify)
1004 g_object_notify (G_OBJECT (self), "bitrate");
1005
1006 out:
1007
1008 gst_rtcp_buffer_unmap (&rtcpbuffer);
1009
1010 return GST_PAD_PROBE_OK;
1011 }
1012
1013 static GstClockTime
fs_rtp_tfrc_get_sync_time(FsRtpPacketModder * modder,GstBuffer * buffer,gpointer user_data)1014 fs_rtp_tfrc_get_sync_time (FsRtpPacketModder *modder,
1015 GstBuffer *buffer, gpointer user_data)
1016 {
1017 FsRtpTfrc *self = FS_RTP_TFRC (user_data);
1018 GstClockTime sync_time = GST_BUFFER_TIMESTAMP (buffer);
1019 gint bytes_for_one_rtt = 0;
1020 guint size = 0;
1021 guint send_rate;
1022
1023 GST_OBJECT_LOCK (self);
1024
1025 if (self->extension_type == EXTENSION_NONE || !self->sending)
1026 {
1027 GST_OBJECT_UNLOCK (self);
1028 return GST_CLOCK_TIME_NONE;
1029 }
1030
1031 if (self->last_src && self->last_src->sender)
1032 {
1033 send_rate = tfrc_sender_get_send_rate (self->last_src->sender);
1034 bytes_for_one_rtt = send_rate *
1035 tfrc_sender_get_averaged_rtt (self->last_src->sender);
1036 }
1037 else
1038 {
1039 send_rate = tfrc_sender_get_send_rate (NULL);
1040 bytes_for_one_rtt = 0;
1041 }
1042
1043 size = gst_buffer_get_size (buffer) + 10;
1044
1045 if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer))
1046 {
1047 if (GST_CLOCK_TIME_IS_VALID (self->last_sent_ts) &&
1048 self->last_sent_ts < GST_BUFFER_TIMESTAMP (buffer))
1049 self->byte_reservoir +=
1050 gst_util_uint64_scale (
1051 (GST_BUFFER_TIMESTAMP (buffer) - self->last_sent_ts),
1052 send_rate,
1053 GST_SECOND);
1054 self->last_sent_ts = GST_BUFFER_TIMESTAMP (buffer);
1055
1056 if (bytes_for_one_rtt &&
1057 self->byte_reservoir > bytes_for_one_rtt)
1058 self->byte_reservoir = bytes_for_one_rtt;
1059 }
1060
1061 self->byte_reservoir -= size;
1062
1063 if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
1064 self->byte_reservoir < 0)
1065 {
1066 GstClockTimeDiff diff = 0;
1067
1068 diff = gst_util_uint64_scale_int (GST_SECOND,
1069 -self->byte_reservoir, send_rate);
1070 g_assert (diff > 0);
1071
1072
1073 GST_LOG_OBJECT (self, "Delaying packet by %"GST_TIME_FORMAT
1074 " = 1sec * bytes %d / rate %u",
1075 GST_TIME_ARGS (diff), self->byte_reservoir,
1076 send_rate);
1077
1078 GST_BUFFER_TIMESTAMP (buffer) += diff;
1079 }
1080
1081 GST_OBJECT_UNLOCK (self);
1082
1083
1084 return sync_time;
1085 }
1086
1087
1088 static GstBuffer *
fs_rtp_tfrc_outgoing_packets(FsRtpPacketModder * modder,GstBuffer * buffer,GstClockTime buffer_ts,gpointer user_data)1089 fs_rtp_tfrc_outgoing_packets (FsRtpPacketModder *modder,
1090 GstBuffer *buffer, GstClockTime buffer_ts, gpointer user_data)
1091 {
1092 FsRtpTfrc *self = FS_RTP_TFRC (user_data);
1093 gchar data[7];
1094 guint64 now;
1095 GstBuffer *headerbuf;
1096 GstBuffer *newbuf;
1097 gboolean is_data_limited;
1098 gsize header_size;
1099 gsize new_header_size;
1100 GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
1101
1102 if (!GST_CLOCK_TIME_IS_VALID (buffer_ts))
1103 return buffer;
1104
1105 GST_OBJECT_LOCK (self);
1106
1107 if (!self->fsrtpsession || self->extension_type == EXTENSION_NONE ||
1108 !self->sending)
1109 {
1110 GST_OBJECT_UNLOCK (self);
1111 return buffer;
1112 }
1113
1114 now = fs_rtp_tfrc_get_now (self);
1115
1116 if (G_UNLIKELY (self->last_src == NULL))
1117 self->initial_src = self->last_src = tracked_src_new (self);
1118
1119 if (G_UNLIKELY (self->last_src->sender == NULL))
1120 {
1121 tracked_src_add_sender (self->last_src, now, self->send_bitrate);
1122 fs_rtp_tfrc_update_sender_timer_locked (self, self->last_src, now);
1123 }
1124
1125 GST_WRITE_UINT24_BE (data,
1126 tfrc_sender_get_averaged_rtt (self->last_src->sender));
1127 GST_WRITE_UINT32_BE (data+3, now - self->last_src->send_ts_base);
1128
1129 if (now - self->last_src->send_ts_base > self->last_src->send_ts_cycles +
1130 ONE_32BIT_CYCLE)
1131 self->last_src->send_ts_cycles += ONE_32BIT_CYCLE;
1132
1133 is_data_limited = (GST_BUFFER_PTS (buffer) == buffer_ts);
1134
1135 gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer);
1136 header_size = gst_rtp_buffer_get_header_len (&rtpbuffer);
1137 gst_rtp_buffer_unmap (&rtpbuffer);
1138
1139 headerbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, 0,
1140 header_size);
1141 headerbuf = gst_buffer_make_writable (headerbuf);
1142 gst_buffer_set_size (headerbuf, header_size + 16);
1143
1144 gst_rtp_buffer_map (headerbuf, GST_MAP_READWRITE, &rtpbuffer);
1145
1146 if (self->extension_type == EXTENSION_ONE_BYTE)
1147 {
1148 if (!gst_rtp_buffer_add_extension_onebyte_header (&rtpbuffer,
1149 self->extension_id, data, 7))
1150 GST_WARNING_OBJECT (self,
1151 "Could not add extension to RTP header buf %p", headerbuf);
1152 }
1153 else if (self->extension_type == EXTENSION_TWO_BYTES)
1154 {
1155 if (!gst_rtp_buffer_add_extension_twobytes_header (&rtpbuffer, 0,
1156 self->extension_id, data, 7))
1157 GST_WARNING_OBJECT (self,
1158 "Could not add extension to RTP header in list %p", headerbuf);
1159 }
1160
1161 /* FIXME:
1162 * This will break if any padding is applied
1163 */
1164 new_header_size = gst_rtp_buffer_get_header_len (&rtpbuffer);
1165
1166 gst_rtp_buffer_unmap (&rtpbuffer);
1167 gst_buffer_set_size (headerbuf, new_header_size);
1168
1169 /* append_region eats a ref */
1170 gst_buffer_ref (buffer);
1171 newbuf = gst_buffer_append_region (headerbuf, buffer, header_size, -1);
1172
1173 GST_LOG_OBJECT (self, "Sending RTP");
1174
1175 if (g_hash_table_size (self->tfrc_sources))
1176 {
1177 GHashTableIter ht_iter;
1178 struct TrackedSource *src;
1179
1180 g_hash_table_iter_init (&ht_iter, self->tfrc_sources);
1181
1182 while (g_hash_table_iter_next (&ht_iter, NULL,
1183 (gpointer *) &src))
1184 {
1185 if (src->sender)
1186 {
1187 if (!is_data_limited)
1188 tfrc_is_data_limited_not_limited_now (src->idl, now);
1189 tfrc_sender_sending_packet (src->sender, gst_buffer_get_size (newbuf));
1190 }
1191 }
1192 }
1193 if (self->initial_src)
1194 {
1195 if (!is_data_limited)
1196 tfrc_is_data_limited_not_limited_now (self->initial_src->idl, now);
1197 tfrc_sender_sending_packet (self->initial_src->sender,
1198 gst_buffer_get_size (newbuf));
1199 }
1200
1201
1202 GST_OBJECT_UNLOCK (self);
1203
1204 gst_buffer_unref (buffer);
1205
1206 return newbuf;
1207 }
1208
1209 static GstPadProbeReturn
send_rtp_pad_blocked(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)1210 send_rtp_pad_blocked (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1211 {
1212 FsRtpTfrc *self = user_data;
1213 gboolean need_modder;
1214 GstPad *peer = NULL;
1215
1216 GST_OBJECT_LOCK (self);
1217 self->modder_check_probe_id = 0;
1218 need_modder = self->extension_type != EXTENSION_NONE;
1219
1220 if (!self->fsrtpsession || !!self->packet_modder == need_modder)
1221 goto out;
1222
1223 GST_DEBUG ("Pad blocked to possibly %s the tfrc packet modder",
1224 need_modder ? "add" : "remove");
1225
1226 if (need_modder)
1227 {
1228 GstPadLinkReturn linkret;
1229 GstPad *modder_pad;
1230
1231 self->packet_modder = GST_ELEMENT (fs_rtp_packet_modder_new (
1232 fs_rtp_tfrc_outgoing_packets, fs_rtp_tfrc_get_sync_time, self));
1233 g_object_ref (self->packet_modder);
1234
1235 if (!gst_bin_add (self->parent_bin, self->packet_modder))
1236 {
1237 fs_session_emit_error (FS_SESSION (self->fsrtpsession),
1238 FS_ERROR_CONSTRUCTION,
1239 "Could not add tfrc packet modder to the pipeline");
1240 goto adding_failed;
1241 }
1242
1243 peer = gst_pad_get_peer (pad);
1244 gst_pad_unlink (pad, peer);
1245
1246 modder_pad = gst_element_get_static_pad (self->packet_modder, "src");
1247 linkret = gst_pad_link (modder_pad, peer);
1248 gst_object_unref (modder_pad);
1249 if (GST_PAD_LINK_FAILED (linkret))
1250 {
1251 fs_session_emit_error (FS_SESSION (self->fsrtpsession),
1252 FS_ERROR_CONSTRUCTION,
1253 "Could not link tfrc packet modder to rtp muxer");
1254 goto linking_failed;
1255 }
1256
1257 modder_pad = gst_element_get_static_pad (self->packet_modder, "sink");
1258 linkret = gst_pad_link (pad, modder_pad);
1259 gst_object_unref (modder_pad);
1260 if (GST_PAD_LINK_FAILED (linkret))
1261 {
1262 fs_session_emit_error (FS_SESSION (self->fsrtpsession),
1263 FS_ERROR_CONSTRUCTION,
1264 "Could not link tfrc packet modder to the rtpbin");
1265 goto linking_failed;
1266 }
1267
1268 if (gst_element_set_state (self->packet_modder, GST_STATE_PLAYING) ==
1269 GST_STATE_CHANGE_FAILURE)
1270 {
1271 fs_session_emit_error (FS_SESSION (self->fsrtpsession),
1272 FS_ERROR_CONSTRUCTION,
1273 "Could not set the TFRC packet modder to playing");
1274 goto linking_failed;
1275 }
1276 }
1277 else
1278 {
1279 GstPadLinkReturn linkret;
1280 GstPad *modder_src_pad;
1281
1282 modder_src_pad = gst_element_get_static_pad (self->packet_modder, "src");
1283 peer = gst_pad_get_peer (modder_src_pad);
1284 gst_object_unref (modder_src_pad);
1285
1286 gst_bin_remove (self->parent_bin, self->packet_modder);
1287 gst_element_set_state (self->packet_modder, GST_STATE_NULL);
1288 gst_object_unref (self->packet_modder);
1289 self->packet_modder = NULL;
1290
1291 linkret = gst_pad_link (pad, peer);
1292 if (GST_PAD_LINK_FAILED (linkret))
1293 fs_session_emit_error (FS_SESSION (self->fsrtpsession),
1294 FS_ERROR_CONSTRUCTION,
1295 "Could not re-link after removing tfrc packet modder");
1296 }
1297
1298 out:
1299 gst_object_unref (peer);
1300 GST_OBJECT_UNLOCK (self);
1301
1302 return GST_PAD_PROBE_REMOVE;
1303
1304 linking_failed:
1305 gst_bin_remove (self->parent_bin, self->packet_modder);
1306 gst_pad_link (pad, peer);
1307 adding_failed:
1308 gst_object_unref (self->packet_modder);
1309 self->packet_modder = NULL;
1310 goto out;
1311 }
1312
1313 static void
fs_rtp_tfrc_check_modder_locked(FsRtpTfrc * self)1314 fs_rtp_tfrc_check_modder_locked (FsRtpTfrc *self)
1315 {
1316 gboolean need_modder;
1317
1318 need_modder = self->extension_type != EXTENSION_NONE;
1319
1320 if (!!self->packet_modder == need_modder)
1321 return;
1322
1323 if (self->modder_check_probe_id != 0)
1324 return;
1325
1326 self->modder_check_probe_id =
1327 gst_pad_add_probe (self->out_rtp_pad,
1328 GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
1329 send_rtp_pad_blocked,
1330 g_object_ref (self), (GDestroyNotify) g_object_unref);
1331 }
1332
1333
1334 FsRtpTfrc *
fs_rtp_tfrc_new(FsRtpSession * fsrtpsession)1335 fs_rtp_tfrc_new (FsRtpSession *fsrtpsession)
1336 {
1337 FsRtpTfrc *self;
1338 GstElement *rtpmuxer;
1339
1340 g_return_val_if_fail (fsrtpsession, NULL);
1341
1342 self = g_object_new (FS_TYPE_RTP_TFRC, NULL);
1343
1344 self->fsrtpsession = fsrtpsession;
1345 self->sending = FALSE;
1346
1347 self->rtpsession = fs_rtp_session_get_rtpbin_internal_session (fsrtpsession);
1348 self->parent_bin = GST_BIN (fs_rtp_session_get_conference (fsrtpsession));
1349 self->in_rtp_pad = fs_rtp_session_get_rtpbin_recv_rtp_sink (fsrtpsession);;
1350 self->in_rtcp_pad = fs_rtp_session_get_rtpbin_recv_rtcp_sink (fsrtpsession);;
1351
1352 rtpmuxer = fs_rtp_session_get_rtpmuxer (fsrtpsession);
1353 self->out_rtp_pad = gst_element_get_static_pad (rtpmuxer, "src");
1354 gst_object_unref (rtpmuxer);
1355
1356 self->in_rtp_probe_id = gst_pad_add_probe (self->in_rtp_pad,
1357 GST_PAD_PROBE_TYPE_BUFFER, incoming_rtp_probe,
1358 g_object_ref (self), (GDestroyNotify) g_object_unref);
1359 self->in_rtcp_probe_id = gst_pad_add_probe (self->in_rtcp_pad,
1360 GST_PAD_PROBE_TYPE_BUFFER, incoming_rtcp_probe,
1361 g_object_ref (self), (GDestroyNotify) g_object_unref);
1362
1363
1364 self->on_ssrc_validated_id = g_signal_connect_object (self->rtpsession,
1365 "on-ssrc-validated", G_CALLBACK (rtpsession_on_ssrc_validated), self, 0);
1366 self->on_sending_rtcp_id = g_signal_connect_object (self->rtpsession,
1367 "on-sending-rtcp", G_CALLBACK (rtpsession_sending_rtcp), self, 0);
1368
1369 return self;
1370 }
1371
1372 gboolean
validate_ca_for_tfrc(CodecAssociation * ca,gpointer user_data)1373 validate_ca_for_tfrc (CodecAssociation *ca, gpointer user_data)
1374 {
1375 return codec_association_is_valid_for_sending (ca, TRUE) &&
1376 fs_codec_get_feedback_parameter (ca->codec, "tfrc", "", "");
1377 }
1378
1379 void
fs_rtp_tfrc_filter_codecs(GList ** codec_associations,GList ** header_extensions)1380 fs_rtp_tfrc_filter_codecs (GList **codec_associations,
1381 GList **header_extensions)
1382 {
1383 gboolean has_header_ext = FALSE;
1384 gboolean has_codec_rtcpfb = FALSE;
1385 GList *item;
1386
1387 has_codec_rtcpfb = !!lookup_codec_association_custom (*codec_associations,
1388 validate_ca_for_tfrc, NULL);
1389
1390 for (item = *header_extensions; item;)
1391 {
1392 FsRtpHeaderExtension *hdrext = item->data;
1393 GList *next = item->next;
1394
1395 if (!strcmp (hdrext->uri, "urn:ietf:params:rtp-hdrext:rtt-sendts"))
1396 {
1397 if (has_header_ext || !has_codec_rtcpfb)
1398 {
1399 GST_WARNING ("Removing rtt-sendts hdrext because matching tfrc"
1400 " feedback parameter not found or because rtp-hdrext"
1401 " is duplicated");
1402 fs_rtp_header_extension_destroy (item->data);
1403 *header_extensions = g_list_remove_link (*header_extensions, item);
1404 }
1405 else if (hdrext->direction == FS_DIRECTION_BOTH)
1406 {
1407 has_header_ext = TRUE;
1408 }
1409 }
1410 item = next;
1411 }
1412
1413 if (!has_codec_rtcpfb || has_header_ext)
1414 return;
1415
1416 for (item = *codec_associations; item; item = item->next)
1417 {
1418 CodecAssociation *ca = item->data;
1419 GList *item2;
1420
1421 for (item2 = ca->codec->feedback_params; item2;)
1422 {
1423 GList *next2 = item2->next;
1424 FsFeedbackParameter *p = item2->data;
1425
1426 if (!g_ascii_strcasecmp (p->type, "tfrc"))
1427 {
1428 GST_WARNING ("Removing tfrc from codec because no hdrext:rtt-sendts: "
1429 FS_CODEC_FORMAT, FS_CODEC_ARGS (ca->codec));
1430 fs_codec_remove_feedback_parameter (ca->codec, item2);
1431 }
1432
1433 item2 = next2;
1434 }
1435 }
1436
1437 }
1438
1439 void
fs_rtp_tfrc_codecs_updated(FsRtpTfrc * self,GList * codec_associations,GList * header_extensions)1440 fs_rtp_tfrc_codecs_updated (FsRtpTfrc *self,
1441 GList *codec_associations,
1442 GList *header_extensions)
1443 {
1444 GList *item;
1445 FsRtpHeaderExtension *hdrext;
1446
1447 GST_OBJECT_LOCK (self);
1448
1449 memset (self->pts, 0, 128 * sizeof (gboolean));
1450 for (item = codec_associations; item; item = item->next)
1451 {
1452 CodecAssociation *ca = item->data;
1453
1454 /* Also require nack/pli for tfrc to work, we really need to disable
1455 * automatic keyframes
1456 */
1457
1458 if (fs_codec_get_feedback_parameter (ca->codec, "tfrc", NULL, NULL) &&
1459 fs_rtp_keyunit_manager_has_key_request_feedback (ca->codec))
1460 self->pts[ca->codec->id] = TRUE;
1461 }
1462
1463 for (item = header_extensions; item; item = item->next)
1464 {
1465 hdrext = item->data;
1466 if (!strcmp (hdrext->uri, "urn:ietf:params:rtp-hdrext:rtt-sendts") &&
1467 hdrext->direction == FS_DIRECTION_BOTH)
1468 break;
1469 }
1470
1471 if (!item)
1472 {
1473 self->extension_type = EXTENSION_NONE;
1474 goto out;
1475 }
1476
1477 if (hdrext->id > 15)
1478 self->extension_type = EXTENSION_TWO_BYTES;
1479 else
1480 self->extension_type = EXTENSION_ONE_BYTE;
1481
1482 self->extension_id = hdrext->id;
1483
1484 out:
1485 fs_rtp_tfrc_check_modder_locked (self);
1486
1487 GST_OBJECT_UNLOCK (self);
1488 }
1489
1490
1491 gboolean
fs_rtp_tfrc_is_enabled(FsRtpTfrc * self,guint pt)1492 fs_rtp_tfrc_is_enabled (FsRtpTfrc *self, guint pt)
1493 {
1494 gboolean is_enabled;
1495
1496 g_return_val_if_fail (pt < 128, FALSE);
1497
1498 GST_OBJECT_LOCK (self);
1499 is_enabled = (self->extension_type != EXTENSION_NONE) &&
1500 self->pts[pt];
1501 GST_OBJECT_UNLOCK (self);
1502
1503 return is_enabled;
1504 }
1505