1 /*
2  * Farstream - Farstream TFRC implementation
3  *
4  * Copyright 2010 Collabora Ltd.
5  *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
6  * Copyright 2010 Nokia Corp.
7  *
 * tfrc.c - An implementation of TCP Friendly rate control, RFCs 5348 and 4828
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License as published by the Free Software Foundation; either
13  * version 2.1 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Lesser General Public License for more details.
19  *
20  * You should have received a copy of the GNU Lesser General Public
21  * License along with this library; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
23  */
24 
25 #include "tfrc.h"
26 
27 #include <math.h>
28 #include <string.h>
29 
30 /* for gst_util_uint64_scale_round */
31 #include <gst/gst.h>
32 
33 /*
34  * ALL TIMES ARE IN MICROSECONDS
35  * bitrates are in bytes/sec
36  */
37 
/*
 * Receiver-side debug tracing. Compiled out by default: the active branch
 * defines the macro to nothing, the disabled branch forwards to g_debug().
 */
#if 0
#define DEBUG_RECEIVER(receiver, format, ...) \
  g_debug ("TFRC-R (%p): " format , receiver,  __VA_ARGS__)
#else
#define DEBUG_RECEIVER(...)
#endif

/* Sender-side debug tracing, compiled out the same way. */
#if 0
#define DEBUG_SENDER(sender, format, ...) \
  g_debug ("TFRC-S (%p): " format,  sender,  __VA_ARGS__)
#else
#define DEBUG_SENDER(...)
#endif

/* Value given to TfrcSender.mss by tfrc_sender_new(), in bytes */
#define DEFAULT_MSS 1460

/* One second expressed in microseconds, the time unit used in this file */
#define SECOND (1000 * 1000)

/* Lower bound applied to the no-feedback timer interval (20 ms) */
#define MIN_NOFEEDBACK_TIMER (20 * 1000)
57 
58 /*
59  * @s: segment size in bytes
60  * @R: RTT in milli seconds (instead of seconds)
61  * @p: loss event per packet transmitted
62  *
63  *                              s
64  * X_Bps = -----------------------------------------------
65  *         R * (sqrt(2*p/3) + 12*sqrt(3*p/8)*p*(1+32*p^2))
66  *
67  * Returns: The bitrate in bytes/s
68  */
69 static gdouble
calculate_bitrate(gdouble s,gdouble R,gdouble p)70 calculate_bitrate (gdouble s, gdouble R, gdouble p)
71 {
72   gdouble f = sqrt (2 * p / 3) + 12 * sqrt (3 * p / 8) * p * (1 + 32 * p * p);
73 
74   return (SECOND * s) / (R * f);
75 }
76 
/* Number of entries in TfrcSender.receive_rate_history */
#define RECEIVE_RATE_HISTORY_SIZE      (4)

/* Specified in RFC 4828 Section 3 second bullet.
 * Added to the average packet size when the send rate of a small-packet
 * sender is scaled in tfrc_sender_get_send_rate().
 */
#define HEADER_SIZE     (40)
81 
/* One entry of the sender's receive rate history */
struct ReceiveRateItem {
  guint64 timestamp; /* time (in microseconds) at which the entry was stored */
  guint rate; /* reported receive rate; G_MAXUINT is stored on the first RTT
               * and 0 marks an entry that was cleared or expired */
};
86 
/* State of a TFRC sender; all times are in microseconds */
struct _TfrcSender {
  guint computed_rate; /* The rate computed from the TCP throughput equation */

  gboolean sp; /* TRUE when created with tfrc_sender_new_sp() */
  guint average_packet_size; /* 16 times larger */
  gboolean use_inst_rate; /* use inst_rate instead of rate */

  guint mss; /* max segment size */
  guint rate; /* maximum allowed sending rate in bytes/sec */
  guint inst_rate; /* corrected maximum allowed sending rate */
  guint averaged_rtt; /* weighted average: 9/10 previous value, 1/10 sample */
  guint sqmean_rtt; /* running average of last_sqrt_rtt */
  guint last_sqrt_rtt; /* sqrt() of the rtt passed with the last feedback */
  guint64 tld; /* Time Last Doubled during slow-start */

  guint64 nofeedback_timer_expiry; /* time at which the timer expires */

  guint retransmission_timeout; /* RTO */

  /* The most recently added entry is at index 0 */
  struct ReceiveRateItem receive_rate_history[RECEIVE_RATE_HISTORY_SIZE];

  gdouble last_loss_event_rate; /* loss event rate of the last feedback */

