1 /*
2 * Farstream - Farstream TFRC implementation
3 *
4 * Copyright 2010 Collabora Ltd.
5 * @author: Olivier Crete <olivier.crete@collabora.co.uk>
6 * Copyright 2010 Nokia Corp.
7 *
 * tfrc.c - An implementation of TCP Friendly rate control, RFCs 5348 and 4828
9 *
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License as published by the Free Software Foundation; either
13 * version 2.1 of the License, or (at your option) any later version.
14 *
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with this library; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "tfrc.h"
26
27 #include <math.h>
28 #include <string.h>
29
30 /* for gst_util_uint64_scale_round */
31 #include <gst/gst.h>
32
33 /*
34 * ALL TIMES ARE IN MICROSECONDS
35 * bitrates are in bytes/sec
36 */
37
/*
 * Debug tracing helpers for the receiver and the sender.
 * Both are compiled out by default (they expand to nothing); switch the
 * corresponding "#if 0" to "#if 1" to send the messages to g_debug(),
 * prefixed with the address of the object they relate to.
 */
#if 0
#define DEBUG_RECEIVER(receiver, format, ...) \
  g_debug ("TFRC-R (%p): " format , receiver, __VA_ARGS__)
#else
#define DEBUG_RECEIVER(...)
#endif

#if 0
#define DEBUG_SENDER(sender, format, ...) \
  g_debug ("TFRC-S (%p): " format, sender, __VA_ARGS__)
#else
#define DEBUG_SENDER(...)
#endif
51
/* Maximum segment size, in bytes, given to every new sender */
#define DEFAULT_MSS 1460

/* Number of microseconds in one second (all times here are microseconds) */
#define SECOND (1000 * 1000)

/* Lower bound of the nofeedback timer interval: 20 ms, in microseconds */
#define MIN_NOFEEDBACK_TIMER (20 * 1000)
57
58 /*
59 * @s: segment size in bytes
60 * @R: RTT in milli seconds (instead of seconds)
61 * @p: loss event per packet transmitted
62 *
63 * s
64 * X_Bps = -----------------------------------------------
65 * R * (sqrt(2*p/3) + 12*sqrt(3*p/8)*p*(1+32*p^2))
66 *
67 * Returns: The bitrate in bytes/s
68 */
69 static gdouble
calculate_bitrate(gdouble s,gdouble R,gdouble p)70 calculate_bitrate (gdouble s, gdouble R, gdouble p)
71 {
72 gdouble f = sqrt (2 * p / 3) + 12 * sqrt (3 * p / 8) * p * (1 + 32 * p * p);
73
74 return (SECOND * s) / (R * f);
75 }
76
/* Number of entries kept in the sender's receive rate history */
#define RECEIVE_RATE_HISTORY_SIZE (4)

/* Per-packet header size in bytes, specified in RFC 4828 Section 3
 * second bullet */
#define HEADER_SIZE (40)
81
/* One entry of the sender's receive rate history */
struct ReceiveRateItem {
  /* Time, in microseconds, at which the entry was recorded */
  guint64 timestamp;
  /* Recorded receive rate. 0 marks an empty or expired entry and
   * G_MAXUINT is the placeholder stored when the first RTT is known */
  guint rate;
};
86
/* State of the sending side of a TFRC flow */
struct _TfrcSender {
  guint computed_rate; /* The rate computed from the TCP throughput equation */

  gboolean sp; /* TRUE when created with tfrc_sender_new_sp() */
  guint average_packet_size; /* 16 times larger */
  gboolean use_inst_rate; /* use inst_rate instead of rate */

  guint mss; /* max segment size */
  guint rate; /* maximum allowed sending rate in bytes/sec */
  guint inst_rate; /* corrected maximum allowed sending rate */
  guint averaged_rtt; /* smoothed RTT, 0 until the first feedback packet */
  guint sqmean_rtt; /* smoothed value of last_sqrt_rtt */
  guint last_sqrt_rtt; /* square root of the latest RTT sample */
  guint64 tld; /* Time Last Doubled during slow-start */

  guint64 nofeedback_timer_expiry; /* time at which the nofeedback timer fires */

  guint retransmission_timeout; /* RTO */

  /* Most recent entry first */
  struct ReceiveRateItem receive_rate_history[RECEIVE_RATE_HISTORY_SIZE];

  gdouble last_loss_event_rate; /* loss event rate from the last feedback */

  /* Set when a packet is sent, cleared on feedback and on timer expiry */
  gboolean sent_packet;
};
112
113 TfrcSender *
tfrc_sender_new(guint segment_size,guint64 now,guint initial_rate)114 tfrc_sender_new (guint segment_size, guint64 now, guint initial_rate)
115 {
116 TfrcSender *sender = g_slice_new0 (TfrcSender);
117
118 /* initialized as described in RFC 5348 4.2 */
119 sender->use_inst_rate = TRUE;
120 sender->mss = DEFAULT_MSS;
121 sender->average_packet_size = segment_size << 4;
122 if (initial_rate)
123 sender->rate = initial_rate;
124 else
125 sender->rate = segment_size;
126
127 sender->retransmission_timeout = 2 * SECOND;
128 sender->nofeedback_timer_expiry = now + sender->retransmission_timeout;
129 return sender;
130 }
131
132 TfrcSender *
tfrc_sender_new_sp(guint64 now,guint initial_average_packet_size)133 tfrc_sender_new_sp (guint64 now, guint initial_average_packet_size)
134 {
135 TfrcSender *sender = tfrc_sender_new (1460, now, 0);
136
137 sender->sp = TRUE;
138
139 sender->average_packet_size = initial_average_packet_size << 4;
140
141 return sender;
142 }
143
144 void
tfrc_sender_use_inst_rate(TfrcSender * sender,gboolean use_inst_rate)145 tfrc_sender_use_inst_rate (TfrcSender *sender, gboolean use_inst_rate)
146 {
147 sender->use_inst_rate = use_inst_rate;
148 }
149
150
151 void
tfrc_sender_free(TfrcSender * sender)152 tfrc_sender_free (TfrcSender *sender)
153 {
154 g_slice_free (TfrcSender, sender);
155 }
156
157 static guint
sender_get_segment_size(TfrcSender * sender)158 sender_get_segment_size (TfrcSender *sender)
159 {
160 if (sender->sp)
161 return sender->mss;
162 else
163 return sender->average_packet_size >> 4;
164 }
165
166 void
tfrc_sender_on_first_rtt(TfrcSender * sender,guint64 now)167 tfrc_sender_on_first_rtt (TfrcSender *sender, guint64 now)
168 {
169 sender->receive_rate_history[0].rate = G_MAXUINT;
170 sender->receive_rate_history[0].timestamp = now;
171 }
172
get_max_receive_rate(TfrcSender * sender,gboolean ignore_max_uint)173 static guint get_max_receive_rate (TfrcSender *sender, gboolean ignore_max_uint)
174 {
175 guint max_rate = 0;
176 guint i;
177
178 for (i = 0; i < RECEIVE_RATE_HISTORY_SIZE; i++)
179 {
180 if (G_UNLIKELY (sender->receive_rate_history[i].rate == G_MAXUINT))
181 {
182 if (ignore_max_uint)
183 return max_rate;
184 else
185 return G_MAXUINT;
186 }
187 max_rate = MAX (max_rate, sender->receive_rate_history[i].rate);
188 }
189
190 return max_rate;
191 }
192
193 static void
add_to_receive_rate_history(TfrcSender * sender,guint receive_rate,guint64 now)194 add_to_receive_rate_history (TfrcSender *sender, guint receive_rate,
195 guint64 now)
196 {
197 int i;
198
199 for (i = RECEIVE_RATE_HISTORY_SIZE - 1; i > 0; i --)
200 sender->receive_rate_history[i] = sender->receive_rate_history[i-1];
201
202 sender->receive_rate_history[0].rate = receive_rate;
203 sender->receive_rate_history[0].timestamp = now;
204 }
205
206 static guint
maximize_receive_rate_history(TfrcSender * sender,guint receive_rate,guint64 now)207 maximize_receive_rate_history (TfrcSender *sender, guint receive_rate,
208 guint64 now)
209 {
210 guint max_rate;
211
212 add_to_receive_rate_history (sender, receive_rate, now);
213
214 max_rate = get_max_receive_rate (sender, TRUE);
215
216 DEBUG_SENDER (sender, "MAXIMIZE recv: %u max: %u", receive_rate, max_rate);
217
218 memset (sender->receive_rate_history, 0,
219 sizeof(struct ReceiveRateItem) * RECEIVE_RATE_HISTORY_SIZE);
220
221 sender->receive_rate_history[0].rate = max_rate;
222 sender->receive_rate_history[0].timestamp = now;
223
224 return max_rate;
225 }
226
227 static void
update_receive_rate_history(TfrcSender * sender,guint receive_rate,guint64 now)228 update_receive_rate_history (TfrcSender *sender, guint receive_rate,
229 guint64 now)
230 {
231 guint i;
232
233 add_to_receive_rate_history (sender, receive_rate, now);
234
235 for (i = 1; i < RECEIVE_RATE_HISTORY_SIZE; i++)
236 if (sender->receive_rate_history[i].rate &&
237 sender->receive_rate_history[i].timestamp <
238 now - (2 * sender->averaged_rtt))
239 sender->receive_rate_history[i].rate = 0;
240 }
241
/* The maximum backoff interval of 64 seconds. The sender floors its rates
 * at (segment size / T_MBI) bytes/s. */