  gboolean sent_packet; /* set when a packet is sent, cleared on feedback and
                         * on no-feedback timer expiry */
};
112 
113 TfrcSender *
tfrc_sender_new(guint segment_size,guint64 now,guint initial_rate)114 tfrc_sender_new (guint segment_size, guint64 now, guint initial_rate)
115 {
116   TfrcSender *sender = g_slice_new0 (TfrcSender);
117 
118   /* initialized as described in RFC 5348 4.2 */
119   sender->use_inst_rate = TRUE;
120   sender->mss = DEFAULT_MSS;
121   sender->average_packet_size = segment_size << 4;
122   if (initial_rate)
123     sender->rate = initial_rate;
124   else
125     sender->rate = segment_size;
126 
127   sender->retransmission_timeout = 2 * SECOND;
128   sender->nofeedback_timer_expiry = now + sender->retransmission_timeout;
129   return sender;
130 }
131 
132 TfrcSender *
tfrc_sender_new_sp(guint64 now,guint initial_average_packet_size)133 tfrc_sender_new_sp (guint64 now, guint initial_average_packet_size)
134 {
135   TfrcSender *sender = tfrc_sender_new (1460, now, 0);
136 
137   sender->sp = TRUE;
138 
139   sender->average_packet_size = initial_average_packet_size << 4;
140 
141   return sender;
142 }
143 
144 void
tfrc_sender_use_inst_rate(TfrcSender * sender,gboolean use_inst_rate)145 tfrc_sender_use_inst_rate (TfrcSender *sender, gboolean use_inst_rate)
146 {
147   sender->use_inst_rate = use_inst_rate;
148 }
149 
150 
151 void
tfrc_sender_free(TfrcSender * sender)152 tfrc_sender_free (TfrcSender *sender)
153 {
154   g_slice_free (TfrcSender, sender);
155 }
156 
157 static guint
sender_get_segment_size(TfrcSender * sender)158 sender_get_segment_size (TfrcSender *sender)
159 {
160   if (sender->sp)
161     return sender->mss;
162   else
163     return sender->average_packet_size >> 4;
164 }
165 
166 void
tfrc_sender_on_first_rtt(TfrcSender * sender,guint64 now)167 tfrc_sender_on_first_rtt (TfrcSender *sender, guint64 now)
168 {
169   sender->receive_rate_history[0].rate = G_MAXUINT;
170   sender->receive_rate_history[0].timestamp = now;
171 }
172 
get_max_receive_rate(TfrcSender * sender,gboolean ignore_max_uint)173 static guint get_max_receive_rate (TfrcSender *sender, gboolean ignore_max_uint)
174 {
175   guint max_rate = 0;
176   guint i;
177 
178   for (i = 0; i < RECEIVE_RATE_HISTORY_SIZE; i++)
179   {
180     if (G_UNLIKELY (sender->receive_rate_history[i].rate == G_MAXUINT))
181     {
182       if (ignore_max_uint)
183         return max_rate;
184       else
185         return G_MAXUINT;
186     }
187     max_rate = MAX (max_rate, sender->receive_rate_history[i].rate);
188   }
189 
190   return max_rate;
191 }
192 
193 static void
add_to_receive_rate_history(TfrcSender * sender,guint receive_rate,guint64 now)194 add_to_receive_rate_history (TfrcSender *sender, guint receive_rate,
195     guint64 now)
196 {
197   int i;
198 
199   for (i = RECEIVE_RATE_HISTORY_SIZE - 1; i > 0; i --)
200     sender->receive_rate_history[i] = sender->receive_rate_history[i-1];
201 
202   sender->receive_rate_history[0].rate = receive_rate;
203   sender->receive_rate_history[0].timestamp = now;
204 }
205 
206 static guint
maximize_receive_rate_history(TfrcSender * sender,guint receive_rate,guint64 now)207 maximize_receive_rate_history (TfrcSender *sender, guint receive_rate,
208     guint64 now)
209 {
210   guint max_rate;
211 
212   add_to_receive_rate_history (sender, receive_rate, now);
213 
214   max_rate = get_max_receive_rate (sender, TRUE);
215 
216   DEBUG_SENDER (sender, "MAXIMIZE recv: %u max: %u", receive_rate, max_rate);
217 
218   memset (sender->receive_rate_history, 0,
219       sizeof(struct ReceiveRateItem) * RECEIVE_RATE_HISTORY_SIZE);
220 
221   sender->receive_rate_history[0].rate = max_rate;
222   sender->receive_rate_history[0].timestamp = now;
223 
224   return max_rate;
225 }
226 
227 static void
update_receive_rate_history(TfrcSender * sender,guint receive_rate,guint64 now)228 update_receive_rate_history (TfrcSender *sender, guint receive_rate,
229     guint64 now)
230 {
231   guint i;
232 
233   add_to_receive_rate_history (sender, receive_rate, now);
234 
235   for (i = 1; i < RECEIVE_RATE_HISTORY_SIZE; i++)
236     if (sender->receive_rate_history[i].rate &&
237         sender->receive_rate_history[i].timestamp <
238         now - (2 * sender->averaged_rtt))
239       sender->receive_rate_history[i].rate = 0;
240 }
241 
/* Used as a divisor: segment size / T_MBI is the floor applied to the
 * sending rates computed in this file */