const guint T_MBI = 64;
243
244 static guint
compute_initial_rate(guint mss,guint rtt)245 compute_initial_rate (guint mss, guint rtt)
246 {
247 if (G_UNLIKELY (rtt == 0))
248 return 0;
249
250 return (SECOND * MIN (4 * mss, MAX (2 * mss, 4380))) / rtt;
251 }
252
253 /* RFC 5348 section 4.3 step 4 second part */
254 static void
recompute_sending_rate(TfrcSender * sender,guint recv_limit,gdouble loss_event_rate,guint64 now)255 recompute_sending_rate (TfrcSender *sender, guint recv_limit,
256 gdouble loss_event_rate, guint64 now)
257 {
258 if (loss_event_rate > 0) {
259 /* congestion avoidance phase */
260 sender->computed_rate = calculate_bitrate (sender_get_segment_size (sender),
261 sender->averaged_rtt, loss_event_rate);
262 sender->rate = MAX (MIN (sender->computed_rate, recv_limit),
263 sender_get_segment_size (sender)/T_MBI);
264 DEBUG_SENDER (sender, "congestion avoidance: %u (computed: %u ss: %u)",
265 sender->rate, sender->computed_rate, sender_get_segment_size (sender));
266 } else if (now - sender->tld >= sender->averaged_rtt) {
267 /* initial slow-start */
268 sender->rate = MAX (MIN (2 * sender->rate, recv_limit),
269 compute_initial_rate (sender->mss, sender->averaged_rtt));
270 sender->tld = now;
271 DEBUG_SENDER (sender, "initial slow start: %u", sender->rate);
272 }
273 }
274
275 static void
tfrc_sender_update_inst_rate(TfrcSender * sender)276 tfrc_sender_update_inst_rate (TfrcSender *sender)
277 {
278 if (!sender->last_sqrt_rtt)
279 return;
280
281 /*
282 * Update the instantaneous
283 * transmit rate, X_inst, following RFC 5348 Section 4.5.
284 */
285
286 if (sender->sqmean_rtt)
287 sender->sqmean_rtt = 0.9 * sender->sqmean_rtt + sender->last_sqrt_rtt / 10;
288 else
289 sender->sqmean_rtt = sender->last_sqrt_rtt;
290
291 sender->inst_rate = sender->rate * sender->sqmean_rtt / sender->last_sqrt_rtt;
292 if (sender->inst_rate < sender_get_segment_size (sender) / T_MBI)
293 sender->inst_rate = sender_get_segment_size (sender) / T_MBI;
294
295 }
296
297 void
tfrc_sender_on_feedback_packet(TfrcSender * sender,guint64 now,guint rtt,guint receive_rate,gdouble loss_event_rate,gboolean is_data_limited)298 tfrc_sender_on_feedback_packet (TfrcSender *sender, guint64 now,
299 guint rtt, guint receive_rate, gdouble loss_event_rate,
300 gboolean is_data_limited)
301 {
302 guint recv_limit; /* the limit on the sending rate computed from X_recv_set */
303
304 g_return_if_fail (rtt > 0 && rtt <= 10 * SECOND);
305
306 /* On first feedback packet, set he rate based on the mss and rtt */
307 if (sender->tld == 0) {
308 sender->rate = compute_initial_rate (sender->mss, rtt);
309 sender->tld = now;
310 DEBUG_SENDER (sender, "fb: initial rate: %u", sender->rate);
311 }
312
313 /* Apply the steps from RFC 5348 section 4.3 */
314
315 /* Step 1 (calculating the rtt) is done before calling this function */
316
317 /* Step 2: Update the RTT */
318 if (sender->averaged_rtt == 0)
319 sender->averaged_rtt = rtt;
320 else
321 sender->averaged_rtt = (sender->averaged_rtt * 9 + rtt) / 10;
322
323 if (sender->averaged_rtt == 0)
324 sender->averaged_rtt = 1;
325
326 /* Step 3: Update the timeout interval */
327 sender->retransmission_timeout = MAX (MAX (4 * sender->averaged_rtt,
328 SECOND * 2 * sender_get_segment_size (sender) / sender->rate ),
329 MIN_NOFEEDBACK_TIMER);
330
331 /* Step 4: Update the allowed sending rate */
332
333
334 if (G_UNLIKELY (is_data_limited)) {
335 /* the entire interval covered by the feedback packet
336 was a data-limited interval */
337 if (loss_event_rate > sender->last_loss_event_rate) {
338 guint i;
339 /* the feedback packet reports a new loss event or an
340 increase in the loss event rate p */
341
342 /* Halve entries in the receive rate history */
343 for (i = 0; i < RECEIVE_RATE_HISTORY_SIZE; i++)
344 sender->receive_rate_history[i].rate /= 2;
345
346 receive_rate *= 0.85;
347
348 recv_limit = maximize_receive_rate_history (sender, receive_rate,
349 now);
350 DEBUG_SENDER (sender, "fb: data limited, new loss event %f > %f,"
351 " recv_limit: %u", loss_event_rate, sender->last_loss_event_rate,
352 recv_limit);
353 } else {
354 recv_limit = 2 * maximize_receive_rate_history (sender, receive_rate,
355 now);
356 DEBUG_SENDER (sender, "fb: data limited, no new loss event %f <= %f,"
357 " recv_limit: %u", loss_event_rate, sender->last_loss_event_rate,
358 recv_limit);
359 }
360 } else {
361 /* typical behavior */
362 update_receive_rate_history (sender, receive_rate, now);
363 recv_limit = get_max_receive_rate (sender, FALSE);
364 if (recv_limit < G_MAXUINT / 2)
365 recv_limit *= 2;
366 else
367 recv_limit = G_MAXUINT;
368 DEBUG_SENDER (sender, "fb: not data limited, recv_limit: %u",
369 recv_limit);
370 }
371
372 recompute_sending_rate (sender, recv_limit, loss_event_rate, now);
373
374 /* Step 5: update the instantaneous
375 transmit rate, X_inst, following Section 4.5.
376 */
377
378 sender->last_sqrt_rtt = sqrt (rtt);
379 tfrc_sender_update_inst_rate (sender);
380
381 /* Step 6: Reset the nofeedback timer to expire after RTO seconds. */
382
383 sender->nofeedback_timer_expiry = now + sender->retransmission_timeout;
384 sender->sent_packet = FALSE;
385
386 sender->last_loss_event_rate = loss_event_rate;
387 }
388
389 static void
update_limits(TfrcSender * sender,guint timer_limit,guint64 now)390 update_limits(TfrcSender *sender, guint timer_limit, guint64 now)
391 {
392 if (timer_limit < sender_get_segment_size (sender) / T_MBI)
393 timer_limit = sender_get_segment_size (sender) / T_MBI;
394
395 memset (sender->receive_rate_history, 0,
396 sizeof(struct ReceiveRateItem) * RECEIVE_RATE_HISTORY_SIZE);
397
398 sender->receive_rate_history[0].rate = timer_limit / 2;
399 sender->receive_rate_history[0].timestamp = now;
400
401 recompute_sending_rate (sender, timer_limit,
402 sender->last_loss_event_rate, now);
403 }
404
405
406 void
tfrc_sender_no_feedback_timer_expired(TfrcSender * sender,guint64 now)407 tfrc_sender_no_feedback_timer_expired (TfrcSender *sender, guint64 now)
408 {
409 guint receive_rate = get_max_receive_rate (sender, FALSE);
410 guint recover_rate = compute_initial_rate (sender->mss, sender->averaged_rtt);
411
412 if (sender->averaged_rtt == 0 && sender->sent_packet) {
413 /* We do not have X_Bps or recover_rate yet.
414 * Halve the allowed sending rate.
415 */
416
417 sender->rate = MAX ( sender->rate / 2,
418 sender_get_segment_size (sender) / T_MBI);
419 DEBUG_SENDER (sender, "no_fb: no p, initial, halve rate: %u", sender->rate);
420 tfrc_sender_update_inst_rate (sender);
421 } else if (((sender->last_loss_event_rate > 0 &&
422 receive_rate < recover_rate) ||
423 (sender->last_loss_event_rate == 0 &&
424 sender->rate < 2 * recover_rate)) &&
425 !sender->sent_packet) {
426 /* Don't halve the allowed sending rate. */
427 /* do nothing */
428 DEBUG_SENDER (sender, "no_fb: idle, do nothing recv: %u recover: %u",
429 receive_rate, recover_rate);
430 } else if (sender->last_loss_event_rate == 0) {
431 /* We do not have X_Bps yet.
432 * Halve the allowed sending rate.
433 */
434 sender->rate = MAX ( sender->rate / 2,
435 sender_get_segment_size (sender) / T_MBI);
436 DEBUG_SENDER (sender, "no_fb: no p, halve rate: %u recover: %u, sent: %u", sender->rate,
437 recover_rate, sender->sent_packet);
438 tfrc_sender_update_inst_rate (sender);
439 } else if (sender->computed_rate / 2 > receive_rate) {
440 /* 2 * X_recv was already limiting the sending rate.
441 * Halve the allowed sending rate.
442 */
443 DEBUG_SENDER (sender, "no_fb: computed rate %u > 2 * recv_rate %u",
444 sender->computed_rate, receive_rate);
445 update_limits(sender, receive_rate, now);
446 } else {
447 DEBUG_SENDER (sender, "no_fb: ELSE computed: %u", sender->computed_rate);
448 update_limits(sender, sender->computed_rate / 2, now);
449 }
450
451 g_assert (sender->rate != 0);
452
453 sender->nofeedback_timer_expiry = now + MAX (MAX ( 4 * sender->averaged_rtt,
454 SECOND * 2 * sender_get_segment_size (sender) / sender->rate),
455 MIN_NOFEEDBACK_TIMER);
456 sender->sent_packet = FALSE;
457 }
458
459 void
tfrc_sender_sending_packet(TfrcSender * sender,guint size)460 tfrc_sender_sending_packet (TfrcSender *sender, guint size)
461 {
462 /* this should be:
463 * avg = size + (avg * 15/16)
464 */
465 sender->average_packet_size =
466 size + ((15 * sender->average_packet_size) >> 4);
467
468 sender->sent_packet = TRUE;
469 }
470
471 guint
tfrc_sender_get_send_rate(TfrcSender * sender)472 tfrc_sender_get_send_rate (TfrcSender *sender)
473 {
474 guint rate;
475
476 if (!sender)
477 return DEFAULT_MSS;
478
479 if (sender->use_inst_rate && sender->inst_rate)
480 rate = sender->inst_rate;
481 else
482 rate = sender->rate;
483
484 if (sender->sp)
485 return rate * (sender->average_packet_size >> 4) /
486 ((sender->average_packet_size >> 4) + HEADER_SIZE);
487 else
488 return rate;
489 }
490
491 guint64
tfrc_sender_get_no_feedback_timer_expiry(TfrcSender * sender)492 tfrc_sender_get_no_feedback_timer_expiry (TfrcSender *sender)
493 {
494 return sender->nofeedback_timer_expiry;
495 }
496
497 guint
tfrc_sender_get_averaged_rtt(TfrcSender * sender)498 tfrc_sender_get_averaged_rtt (TfrcSender *sender)
499 {
500 return sender->averaged_rtt;
501 }
502
503
/* Number of packets to receive after a loss before declaring the loss event */
#define NDUPACK 3
/* Size of the circular buffers of loss events used when computing the
 * loss event rate */