const guint T_MBI = 64; /* the maximum backoff interval of 64 seconds */
243 
244 static guint
compute_initial_rate(guint mss,guint rtt)245 compute_initial_rate (guint mss, guint rtt)
246 {
247   if (G_UNLIKELY (rtt == 0))
248     return 0;
249 
250   return (SECOND * MIN (4 * mss, MAX (2 * mss, 4380))) / rtt;
251 }
252 
253 /* RFC 5348 section 4.3 step 4 second part */
254 static void
recompute_sending_rate(TfrcSender * sender,guint recv_limit,gdouble loss_event_rate,guint64 now)255 recompute_sending_rate (TfrcSender *sender, guint recv_limit,
256     gdouble loss_event_rate, guint64 now)
257 {
258   if (loss_event_rate > 0) {
259     /* congestion avoidance phase */
260     sender->computed_rate = calculate_bitrate (sender_get_segment_size (sender),
261         sender->averaged_rtt, loss_event_rate);
262     sender->rate = MAX (MIN (sender->computed_rate, recv_limit),
263             sender_get_segment_size (sender)/T_MBI);
264     DEBUG_SENDER (sender, "congestion avoidance: %u (computed: %u ss: %u)",
265         sender->rate, sender->computed_rate, sender_get_segment_size (sender));
266   } else if (now - sender->tld >= sender->averaged_rtt) {
267     /* initial slow-start */
268     sender->rate = MAX (MIN (2 * sender->rate, recv_limit),
269         compute_initial_rate (sender->mss, sender->averaged_rtt));
270     sender->tld = now;
271     DEBUG_SENDER (sender, "initial slow start: %u", sender->rate);
272   }
273 }
274 
275 static void
tfrc_sender_update_inst_rate(TfrcSender * sender)276 tfrc_sender_update_inst_rate (TfrcSender *sender)
277 {
278   if (!sender->last_sqrt_rtt)
279     return;
280 
281   /*
282    * Update the instantaneous
283    *  transmit rate, X_inst, following RFC 5348 Section 4.5.
284    */
285 
286   if (sender->sqmean_rtt)
287     sender->sqmean_rtt = 0.9 * sender->sqmean_rtt + sender->last_sqrt_rtt / 10;
288   else
289     sender->sqmean_rtt = sender->last_sqrt_rtt;
290 
291   sender->inst_rate = sender->rate * sender->sqmean_rtt / sender->last_sqrt_rtt;
292   if (sender->inst_rate < sender_get_segment_size (sender) / T_MBI)
293     sender->inst_rate = sender_get_segment_size (sender) / T_MBI;
294 
295 }
296 
297 void
tfrc_sender_on_feedback_packet(TfrcSender * sender,guint64 now,guint rtt,guint receive_rate,gdouble loss_event_rate,gboolean is_data_limited)298 tfrc_sender_on_feedback_packet (TfrcSender *sender, guint64 now,
299     guint rtt, guint receive_rate, gdouble loss_event_rate,
300     gboolean is_data_limited)
301 {
302   guint recv_limit; /* the limit on the sending rate computed from X_recv_set */
303 
304   g_return_if_fail (rtt > 0 && rtt <= 10 * SECOND);
305 
306   /* On first feedback packet, set he rate based on the mss and rtt */
307   if (sender->tld == 0) {
308     sender->rate = compute_initial_rate (sender->mss, rtt);
309     sender->tld = now;
310     DEBUG_SENDER (sender, "fb: initial rate: %u", sender->rate);
311   }
312 
313   /* Apply the steps from RFC 5348 section 4.3 */
314 
315   /* Step 1 (calculating the rtt) is done before calling this function */
316 
317   /* Step 2: Update the RTT */
318   if (sender->averaged_rtt == 0)
319     sender->averaged_rtt = rtt;
320   else
321     sender->averaged_rtt = (sender->averaged_rtt * 9 + rtt) / 10;
322 
323   if (sender->averaged_rtt == 0)
324     sender->averaged_rtt = 1;
325 
326   /* Step 3: Update the timeout interval */
327   sender->retransmission_timeout = MAX (MAX (4 * sender->averaged_rtt,
328           SECOND * 2 * sender_get_segment_size (sender) / sender->rate ),
329       MIN_NOFEEDBACK_TIMER);
330 
331   /* Step 4: Update the allowed sending rate */
332 
333 
334   if (G_UNLIKELY (is_data_limited)) {
335     /* the entire interval covered by the feedback packet
336        was a data-limited interval */
337     if (loss_event_rate > sender->last_loss_event_rate) {
338       guint i;
339       /* the feedback packet reports a new loss event or an
340          increase in the loss event rate p */
341 
342       /* Halve entries in the receive rate history */
343       for (i = 0; i < RECEIVE_RATE_HISTORY_SIZE; i++)
344         sender->receive_rate_history[i].rate /= 2;
345 
346       receive_rate *= 0.85;
347 
348       recv_limit = maximize_receive_rate_history (sender, receive_rate,
349           now);
350       DEBUG_SENDER (sender, "fb: data limited, new loss event %f > %f,"
351           " recv_limit: %u", loss_event_rate, sender->last_loss_event_rate,
352           recv_limit);
353     } else {
354       recv_limit = 2 * maximize_receive_rate_history (sender, receive_rate,
355           now);
356       DEBUG_SENDER (sender, "fb: data limited, no new loss event %f <= %f,"
357           " recv_limit: %u", loss_event_rate, sender->last_loss_event_rate,
358           recv_limit);
359     }
360   } else {
361     /* typical behavior */
362     update_receive_rate_history (sender, receive_rate, now);
363     recv_limit = get_max_receive_rate (sender, FALSE);
364     if (recv_limit < G_MAXUINT / 2)
365       recv_limit *= 2;
366     else
367       recv_limit = G_MAXUINT;
368     DEBUG_SENDER (sender, "fb: not data limited, recv_limit: %u",
369         recv_limit);
370   }
371 
372   recompute_sending_rate (sender, recv_limit, loss_event_rate, now);
373 
374   /* Step 5: update the instantaneous
375      transmit rate, X_inst, following Section 4.5.
376   */
377 
378   sender->last_sqrt_rtt = sqrt (rtt);
379   tfrc_sender_update_inst_rate (sender);
380 
381   /* Step 6: Reset the nofeedback timer to expire after RTO seconds. */
382 
383   sender->nofeedback_timer_expiry = now + sender->retransmission_timeout;
384   sender->sent_packet = FALSE;
385 
386   sender->last_loss_event_rate = loss_event_rate;
387 }
388 
389 static void
update_limits(TfrcSender * sender,guint timer_limit,guint64 now)390 update_limits(TfrcSender *sender, guint timer_limit, guint64 now)
391 {
392   if (timer_limit < sender_get_segment_size (sender) / T_MBI)
393     timer_limit = sender_get_segment_size (sender) / T_MBI;
394 
395   memset (sender->receive_rate_history, 0,
396       sizeof(struct ReceiveRateItem) * RECEIVE_RATE_HISTORY_SIZE);
397 
398   sender->receive_rate_history[0].rate = timer_limit / 2;
399   sender->receive_rate_history[0].timestamp = now;
400 
401   recompute_sending_rate (sender, timer_limit,
402       sender->last_loss_event_rate, now);
403 }
404 
405 
406 void
tfrc_sender_no_feedback_timer_expired(TfrcSender * sender,guint64 now)407 tfrc_sender_no_feedback_timer_expired (TfrcSender *sender, guint64 now)
408 {
409   guint receive_rate = get_max_receive_rate (sender, FALSE);
410   guint recover_rate = compute_initial_rate (sender->mss, sender->averaged_rtt);
411 
412   if (sender->averaged_rtt == 0 && sender->sent_packet) {
413     /* We do not have X_Bps or recover_rate yet.
414      * Halve the allowed sending rate.
415      */
416 
417     sender->rate = MAX ( sender->rate / 2,
418         sender_get_segment_size (sender) / T_MBI);
419     DEBUG_SENDER (sender, "no_fb: no p, initial, halve rate: %u", sender->rate);
420     tfrc_sender_update_inst_rate (sender);
421   } else if (((sender->last_loss_event_rate > 0 &&
422               receive_rate < recover_rate) ||
423           (sender->last_loss_event_rate == 0 &&
424               sender->rate < 2 * recover_rate)) &&
425       !sender->sent_packet) {
426     /* Don't halve the allowed sending rate. */
427     /* do nothing */
428     DEBUG_SENDER (sender, "no_fb: idle, do nothing recv: %u recover: %u",
429         receive_rate, recover_rate);
430   } else if (sender->last_loss_event_rate == 0) {
431     /* We do not have X_Bps yet.
432      * Halve the allowed sending rate.
433      */
434     sender->rate = MAX ( sender->rate / 2,
435         sender_get_segment_size (sender) / T_MBI);
436     DEBUG_SENDER (sender, "no_fb: no p, halve rate: %u recover: %u, sent: %u", sender->rate,
437         recover_rate, sender->sent_packet);
438     tfrc_sender_update_inst_rate (sender);
439   } else if (sender->computed_rate / 2 > receive_rate) {
440     /* 2 * X_recv was already limiting the sending rate.
441      * Halve the allowed sending rate.
442    */
443     DEBUG_SENDER (sender, "no_fb: computed rate %u > 2 * recv_rate %u",
444         sender->computed_rate, receive_rate);
445     update_limits(sender, receive_rate, now);
446   } else {
447     DEBUG_SENDER (sender, "no_fb: ELSE computed: %u", sender->computed_rate);
448     update_limits(sender, sender->computed_rate / 2, now);
449   }
450 
451   g_assert (sender->rate != 0);
452 
453   sender->nofeedback_timer_expiry = now + MAX (MAX ( 4 * sender->averaged_rtt,
454           SECOND * 2 * sender_get_segment_size (sender) / sender->rate),
455       MIN_NOFEEDBACK_TIMER);
456   sender->sent_packet = FALSE;
457 }
458 
459 void
tfrc_sender_sending_packet(TfrcSender * sender,guint size)460 tfrc_sender_sending_packet (TfrcSender *sender, guint size)
461 {
462   /* this should be:
463    * avg = size + (avg * 15/16)
464    */
465   sender->average_packet_size =
466       size + ((15 * sender->average_packet_size) >> 4);
467 
468   sender->sent_packet = TRUE;
469 }
470 
471 guint
tfrc_sender_get_send_rate(TfrcSender * sender)472 tfrc_sender_get_send_rate (TfrcSender *sender)
473 {
474   guint rate;
475 
476   if (!sender)
477     return DEFAULT_MSS;
478 
479   if (sender->use_inst_rate && sender->inst_rate)
480     rate = sender->inst_rate;
481   else
482     rate = sender->rate;
483 
484   if (sender->sp)
485     return rate * (sender->average_packet_size >> 4) /
486         ((sender->average_packet_size >> 4) + HEADER_SIZE);
487   else
488     return rate;
489 }
490 
491 guint64
tfrc_sender_get_no_feedback_timer_expiry(TfrcSender * sender)492 tfrc_sender_get_no_feedback_timer_expiry (TfrcSender *sender)
493 {
494   return sender->nofeedback_timer_expiry;
495 }
496 
497 guint
tfrc_sender_get_averaged_rtt(TfrcSender * sender)498 tfrc_sender_get_averaged_rtt (TfrcSender *sender)
499 {
500   return sender->averaged_rtt;
501 }
502 
503 
#define NDUPACK 3 /* Number of packets to receive after a loss before declaring the loss event */
/* Size of the loss event arrays in calculate_loss_event_rate(), which are
 * indexed modulo this value */