#define LOSS_EVENTS_MAX (9)
/* Maximum number of loss intervals taken from the loss event buffers */
#define LOSS_INTERVALS_MAX (8)
/* Number of received intervals above which the oldest one may be dropped;
 * the factor 2 is arbitrary */
#define MAX_HISTORY_SIZE (LOSS_EVENTS_MAX * 2) /* 2 is a random number */
/* Minimum duration of the packet history, in RTTs, before any of it may be
 * dropped */
#define MIN_HISTORY_DURATION (10)
509
/*
 * A run of packets received with consecutive sequence numbers.
 * For both ends of the run, the packet's timestamp, its sequence number
 * and the local time at which it was received are recorded.
 */
typedef struct {
  /* First (lowest sequence number) packet of the run */
  guint64 first_timestamp;
  guint first_seqnum;
  guint64 first_recvtime;

  /* Last (highest sequence number) packet of the run */
  guint64 last_timestamp;
  guint last_seqnum;
  guint64 last_recvtime;
} ReceivedInterval;
519
/* State of the receiving side of a TFRC flow */
struct _TfrcReceiver {
  /* Queue of ReceivedInterval, oldest at the head, newest at the tail */
  GQueue received_intervals;

  gboolean sp; /* TRUE when created with tfrc_receiver_new_sp() */

  guint sender_rtt; /* smoothed RTT reported by the sender, 0 if unknown */
  guint receive_rate;
  guint max_receive_rate; /* used as the target rate of the first loss interval */
  guint max_receive_rate_ss; /* used as the segment size of the first loss interval */
  guint64 feedback_timer_expiry; /* time at which the feedback timer fires */

  guint first_loss_interval; /* cached once computed, 0 until then */

  gdouble loss_event_rate;

  gboolean feedback_sent_on_last_timer;