#define LOSS_EVENTS_MAX (9)
/* Upper bound on the number of loss intervals derived from loss events */
#define LOSS_INTERVALS_MAX (8)
/* Compared against the number of received intervals kept in the queue */
#define MAX_HISTORY_SIZE (LOSS_EVENTS_MAX * 2) /* 2 is a random number */
/* Multiplied by the sender RTT to get the minimum duration of history */
#define MIN_HISTORY_DURATION (10)
509 
/*
 * A run of consecutive sequence numbers that were received. For both ends
 * of the run, the packet's timestamp, its sequence number and the local
 * time at which it was received are kept.
 */
typedef struct  {
  guint64 first_timestamp;
  guint first_seqnum;
  guint64 first_recvtime;

  guint64 last_timestamp;
  guint last_seqnum;
  guint64 last_recvtime;
} ReceivedInterval;
519 
/* State of a TFRC receiver; all times are in microseconds */
struct _TfrcReceiver {
  GQueue received_intervals; /* ReceivedInterval*, newest at the tail */

  gboolean sp; /* TRUE when created with tfrc_receiver_new_sp() */

  guint sender_rtt; /* smoothed value of the RTT passed with each packet */
  guint receive_rate;
  guint max_receive_rate; /* rate given to compute_first_loss_interval() */
  guint max_receive_rate_ss; /* segment size given to
                              * compute_first_loss_interval() */
  guint64 feedback_timer_expiry;

  guint first_loss_interval; /* computed once, 0 until then */

  gdouble loss_event_rate;

  gboolean feedback_sent_on_last_timer;

  guint received_bytes; /* grows by the packet size on each packet */
  guint prev_received_bytes;
  guint64 received_bytes_reset_time;
  guint64 prev_received_bytes_reset_time;
  guint received_packets; /* incremented on each packet */
  guint prev_received_packets;
  guint sender_rtt_on_last_feedback;
};
545 
546 TfrcReceiver *
tfrc_receiver_new(guint64 now)547 tfrc_receiver_new (guint64 now)
548 {
549   TfrcReceiver *receiver = g_slice_new0 (TfrcReceiver);
550 
551   g_queue_init (&receiver->received_intervals);
552   receiver->received_bytes_reset_time = now;
553   receiver->prev_received_bytes_reset_time = now;
554 
555   return receiver;
556 }
557 
558 
559 TfrcReceiver *
tfrc_receiver_new_sp(guint64 now)560 tfrc_receiver_new_sp (guint64 now)
561 {
562   TfrcReceiver *receiver = tfrc_receiver_new (now);
563 
564   receiver->sp = TRUE;
565 
566   return receiver;
567 }
568 
569 void
tfrc_receiver_free(TfrcReceiver * receiver)570 tfrc_receiver_free (TfrcReceiver *receiver)
571 {
572   ReceivedInterval *ri;
573 
574   while ((ri = g_queue_pop_tail (&receiver->received_intervals)))
575     g_slice_free (ReceivedInterval, ri);
576 
577   g_slice_free (TfrcReceiver, receiver);
578 }
579 
580 /*
581  * @s:  segment size in bytes
582  * @R: RTT in milli seconds (instead of seconds)
583  * @rate: the sending rate
584  *
585  * Returns the 1/p that would produce this sending rate
586  * This is used to compute the first loss interval
587  */
588 
589 static gdouble
compute_first_loss_interval(gdouble s,gdouble R,gdouble rate)590 compute_first_loss_interval (gdouble s, gdouble R, gdouble rate)
591 {
592   gdouble p_min = 0;
593   gdouble p_max = 1;
594   gdouble p;
595   gdouble computed_rate;
596 
597   /* Use an iterative process to find p
598    * it would be faster to use a table, but I'm lazy
599    */
600 
601   do {
602     p = (p_min + p_max) / 2;
603     computed_rate = calculate_bitrate (s, R, p);
604 
605     if (computed_rate < rate)
606       p_max = p;
607     else
608       p_min = p;
609 
610   } while (computed_rate < 0.95 * rate || computed_rate > 1.05 * rate);
611 
612   return 1 / p;
613 }
614 
615 
/*
 * calculate_loss_event_rate:
 * @receiver: the receiver whose history of received intervals is analysed
 * @now: current time in microseconds
 *
 * Implements RFC 5348 section 5
 *
 * Every gap between two consecutive received intervals is a run of lost
 * packets. The gaps are turned into loss events (a loss event lasts one
 * sender RTT), the distances between the starts of consecutive loss events
 * give the loss intervals, and the loss event rate is derived from the
 * weighted average of the most recent loss intervals.
 *
 * The loss event arrays are used as circular buffers indexed by
 * max_index % LOSS_EVENTS_MAX, so only the most recent events are kept.
 *
 * Returns: the loss event rate, or 0 if it can not be computed yet (no
 * RTT, less than two received intervals, no usable loss event)
 */