  guint received_bytes; /* incremented by the size of every packet received */
  guint prev_received_bytes;
  guint64 received_bytes_reset_time;
  guint64 prev_received_bytes_reset_time;
  guint received_packets; /* incremented for every packet received */
  guint prev_received_packets;
  guint sender_rtt_on_last_feedback;
};
545
546 TfrcReceiver *
tfrc_receiver_new(guint64 now)547 tfrc_receiver_new (guint64 now)
548 {
549 TfrcReceiver *receiver = g_slice_new0 (TfrcReceiver);
550
551 g_queue_init (&receiver->received_intervals);
552 receiver->received_bytes_reset_time = now;
553 receiver->prev_received_bytes_reset_time = now;
554
555 return receiver;
556 }
557
558
559 TfrcReceiver *
tfrc_receiver_new_sp(guint64 now)560 tfrc_receiver_new_sp (guint64 now)
561 {
562 TfrcReceiver *receiver = tfrc_receiver_new (now);
563
564 receiver->sp = TRUE;
565
566 return receiver;
567 }
568
569 void
tfrc_receiver_free(TfrcReceiver * receiver)570 tfrc_receiver_free (TfrcReceiver *receiver)
571 {
572 ReceivedInterval *ri;
573
574 while ((ri = g_queue_pop_tail (&receiver->received_intervals)))
575 g_slice_free (ReceivedInterval, ri);
576
577 g_slice_free (TfrcReceiver, receiver);
578 }
579
580 /*
581 * @s: segment size in bytes
582 * @R: RTT in milli seconds (instead of seconds)
583 * @rate: the sending rate
584 *
585 * Returns the 1/p that would produce this sending rate
586 * This is used to compute the first loss interval
587 */
588
589 static gdouble
compute_first_loss_interval(gdouble s,gdouble R,gdouble rate)590 compute_first_loss_interval (gdouble s, gdouble R, gdouble rate)
591 {
592 gdouble p_min = 0;
593 gdouble p_max = 1;
594 gdouble p;
595 gdouble computed_rate;
596
597 /* Use an iterative process to find p
598 * it would be faster to use a table, but I'm lazy
599 */
600
601 do {
602 p = (p_min + p_max) / 2;
603 computed_rate = calculate_bitrate (s, R, p);
604
605 if (computed_rate < rate)
606 p_max = p;
607 else
608 p_min = p;
609
610 } while (computed_rate < 0.95 * rate || computed_rate > 1.05 * rate);
611
612 return 1 / p;
613 }
614
615
616 /* Implements RFC 5348 section 5 */
617 static gdouble
calculate_loss_event_rate(TfrcReceiver * receiver,guint64 now)618 calculate_loss_event_rate (TfrcReceiver *receiver, guint64 now)
619 {
620 guint64 loss_event_times[LOSS_EVENTS_MAX];
621 guint loss_event_seqnums[LOSS_EVENTS_MAX];
622 guint loss_event_pktcount[LOSS_EVENTS_MAX];
623 guint loss_intervals[LOSS_EVENTS_MAX];
624 const gdouble weights[8] = { 1.0, 1.0, 1.0, 1.0, 0.8, 0.6, 0.4, 0.2 };
625 gint max_index = -1;
626 GList *item;
627 guint max_seqnum = 0;
628 gint i;
629 guint max_interval;
630 gdouble I_tot0 = 0;
631 gdouble I_tot1 = 0;
632 gdouble W_tot = 0;
633 gdouble I_tot;
634
635 if (receiver->sender_rtt == 0)
636 return 0;
637
638 if (receiver->received_intervals.length < 2)
639 return 0;
640
641 DEBUG_RECEIVER (receiver, "start loss event rate computation (rtt: %u)",
642 receiver->sender_rtt);
643
644 for (item = g_queue_peek_head_link (&receiver->received_intervals)->next;
645 item;
646 item = item->next) {
647 ReceivedInterval *current = item->data;
648 ReceivedInterval *prev = item->prev->data;
649 guint64 start_ts;
650 guint start_seqnum;
651
652 max_seqnum = current->last_seqnum;
653
654 DEBUG_RECEIVER (receiver, "Loss: ts %"G_GUINT64_FORMAT
655 "->%"G_GUINT64_FORMAT" seq %u->%u",
656 prev->last_timestamp, current->first_timestamp, prev->last_seqnum,
657 current->first_seqnum);
658
659 /* If the current loss is entirely within one RTT of the beginning of the
660 * last loss, lets merge it into there
661 */
662 if (max_index >= 0 && current->first_timestamp <
663 loss_event_times[max_index % LOSS_EVENTS_MAX] + receiver->sender_rtt) {
664 loss_event_pktcount[max_index % LOSS_EVENTS_MAX] +=
665 current->first_seqnum - prev->last_seqnum;
666 DEBUG_RECEIVER (receiver, "Merged: pktcount[%u] = %u", max_index,
667 loss_event_pktcount[max_index % LOSS_EVENTS_MAX]);
668 continue;
669 }
670
671 if (max_index >= 0 && prev->last_timestamp <
672 loss_event_times[max_index % LOSS_EVENTS_MAX] + receiver->sender_rtt) {
673 /* This is the case where a loss event ends in a middle of an interval
674 * without packets, then we close this loss event and start a new one
675 */
676 start_ts = loss_event_times[max_index % LOSS_EVENTS_MAX] +
677 receiver->sender_rtt;
678 start_seqnum = prev->last_seqnum +
679 gst_util_uint64_scale_round (
680 current->first_seqnum - prev->last_seqnum,
681 start_ts - prev->last_timestamp,
682 1 + current->first_timestamp - prev->last_timestamp);
683 loss_event_pktcount[max_index % LOSS_EVENTS_MAX] +=
684 start_seqnum - prev->last_seqnum - 1;
685 DEBUG_RECEIVER (receiver,
686 "Loss ends inside loss interval pktcount[%u] = %u",
687 max_index, loss_event_pktcount[max_index % LOSS_EVENTS_MAX]);
688 } else {
689 /* this is the case where the packet loss starts an entirely new loss
690 * event
691 */
692 start_ts = prev->last_timestamp +
693 gst_util_uint64_scale_round (1,
694 current->first_timestamp - prev->last_timestamp,
695 current->first_seqnum - prev->last_seqnum);
696 start_seqnum = prev->last_seqnum + 1;
697 }
698
699 DEBUG_RECEIVER (receiver, "start_ts: %" G_GUINT64_FORMAT " seqnum: %u",
700 start_ts, start_seqnum);
701
702 /* Now we have one or more loss events that start
703 * during this interval of lost packets, if there is more than one
704 * all but the last one are of RTT length
705 */
706 while (start_ts <= current->first_timestamp) {
707 max_index ++;
708
709 loss_event_times[max_index % LOSS_EVENTS_MAX] = start_ts;
710 loss_event_seqnums[max_index % LOSS_EVENTS_MAX] = start_seqnum;
711 if (current->first_timestamp == prev->last_timestamp) {
712 /* if current->first_ts == prev->last_ts,
713 * then the computation of start_seqnum below will yield a division
714 * by 0
715 */
716 loss_event_pktcount[max_index % LOSS_EVENTS_MAX] =
717 current->first_seqnum - start_seqnum;
718 break;
719 }
720
721 start_ts += receiver->sender_rtt;
722 start_seqnum = prev->last_seqnum +
723 gst_util_uint64_scale_round (
724 current->first_seqnum - prev->last_seqnum,
725 start_ts - prev->last_timestamp,
726 current->first_timestamp - prev->last_timestamp);
727
728 /* Make sure our interval has at least one packet in it */
729 if (G_UNLIKELY (start_seqnum <=
730 loss_event_seqnums[max_index % LOSS_EVENTS_MAX]))
731 {
732 start_seqnum = loss_event_seqnums[max_index % LOSS_EVENTS_MAX] + 1;
733 start_ts = prev->last_timestamp +
734 gst_util_uint64_scale_round (
735 current->first_timestamp - prev->last_timestamp,
736 start_seqnum - prev->last_seqnum,
737 current->first_seqnum - prev->last_seqnum);
738 }
739
740 if (start_seqnum > current->first_seqnum)
741 {
742 g_assert (start_ts > current->first_timestamp);
743 start_seqnum = current->first_seqnum;
744 /* No need top change start_ts, the loop will stop anyway */
745 }
746 loss_event_pktcount[max_index % LOSS_EVENTS_MAX] = start_seqnum -
747 loss_event_seqnums[max_index % LOSS_EVENTS_MAX];
748 DEBUG_RECEIVER (receiver, "loss %u times: %" G_GUINT64_FORMAT
749 " seqnum: %u pktcount: %u",
750 max_index, loss_event_times[max_index % LOSS_EVENTS_MAX],
751 loss_event_seqnums[max_index % LOSS_EVENTS_MAX],
752 loss_event_pktcount[max_index % LOSS_EVENTS_MAX]);
753 }
754 }
755
756 if (max_index < 0 ||
757 (max_index < 1 && receiver->max_receive_rate == 0))
758 return 0;
759
760 /* RFC 5348 Section 5.3: The size of loss events */
761 loss_intervals[0] =
762 max_seqnum - loss_event_seqnums[max_index % LOSS_EVENTS_MAX] + 1;
763 DEBUG_RECEIVER (receiver, "intervals[0] = %u", loss_intervals[0]);
764 for (i = max_index - 1, max_interval = 1;
765 max_interval < LOSS_INTERVALS_MAX &&
766 i >= 0 && i > max_index - LOSS_EVENTS_MAX;
767 i--, max_interval++) {
768 guint cur_i = i % LOSS_EVENTS_MAX;
769 guint prev_i = (i + 1) % LOSS_EVENTS_MAX;
770
771 /* if its Small Packet variant and the loss event is short,
772 * that is less than 2 * RTT, then the loss interval is divided by the
773 * number of packets lost
774 * see RFC 4828 section 3 bullet 3 paragraph 2 */
775 if (receiver->sp &&
776 loss_event_times[prev_i] - loss_event_times[cur_i] <
777 2 * receiver->sender_rtt)
778 loss_intervals[max_interval] = (loss_event_seqnums[prev_i] -
779 loss_event_seqnums[cur_i]) / loss_event_pktcount[cur_i];
780 else
781 loss_intervals[max_interval] =
782 loss_event_seqnums[prev_i] - loss_event_seqnums[cur_i];
783 DEBUG_RECEIVER (receiver, "intervals[%u] = %u", max_interval,
784 loss_intervals[max_interval]);
785 }
786
787 /* If the first loss interval is still used, use the computed
788 * value according to RFC 5348 section 6.3.1
789 */
790 if (max_interval < LOSS_INTERVALS_MAX)
791 {
792 if (!receiver->first_loss_interval)
793 {
794 receiver->first_loss_interval =
795 /* segment size is 1 because we're computing it based on the
796 * X_pps equation.. in packets/s
797 */
798 compute_first_loss_interval (receiver->max_receive_rate_ss,
799 receiver->sender_rtt, receiver->max_receive_rate);
800 DEBUG_RECEIVER (receiver, "Computed the first loss interval to %u"
801 " (rtt: %u s: %u rate:%u)",
802 receiver->first_loss_interval, receiver->sender_rtt,
803 receiver->max_receive_rate_ss, receiver->max_receive_rate);
804 }
805 loss_intervals[max_interval] = receiver->first_loss_interval;
806 DEBUG_RECEIVER (receiver, "intervals[%u] = %u", max_interval,
807 loss_intervals[max_interval]);
808 max_interval++;
809 }
810
811 /* Section 5.4: Average loss rate */
812 for (i = 1; i < max_interval; i++) {
813 I_tot1 += loss_intervals[i] * weights[i - 1];
814 W_tot += weights[i - 1];
815 }
816
817 /* Modified according to RFC 4828 section 3 bullet 3 paragraph 4*/
818 if (receiver->sp && now - loss_event_times[0] < 2 * receiver->sender_rtt) {
819 I_tot = I_tot1;
820 } else {
821 for (i = 0; i < max_interval - 1; i++)
822 I_tot0 += loss_intervals[i] * weights[i];
823
824 I_tot = MAX (I_tot0, I_tot1);
825 }
826
827 return W_tot / I_tot;
828 }
829
830
831 /* Implements RFC 5348 section 6.1 */
832 gboolean
tfrc_receiver_got_packet(TfrcReceiver * receiver,guint64 timestamp,guint64 now,guint seqnum,guint sender_rtt,guint packet_size)833 tfrc_receiver_got_packet (TfrcReceiver *receiver, guint64 timestamp,
834 guint64 now, guint seqnum, guint sender_rtt, guint packet_size)
835 {
836 GList *item = NULL;
837 ReceivedInterval *current = NULL;
838 ReceivedInterval *prev = NULL;
839 gboolean recalculate_loss_rate = FALSE;
840 gboolean retval = FALSE;
841 gboolean history_too_short = !sender_rtt; /* No RTT, keep all history */
842
843 receiver->received_bytes += packet_size;
844 receiver->received_packets++;
845
846 g_assert (sender_rtt != 0 || receiver->sender_rtt == 0);
847
848 if (receiver->sender_rtt)
849 receiver->sender_rtt = (0.9 * receiver->sender_rtt) + (sender_rtt / 10);
850 else
851 receiver->sender_rtt = sender_rtt;
852
853 /* RFC 5348 section 6.3: First packet received */
854 if (g_queue_get_length (&receiver->received_intervals) == 0 ||
855 receiver->sender_rtt == 0) {
856 if (receiver->sender_rtt)
857 receiver->feedback_timer_expiry = now + receiver->sender_rtt;
858
859 /* First packet or no RTT yet, lets send a feedback packet */
860 retval = TRUE;
861 }
862
863 /* RFC 5348 section 6.1 Step 1: Add to packet history */
864
865 for (item = g_queue_peek_tail_link (&receiver->received_intervals);
866 item;
867 item = item->prev) {
868 current = item->data;
869 prev = item->prev ? item->prev->data : NULL;
870
871 if (G_LIKELY (seqnum == current->last_seqnum + 1)) {
872 /* Extend the current packet forwardd */
873 current->last_seqnum = seqnum;
874 current->last_timestamp = timestamp;
875 current->last_recvtime = now;
876 } else if (seqnum >= current->first_seqnum &&
877 seqnum <= current->last_seqnum) {
878 /* Is inside the current interval, must be duplicate, ignore */
879 } else if (seqnum > current->last_seqnum + 1) {
880 /* We had a loss, lets add a new one */
881 prev = current;
882
883 current = g_slice_new (ReceivedInterval);
884 current->first_timestamp = current->last_timestamp = timestamp;
885 current->first_seqnum = current->last_seqnum = seqnum;
886 current->first_recvtime = current->last_recvtime = now;
887 g_queue_push_tail (&receiver->received_intervals, current);
888
889 item = g_queue_peek_tail_link (&receiver->received_intervals);
890 } else if (seqnum == current->first_seqnum - 1) {
891 /* Extend the current packet backwards */
892 current->first_seqnum = seqnum;
893 current->first_timestamp = timestamp;
894 current->first_recvtime = now;
895 } else if (seqnum < current->first_timestamp &&
896 (!prev || seqnum > prev->last_seqnum + 1)) {
897 /* We have something that goes in the middle of a gap,
898 so lets created a new received interval */
899 current = g_slice_new (ReceivedInterval);
900
901 current->first_timestamp = current->last_timestamp = timestamp;
902 current->first_seqnum = current->last_seqnum = seqnum;
903 current->first_recvtime = current->last_recvtime = now;
904
905 g_queue_insert_before (&receiver->received_intervals, item, current);
906 item = item->prev;
907 prev = item->prev ? item->prev->data : NULL;
908 } else
909 continue;
910 break;
911 }
912
913 /* Don't forget history if we have aless than MIN_HISTORY_DURATION * rtt
914 * of history
915 */
916 if (!history_too_short)
917 {
918 ReceivedInterval *newest =
919 g_queue_peek_tail (&receiver->received_intervals);
920 ReceivedInterval *oldest =
921 g_queue_peek_head (&receiver->received_intervals);
922 if (newest && oldest)
923 history_too_short =
924 newest->last_timestamp - oldest->first_timestamp <
925 MIN_HISTORY_DURATION * receiver->sender_rtt;
926 else
927 history_too_short = TRUE;
928 }
929
930 /* It's the first one or we're at the start */
931 if (G_UNLIKELY (!current)) {
932 /* If its before MAX_HISTORY_SIZE, its too old, just discard it */
933 if (!history_too_short &&
934 g_queue_get_length (&receiver->received_intervals) > MAX_HISTORY_SIZE)
935 return retval;
936
937 current = g_slice_new (ReceivedInterval);
938
939 current->first_timestamp = current->last_timestamp = timestamp;
940 current->first_seqnum = current->last_seqnum = seqnum;
941 current->first_recvtime = current->last_recvtime = now;
942 g_queue_push_head (&receiver->received_intervals, current);
943 }
944
945 if (!history_too_short &&
946 g_queue_get_length (&receiver->received_intervals) > MAX_HISTORY_SIZE) {
947 ReceivedInterval *remove = g_queue_pop_head (&receiver->received_intervals);
948 if (remove == prev)
949 prev = NULL;
950 g_slice_free (ReceivedInterval, remove);
951 }
952
953
954 if (prev && (current->last_seqnum - current->first_seqnum == NDUPACK))
955 recalculate_loss_rate = TRUE;
956
957
958 if (prev && G_UNLIKELY (prev->last_seqnum + 1 == current->first_seqnum)) {
959 /* Merge closed gap if any */
960 current->first_seqnum = prev->first_seqnum;
961 current->first_timestamp = prev->first_timestamp;
962 current->first_recvtime = prev->first_recvtime;
963
964 g_slice_free (ReceivedInterval, prev);
965 g_queue_delete_link (&receiver->received_intervals, item->prev);
966
967 prev = item->prev ? item->prev->data : NULL;
968
969 recalculate_loss_rate = TRUE;
970 }
971
  /* RFC 5348 section 6.1 Step 2, 3, 4:
   * Check if done
   * If not done, recalculate the loss event rate,
   * and possibly send a feedback message
   */
977
978 if (receiver->sender_rtt &&
979 (recalculate_loss_rate || !receiver->feedback_sent_on_last_timer)) {
980 gdouble new_loss_event_rate = calculate_loss_event_rate (receiver, now);
981
982 if (new_loss_event_rate > receiver->loss_event_rate ||
983 !receiver->feedback_sent_on_last_timer)
984 retval |= tfrc_receiver_feedback_timer_expired (receiver, now);
985 }
986
987 return retval;
988 }
989
990 gboolean
tfrc_receiver_feedback_timer_expired(TfrcReceiver * receiver,guint64 now)991 tfrc_receiver_feedback_timer_expired (TfrcReceiver *receiver, guint64 now)
992 {
993 if (receiver->received_bytes == 0 ||
994 receiver->prev_received_bytes_reset_time == now) {
995 g_assert (receiver->sender_rtt != 0);
996 receiver->feedback_timer_expiry = now + receiver->sender_rtt;
997 receiver->feedback_sent_on_last_timer = FALSE;
998 return FALSE;
999 }
1000 else
1001 {
1002 return TRUE;
1003 }
1004 }
1005
1006 gboolean
tfrc_receiver_send_feedback(TfrcReceiver * receiver,guint64 now,double * loss_event_rate,guint * receive_rate)1007 tfrc_receiver_send_feedback (TfrcReceiver *receiver, guint64 now,
1008 double *loss_event_rate, guint *receive_rate)
1009 {
1010 guint received_bytes;
1011 guint64 received_bytes_reset_time;
1012 guint received_packets;
1013
1014 if (now == receiver->prev_received_bytes_reset_time)
1015 return FALSE;
1016
1017 if (now - receiver->received_bytes_reset_time >
1018 receiver->sender_rtt_on_last_feedback ) {
1019 receiver->prev_received_bytes_reset_time =
1020 receiver->received_bytes_reset_time;
1021 receiver->prev_received_bytes = receiver->received_bytes;
1022 receiver->prev_received_packets = receiver->received_packets;
1023 received_bytes = receiver->received_bytes;
1024 received_packets = receiver->received_packets;
1025 received_bytes_reset_time = receiver->received_bytes_reset_time;
1026 } else {
1027 receiver->prev_received_bytes += receiver->received_bytes;
1028 receiver->prev_received_packets += receiver->received_packets;
1029 received_bytes = receiver->prev_received_bytes;
1030 received_packets = receiver->prev_received_packets;
1031 received_bytes_reset_time = receiver->prev_received_bytes_reset_time;
1032 }
1033
1034 receiver->received_bytes_reset_time = now;
1035 receiver->received_bytes = 0;
1036 receiver->received_packets = 0;
1037
1038 receiver->receive_rate = gst_util_uint64_scale_round (SECOND, received_bytes,
1039 now - received_bytes_reset_time);
1040
1041 if (receiver->sender_rtt_on_last_feedback &&
1042 receiver->receive_rate > receiver->max_receive_rate)
1043 {
1044 receiver->max_receive_rate = receiver->receive_rate;
1045 receiver->max_receive_rate_ss = received_bytes / received_packets;
1046 }
1047
1048 receiver->loss_event_rate = calculate_loss_event_rate (receiver, now);
1049
1050 if (receiver->sender_rtt)
1051 receiver->feedback_timer_expiry = now + receiver->sender_rtt;
1052 receiver->sender_rtt_on_last_feedback = receiver->sender_rtt;
1053 receiver->feedback_sent_on_last_timer = TRUE;
1054
1055 DEBUG_RECEIVER (receiver, "P: %f recv_rate: %u", receiver->loss_event_rate,
1056 receiver->receive_rate);
1057
1058 *receive_rate = receiver->receive_rate;
1059 *loss_event_rate = receiver->loss_event_rate;
1060
1061 return TRUE;
1062 }
1063
1064 guint64
tfrc_receiver_get_feedback_timer_expiry(TfrcReceiver * receiver)1065 tfrc_receiver_get_feedback_timer_expiry (TfrcReceiver *receiver)
1066 {
1067 g_assert (receiver->sender_rtt || receiver->feedback_timer_expiry == 0);
1068 return receiver->feedback_timer_expiry;
1069 }
1070
/*
 * State used to decide whether the interval covered by a feedback packet
 * was data-limited.
 */