static gdouble
calculate_loss_event_rate (TfrcReceiver *receiver, guint64 now)
{
  guint64 loss_event_times[LOSS_EVENTS_MAX];
  guint loss_event_seqnums[LOSS_EVENTS_MAX];
  guint loss_event_pktcount[LOSS_EVENTS_MAX];
  guint loss_intervals[LOSS_EVENTS_MAX];
  const gdouble weights[8] = { 1.0, 1.0, 1.0, 1.0, 0.8, 0.6, 0.4, 0.2 };
  gint max_index = -1; /* index of the latest loss event, -1 if none */
  GList *item;
  guint max_seqnum = 0;
  gint i;
  guint max_interval;
  gdouble I_tot0 = 0;
  gdouble I_tot1 = 0;
  gdouble W_tot = 0;
  gdouble I_tot;

  if (receiver->sender_rtt == 0)
    return 0;

  if (receiver->received_intervals.length < 2)
    return 0;

  DEBUG_RECEIVER (receiver, "start loss event rate computation (rtt: %u)",
      receiver->sender_rtt);

  /* Walk the gaps: each iteration looks at the loss between the previous
   * received interval and the current one, starting at the second interval
   */
  for (item = g_queue_peek_head_link (&receiver->received_intervals)->next;
       item;
       item = item->next) {
    ReceivedInterval *current = item->data;
    ReceivedInterval *prev = item->prev->data;
    guint64 start_ts;
    guint start_seqnum;

    max_seqnum = current->last_seqnum;

    DEBUG_RECEIVER (receiver, "Loss: ts %"G_GUINT64_FORMAT
        "->%"G_GUINT64_FORMAT" seq %u->%u",
        prev->last_timestamp, current->first_timestamp, prev->last_seqnum,
        current->first_seqnum);

    /* If the current loss is entirely within one RTT of the beginning of the
     * last loss, lets merge it into there
     */
    if (max_index >= 0 && current->first_timestamp <
        loss_event_times[max_index % LOSS_EVENTS_MAX] + receiver->sender_rtt) {
      loss_event_pktcount[max_index % LOSS_EVENTS_MAX] +=
          current->first_seqnum - prev->last_seqnum;
      DEBUG_RECEIVER (receiver, "Merged: pktcount[%u] = %u", max_index,
          loss_event_pktcount[max_index % LOSS_EVENTS_MAX]);
      continue;
    }

    if (max_index >= 0 && prev->last_timestamp <
        loss_event_times[max_index % LOSS_EVENTS_MAX] + receiver->sender_rtt) {
      /* This is the case where a loss event ends in a middle of an interval
       * without packets, then we close this loss event and start a new one
       */
      start_ts = loss_event_times[max_index % LOSS_EVENTS_MAX] +
          receiver->sender_rtt;
      /* interpolate the sequence number that corresponds to start_ts */
      start_seqnum = prev->last_seqnum +
          gst_util_uint64_scale_round (
            current->first_seqnum - prev->last_seqnum,
            start_ts - prev->last_timestamp,
            1 + current->first_timestamp - prev->last_timestamp);
      loss_event_pktcount[max_index % LOSS_EVENTS_MAX] +=
          start_seqnum - prev->last_seqnum - 1;
      DEBUG_RECEIVER (receiver,
          "Loss ends inside loss interval pktcount[%u] = %u",
          max_index, loss_event_pktcount[max_index % LOSS_EVENTS_MAX]);
    } else {
      /* this is the case where the packet loss starts an entirely new loss
       * event
       */
      /* interpolate the timestamp of the first lost packet */
      start_ts = prev->last_timestamp +
          gst_util_uint64_scale_round (1,
              current->first_timestamp - prev->last_timestamp,
              current->first_seqnum - prev->last_seqnum);
      start_seqnum = prev->last_seqnum + 1;
    }

    DEBUG_RECEIVER (receiver, "start_ts: %" G_GUINT64_FORMAT " seqnum: %u",
        start_ts, start_seqnum);

    /* Now we have one or more loss events that start
     * during this interval of lost packets, if there is more than one
     * all but the last one are of RTT length
     */
    while (start_ts <= current->first_timestamp) {
      max_index ++;

      loss_event_times[max_index % LOSS_EVENTS_MAX] = start_ts;
      loss_event_seqnums[max_index % LOSS_EVENTS_MAX] = start_seqnum;
      if (current->first_timestamp == prev->last_timestamp) {
        /* if current->first_ts == prev->last_ts,
         * then the computation of start_seqnum below will yield a division
         * by 0
         */
        loss_event_pktcount[max_index % LOSS_EVENTS_MAX] =
          current->first_seqnum - start_seqnum;
        break;
      }

      /* move on to where the next loss event would start, one RTT later */
      start_ts += receiver->sender_rtt;
      start_seqnum = prev->last_seqnum +
          gst_util_uint64_scale_round (
            current->first_seqnum - prev->last_seqnum,
            start_ts - prev->last_timestamp,
            current->first_timestamp - prev->last_timestamp);

      /* Make sure our interval has at least one packet in it */
      if (G_UNLIKELY (start_seqnum <=
              loss_event_seqnums[max_index % LOSS_EVENTS_MAX]))
      {
        start_seqnum = loss_event_seqnums[max_index % LOSS_EVENTS_MAX] + 1;
        start_ts = prev->last_timestamp +
            gst_util_uint64_scale_round (
              current->first_timestamp - prev->last_timestamp,
              start_seqnum - prev->last_seqnum,
              current->first_seqnum - prev->last_seqnum);
      }

      if (start_seqnum > current->first_seqnum)
      {
        g_assert (start_ts > current->first_timestamp);
        start_seqnum = current->first_seqnum;
        /* No need to change start_ts, the loop will stop anyway */
      }
      loss_event_pktcount[max_index % LOSS_EVENTS_MAX] = start_seqnum -
          loss_event_seqnums[max_index % LOSS_EVENTS_MAX];
      DEBUG_RECEIVER (receiver, "loss %u times: %" G_GUINT64_FORMAT
          " seqnum: %u pktcount: %u",
          max_index, loss_event_times[max_index % LOSS_EVENTS_MAX],
          loss_event_seqnums[max_index % LOSS_EVENTS_MAX],
          loss_event_pktcount[max_index % LOSS_EVENTS_MAX]);
    }
  }

  /* No loss event at all, or a single one while no maximum receive rate is
   * known to derive the first loss interval from
   */
  if (max_index < 0 ||
      (max_index < 1 && receiver->max_receive_rate == 0))
    return 0;

  /* RFC 5348 Section 5.3: The size of loss events */
  /* Interval 0 is the open one: from the latest loss event to the latest
   * packet received
   */
  loss_intervals[0] =
    max_seqnum - loss_event_seqnums[max_index % LOSS_EVENTS_MAX] + 1;
  DEBUG_RECEIVER (receiver, "intervals[0] = %u", loss_intervals[0]);
  /* Older intervals, newest first, as long as the events are still in the
   * circular buffers
   */
  for (i = max_index - 1, max_interval = 1;
       max_interval < LOSS_INTERVALS_MAX &&
         i >= 0 && i > max_index - LOSS_EVENTS_MAX;
       i--, max_interval++) {
    guint cur_i = i % LOSS_EVENTS_MAX;
    guint prev_i = (i + 1) % LOSS_EVENTS_MAX;

    /* if its Small Packet variant and the loss event is short,
     * that is less than 2 * RTT, then the loss interval is divided by the
     * number of packets lost
     * see RFC 4828 section 3 bullet 3 paragraph 2 */
    /* NOTE(review): nothing here checks that loss_event_pktcount[cur_i] is
     * non-zero before dividing by it - confirm it can never be 0 */
    if (receiver->sp &&
        loss_event_times[prev_i] - loss_event_times[cur_i] <
        2 * receiver->sender_rtt)
      loss_intervals[max_interval] = (loss_event_seqnums[prev_i] -
          loss_event_seqnums[cur_i]) / loss_event_pktcount[cur_i];
    else
      loss_intervals[max_interval] =
        loss_event_seqnums[prev_i] - loss_event_seqnums[cur_i];
    DEBUG_RECEIVER (receiver, "intervals[%u] = %u", max_interval,
        loss_intervals[max_interval]);
  }

  /* If the first loss interval is still used, use the computed
   * value according to RFC 5348 section 6.3.1
   */
  if (max_interval < LOSS_INTERVALS_MAX)
  {
    if (!receiver->first_loss_interval)
    {
      /* computed only once, then cached in the receiver */
      receiver->first_loss_interval =
          /* segment size is 1 because we're computing it based on the
           * X_pps equation.. in packets/s
           */
          compute_first_loss_interval (receiver->max_receive_rate_ss,
              receiver->sender_rtt, receiver->max_receive_rate);
      DEBUG_RECEIVER (receiver, "Computed the first loss interval to %u"
          " (rtt: %u s: %u rate:%u)",
          receiver->first_loss_interval, receiver->sender_rtt,
          receiver->max_receive_rate_ss, receiver->max_receive_rate);
    }
    loss_intervals[max_interval] = receiver->first_loss_interval;
    DEBUG_RECEIVER (receiver, "intervals[%u] = %u", max_interval,
        loss_intervals[max_interval]);
    max_interval++;
 }

  /* Section 5.4: Average loss rate */
  /* I_tot1 is the weighted sum without the open interval 0 */
  for (i = 1; i < max_interval; i++) {
    I_tot1 += loss_intervals[i] * weights[i - 1];
    W_tot += weights[i - 1];
  }

  /* Modified according to RFC 4828 section 3 bullet 3 paragraph 4*/
  /* NOTE(review): loss_event_times[0] is slot 0 of the circular buffer,
   * not necessarily the latest loss event (that one is at
   * max_index % LOSS_EVENTS_MAX) - confirm this is the intended event */
  if (receiver->sp && now - loss_event_times[0] < 2 * receiver->sender_rtt) {
    I_tot = I_tot1;
  } else {
    /* I_tot0 is the weighted sum including the open interval 0 */
    for (i = 0; i < max_interval - 1; i++)
      I_tot0 += loss_intervals[i] * weights[i];

    I_tot = MAX (I_tot0, I_tot1);
  }

  /* NOTE(review): I_tot is not checked against 0 before the division -
   * confirm that the loss intervals can not all be 0 */
  return W_tot / I_tot;
}
829 
830 
831 /* Implements RFC 5348 section 6.1 */
832 gboolean
tfrc_receiver_got_packet(TfrcReceiver * receiver,guint64 timestamp,guint64 now,guint seqnum,guint sender_rtt,guint packet_size)833 tfrc_receiver_got_packet (TfrcReceiver *receiver, guint64 timestamp,
834     guint64 now, guint seqnum, guint sender_rtt, guint packet_size)
835 {
836   GList *item = NULL;
837   ReceivedInterval *current = NULL;
838   ReceivedInterval *prev = NULL;
839   gboolean recalculate_loss_rate = FALSE;
840   gboolean retval = FALSE;
841   gboolean history_too_short = !sender_rtt; /* No RTT, keep all history */
842 
843   receiver->received_bytes += packet_size;
844   receiver->received_packets++;
845 
846   g_assert (sender_rtt != 0 || receiver->sender_rtt == 0);
847 
848   if (receiver->sender_rtt)
849     receiver->sender_rtt = (0.9 * receiver->sender_rtt) + (sender_rtt / 10);
850   else
851     receiver->sender_rtt = sender_rtt;
852 
853   /* RFC 5348 section 6.3: First packet received */
854   if (g_queue_get_length (&receiver->received_intervals) == 0 ||
855       receiver->sender_rtt == 0) {
856     if (receiver->sender_rtt)
857       receiver->feedback_timer_expiry = now + receiver->sender_rtt;
858 
859     /* First packet or no RTT yet, lets send a feedback packet */
860     retval = TRUE;
861   }
862 
863   /* RFC 5348 section 6.1 Step 1: Add to packet history */
864 
865   for (item = g_queue_peek_tail_link (&receiver->received_intervals);
866        item;
867        item = item->prev) {
868     current = item->data;
869     prev = item->prev ? item->prev->data : NULL;
870 
871     if (G_LIKELY (seqnum == current->last_seqnum + 1)) {
872       /* Extend the current packet forwardd */
873       current->last_seqnum = seqnum;
874       current->last_timestamp = timestamp;
875       current->last_recvtime = now;
876     } else if (seqnum >= current->first_seqnum &&
877         seqnum <= current->last_seqnum) {
878       /* Is inside the current interval, must be duplicate, ignore */
879     } else if (seqnum > current->last_seqnum + 1) {
880       /* We had a loss, lets add a new one */
881       prev = current;
882 
883       current = g_slice_new (ReceivedInterval);
884       current->first_timestamp = current->last_timestamp = timestamp;
885       current->first_seqnum = current->last_seqnum = seqnum;
886       current->first_recvtime = current->last_recvtime = now;
887       g_queue_push_tail (&receiver->received_intervals, current);
888 
889       item = g_queue_peek_tail_link (&receiver->received_intervals);
890     } else if (seqnum == current->first_seqnum - 1) {
891       /* Extend the current packet backwards */
892       current->first_seqnum = seqnum;
893       current->first_timestamp = timestamp;
894       current->first_recvtime = now;
895     } else if (seqnum < current->first_timestamp &&
896         (!prev || seqnum > prev->last_seqnum + 1)) {
      /* We have something that goes in the middle of a gap,
         so let's create a new received interval */
899       current = g_slice_new (ReceivedInterval);
900 
901       current->first_timestamp = current->last_timestamp = timestamp;
902       current->first_seqnum = current->last_seqnum = seqnum;
903       current->first_recvtime = current->last_recvtime = now;
904 
905       g_queue_insert_before (&receiver->received_intervals, item, current);
906       item = item->prev;
907       prev = item->prev ? item->prev->data : NULL;
908     } else
909       continue;
910     break;
911   }
912 
  /* Don't forget history if we have less than MIN_HISTORY_DURATION * rtt
   * of history
   */
916   if (!history_too_short)
917   {
918     ReceivedInterval *newest =
919       g_queue_peek_tail (&receiver->received_intervals);
920     ReceivedInterval *oldest =
921       g_queue_peek_head (&receiver->received_intervals);
922     if (newest && oldest)
923       history_too_short =
924         newest->last_timestamp - oldest->first_timestamp <
925         MIN_HISTORY_DURATION * receiver->sender_rtt;
926     else
927       history_too_short = TRUE;
928   }
929 
930   /* It's the first one or we're at the start */
931   if (G_UNLIKELY (!current)) {
932     /* If its before MAX_HISTORY_SIZE, its too old, just discard it */
933     if (!history_too_short &&
934         g_queue_get_length (&receiver->received_intervals) > MAX_HISTORY_SIZE)
935       return retval;
936 
937     current = g_slice_new (ReceivedInterval);
938 
939     current->first_timestamp = current->last_timestamp = timestamp;
940     current->first_seqnum = current->last_seqnum = seqnum;
941     current->first_recvtime = current->last_recvtime = now;
942     g_queue_push_head (&receiver->received_intervals, current);
943   }
944 
945   if (!history_too_short &&
946       g_queue_get_length (&receiver->received_intervals) > MAX_HISTORY_SIZE) {
947     ReceivedInterval *remove = g_queue_pop_head (&receiver->received_intervals);
948     if (remove == prev)
949       prev = NULL;
950     g_slice_free (ReceivedInterval, remove);
951   }
952 
953 
954   if (prev && (current->last_seqnum - current->first_seqnum == NDUPACK))
955     recalculate_loss_rate = TRUE;
956 
957 
958   if (prev &&  G_UNLIKELY (prev->last_seqnum + 1 == current->first_seqnum)) {
959     /* Merge closed gap if any */
960     current->first_seqnum = prev->first_seqnum;
961     current->first_timestamp = prev->first_timestamp;
962     current->first_recvtime = prev->first_recvtime;
963 
964     g_slice_free (ReceivedInterval, prev);
965     g_queue_delete_link (&receiver->received_intervals, item->prev);
966 
967     prev = item->prev ? item->prev->data : NULL;
968 
969     recalculate_loss_rate = TRUE;
970   }
971 
  /* RFC 5348 section 6.1 Step 2, 3, 4:
   * Check if done
   * If not done, recalculate the loss event rate,
   * and possibly send a feedback message
   */
977 
978   if (receiver->sender_rtt &&
979       (recalculate_loss_rate || !receiver->feedback_sent_on_last_timer)) {
980     gdouble new_loss_event_rate = calculate_loss_event_rate (receiver, now);
981 
982     if (new_loss_event_rate > receiver->loss_event_rate ||
983         !receiver->feedback_sent_on_last_timer)
984       retval |= tfrc_receiver_feedback_timer_expired (receiver, now);
985   }
986 
987   return retval;
988 }
989 
990 gboolean
tfrc_receiver_feedback_timer_expired(TfrcReceiver * receiver,guint64 now)991 tfrc_receiver_feedback_timer_expired (TfrcReceiver *receiver, guint64 now)
992 {
993   if (receiver->received_bytes == 0 ||
994       receiver->prev_received_bytes_reset_time == now) {
995     g_assert (receiver->sender_rtt != 0);
996     receiver->feedback_timer_expiry = now + receiver->sender_rtt;
997     receiver->feedback_sent_on_last_timer = FALSE;
998     return FALSE;
999   }
1000   else
1001   {
1002     return TRUE;
1003   }
1004 }
1005 
1006 gboolean
tfrc_receiver_send_feedback(TfrcReceiver * receiver,guint64 now,double * loss_event_rate,guint * receive_rate)1007 tfrc_receiver_send_feedback (TfrcReceiver *receiver, guint64 now,
1008     double *loss_event_rate, guint *receive_rate)
1009 {
1010   guint received_bytes;
1011   guint64 received_bytes_reset_time;
1012   guint received_packets;
1013 
1014   if (now == receiver->prev_received_bytes_reset_time)
1015     return FALSE;
1016 
1017   if (now - receiver->received_bytes_reset_time >
1018       receiver->sender_rtt_on_last_feedback ) {
1019     receiver->prev_received_bytes_reset_time =
1020         receiver->received_bytes_reset_time;
1021     receiver->prev_received_bytes = receiver->received_bytes;
1022     receiver->prev_received_packets = receiver->received_packets;
1023     received_bytes = receiver->received_bytes;
1024     received_packets = receiver->received_packets;
1025     received_bytes_reset_time = receiver->received_bytes_reset_time;
1026   } else {
1027     receiver->prev_received_bytes += receiver->received_bytes;
1028     receiver->prev_received_packets += receiver->received_packets;
1029     received_bytes = receiver->prev_received_bytes;
1030     received_packets = receiver->prev_received_packets;
1031     received_bytes_reset_time = receiver->prev_received_bytes_reset_time;
1032   }
1033 
1034   receiver->received_bytes_reset_time = now;
1035   receiver->received_bytes = 0;
1036   receiver->received_packets = 0;
1037 
1038   receiver->receive_rate = gst_util_uint64_scale_round (SECOND, received_bytes,
1039       now - received_bytes_reset_time);
1040 
1041   if (receiver->sender_rtt_on_last_feedback &&
1042       receiver->receive_rate > receiver->max_receive_rate)
1043   {
1044     receiver->max_receive_rate = receiver->receive_rate;
1045     receiver->max_receive_rate_ss = received_bytes / received_packets;
1046   }
1047 
1048   receiver->loss_event_rate = calculate_loss_event_rate (receiver, now);
1049 
1050   if (receiver->sender_rtt)
1051     receiver->feedback_timer_expiry = now + receiver->sender_rtt;
1052   receiver->sender_rtt_on_last_feedback = receiver->sender_rtt;
1053   receiver->feedback_sent_on_last_timer = TRUE;
1054 
1055   DEBUG_RECEIVER (receiver, "P: %f recv_rate: %u", receiver->loss_event_rate,
1056       receiver->receive_rate);
1057 
1058   *receive_rate = receiver->receive_rate;
1059   *loss_event_rate = receiver->loss_event_rate;
1060 
1061   return TRUE;
1062 }
1063 
1064 guint64
tfrc_receiver_get_feedback_timer_expiry(TfrcReceiver * receiver)1065 tfrc_receiver_get_feedback_timer_expiry (TfrcReceiver *receiver)
1066 {
1067   g_assert (receiver->sender_rtt || receiver->feedback_timer_expiry == 0);
1068   return receiver->feedback_timer_expiry;
1069 }
1070 
/*
 * State used to detect data-limited intervals on the sender side.
 * All fields are points in time.
 */
struct _TfrcIsDataLimited
{
  /* Set to the current time when the sender is not data-limited and
   * not_limited_1 <= t_new */
  guint64 not_limited_1;
  /* Set to the current time when the sender is not data-limited,
   * not_limited_1 is already past t_new and not_limited_2 <= t_next */
  guint64 not_limited_2;
  /* Last packet timestamp passed in with the most recent feedback */
  guint64 t_new;
  /* Time at which the most recent feedback was received */
  guint64 t_next;
};
1078 
/*
 * This implements the algorithm proposed in RFC 5348 section 8.2.1 */
1081 
1082 TfrcIsDataLimited *
tfrc_is_data_limited_new(guint64 now)1083 tfrc_is_data_limited_new (guint64 now)
1084 {
1085   TfrcIsDataLimited *idl = g_slice_new0 (TfrcIsDataLimited);
1086 
1087   return idl;
1088 }
1089 
/*
 * Releases a TfrcIsDataLimited allocated by tfrc_is_data_limited_new().
 */
void
tfrc_is_data_limited_free (TfrcIsDataLimited *idl)
{
  g_slice_free (TfrcIsDataLimited, idl);
}
1095 
1096 void
tfrc_is_data_limited_not_limited_now(TfrcIsDataLimited * idl,guint64 now)1097 tfrc_is_data_limited_not_limited_now (TfrcIsDataLimited *idl, guint64 now)
1098 {
1099   /* Sender is not data-limited at this instant. */
1100   if (idl->not_limited_1 <= idl->t_new)
1101     /* Goal: NotLimited1 > t_new. */
1102     idl->not_limited_1 = now;
1103   else if (idl->not_limited_2 <= idl->t_next)
1104     /* Goal: NotLimited2 > t_next. */
1105     idl->not_limited_2 = now;
1106 }
1107 
1108 /*
1109  * Returns TRUE if the period since the previous feedback packet
1110  * was data limited
1111  */
1112 gboolean
tfrc_is_data_limited_received_feedback(TfrcIsDataLimited * idl,guint64 now,guint64 last_packet_timestamp,guint rtt)1113 tfrc_is_data_limited_received_feedback (TfrcIsDataLimited *idl, guint64 now,
1114     guint64 last_packet_timestamp, guint rtt)
1115 {
1116   gboolean ret;
1117   guint64 t_old;
1118 
1119   idl->t_new = last_packet_timestamp;
1120   t_old = idl->t_new - rtt;
1121   idl->t_next = now;
1122   if ((t_old < idl->not_limited_1 && idl->not_limited_1 <= idl->t_new) ||
1123       (t_old <  idl->not_limited_2 && idl->not_limited_2 <= idl->t_new))
1124     /* This was NOT a data-limited interval */
1125     ret =  FALSE;
1126   else
1127     /* This was a data-limited interval. */
1128     ret = TRUE;
1129 
1130   if (idl->not_limited_1 <= idl->t_new && idl->not_limited_2 > idl->t_new)
1131       idl->not_limited_1 = idl->not_limited_2;
1132 
1133   return ret;
1134 }
1135