struct _TfrcIsDataLimited
{
  /* Times recorded by tfrc_is_data_limited_not_limited_now() at which the
   * sender was not data-limited */
  guint64 not_limited_1;
  guint64 not_limited_2;
  /* Packet timestamp passed with the most recently processed feedback */
  guint64 t_new;
  /* Time at which the most recent feedback was processed */
  guint64 t_next;
};
1078
/*
 * This implements the algorithm proposed in RFC 5348 section 8.2.1
 */
1081
1082 TfrcIsDataLimited *
tfrc_is_data_limited_new(guint64 now)1083 tfrc_is_data_limited_new (guint64 now)
1084 {
1085 TfrcIsDataLimited *idl = g_slice_new0 (TfrcIsDataLimited);
1086
1087 return idl;
1088 }
1089
1090 void
tfrc_is_data_limited_free(TfrcIsDataLimited * idl)1091 tfrc_is_data_limited_free (TfrcIsDataLimited *idl)
1092 {
1093 g_slice_free (TfrcIsDataLimited, idl);
1094 }
1095
1096 void
tfrc_is_data_limited_not_limited_now(TfrcIsDataLimited * idl,guint64 now)1097 tfrc_is_data_limited_not_limited_now (TfrcIsDataLimited *idl, guint64 now)
1098 {
1099 /* Sender is not data-limited at this instant. */
1100 if (idl->not_limited_1 <= idl->t_new)
1101 /* Goal: NotLimited1 > t_new. */
1102 idl->not_limited_1 = now;
1103 else if (idl->not_limited_2 <= idl->t_next)
1104 /* Goal: NotLimited2 > t_next. */
1105 idl->not_limited_2 = now;
1106 }
1107
1108 /*
1109 * Returns TRUE if the period since the previous feedback packet
1110 * was data limited
1111 */
1112 gboolean
tfrc_is_data_limited_received_feedback(TfrcIsDataLimited * idl,guint64 now,guint64 last_packet_timestamp,guint rtt)1113 tfrc_is_data_limited_received_feedback (TfrcIsDataLimited *idl, guint64 now,
1114 guint64 last_packet_timestamp, guint rtt)
1115 {
1116 gboolean ret;
1117 guint64 t_old;
1118
1119 idl->t_new = last_packet_timestamp;
1120 t_old = idl->t_new - rtt;
1121 idl->t_next = now;
1122 if ((t_old < idl->not_limited_1 && idl->not_limited_1 <= idl->t_new) ||
1123 (t_old < idl->not_limited_2 && idl->not_limited_2 <= idl->t_new))
1124 /* This was NOT a data-limited interval */
1125 ret = FALSE;
1126 else
1127 /* This was a data-limited interval. */
1128 ret = TRUE;
1129
1130 if (idl->not_limited_1 <= idl->t_new && idl->not_limited_2 > idl->t_new)
1131 idl->not_limited_1 = idl->not_limited_2;
1132
1133 return ret;
1134 }
1135