1 /* Copyright 2009-2018 Pierre Ossman for Cendio AB
2  *
3  * This is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License as published by
5  * the Free Software Foundation; either version 2 of the License, or
6  * (at your option) any later version.
7  *
8  * This software is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this software; if not, write to the Free Software
15  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307,
16  * USA.
17  */
18 
19 /*
20  * This code implements congestion control in the same way as TCP in
21  * order to avoid excessive latency in the transport. This is needed
22  * because "buffer bloat" is unfortunately still a very real problem.
23  *
 * The basic principle is TCP Congestion Control (RFC 5681), with the
25  * addition of using the TCP Vegas algorithm. The reason we use Vegas
26  * is that we run on top of a reliable transport so we need a latency
27  * based algorithm rather than a loss based one. There is also a lot of
28  * interpolation of values. This is because we have rather horrible
29  * granularity in our measurements.
30  *
31  * We use a simplistic form of slow start in order to ramp up quickly
32  * from an idle state. We do not have any persistent threshold though
33  * as we have too much noise for it to be reliable.
34  */
35 
36 #include <assert.h>
37 #include <sys/time.h>
38 
39 #ifdef __linux__
40 #include <sys/ioctl.h>
41 #include <sys/socket.h>
42 #include <netinet/in.h>
43 #include <netinet/tcp.h>
44 #include <linux/sockios.h>
45 #endif
46 
47 #include <rfb/Congestion.h>
48 #include <rfb/LogWriter.h>
49 #include <rfb/util.h>
50 
51 // Debug output on what the congestion control is up to
52 #undef CONGESTION_DEBUG
53 
54 // Dump socket congestion window debug trace to disk
55 #undef CONGESTION_TRACE
56 
57 using namespace rfb;
58 
// Window sizes below are in bytes.

// Starting window (16 KiB). Should get us going fairly fast on a decent
// bandwidth network. If it turns out to be too high, it will rapidly
// be reduced and stay low.
static const unsigned INITIAL_WINDOW = 16 * 1024;

// Lower bound for the window. TCP's minimal window is 3*MSS, but since
// we don't know the MSS we make a guess at 4 KiB (it's probably a bit
// higher).
static const unsigned MINIMUM_WINDOW = 4 * 1024;

// Upper bound for the window. This is the current default maximum
// window for Linux (4 MiB). Should be a good limit for now...
static const unsigned MAXIMUM_WINDOW = 4 * 1024 * 1024;
70 
// Tells if position a is strictly after position b, even when the
// counters have wrapped around. Anything up to half the counter range
// ahead of b is considered to be "after" it.
static inline bool isAfter(unsigned a, unsigned b) {
  if (a == b)
    return false;

  // Unsigned subtraction gives the forward distance from b to a
  const unsigned distance = a - b;
  return distance <= UINT_MAX / 2;
}
75 
// Log writer for this file, registered under the name "Congestion".
// Only used for the debug output enabled by CONGESTION_DEBUG.
static LogWriter vlog("Congestion");
77 
Congestion()78 Congestion::Congestion() :
79     lastPosition(0), extraBuffer(0),
80     baseRTT(-1), congWindow(INITIAL_WINDOW), inSlowStart(true),
81     safeBaseRTT(-1), measurements(0), minRTT(-1), minCongestedRTT(-1)
82 {
83   gettimeofday(&lastUpdate, NULL);
84   gettimeofday(&lastSent, NULL);
85   memset(&lastPong, 0, sizeof(lastPong));
86   gettimeofday(&lastPongArrival, NULL);
87   gettimeofday(&lastAdjustment, NULL);
88 }
89 
~Congestion()90 Congestion::~Congestion()
91 {
92 }
93 
updatePosition(unsigned pos)94 void Congestion::updatePosition(unsigned pos)
95 {
96   struct timeval now;
97   unsigned delta, consumed;
98 
99   gettimeofday(&now, NULL);
100 
101   delta = pos - lastPosition;
102   if ((delta > 0) || (extraBuffer > 0))
103     lastSent = now;
104 
105   // Idle for too long?
106   // We use a very crude RTO calculation in order to keep things simple
107   // FIXME: should implement RFC 2861
108   if (msBetween(&lastSent, &now) > __rfbmax(baseRTT*2, 100)) {
109 
110 #ifdef CONGESTION_DEBUG
111     vlog.debug("Connection idle for %d ms, resetting congestion control",
112                msBetween(&lastSent, &now));
113 #endif
114 
115     // Close congestion window and redo wire latency measurement
116     congWindow = __rfbmin(INITIAL_WINDOW, congWindow);
117     baseRTT = -1;
118     measurements = 0;
119     gettimeofday(&lastAdjustment, NULL);
120     minRTT = minCongestedRTT = -1;
121     inSlowStart = true;
122   }
123 
124   // Commonly we will be in a state of overbuffering. We need to
125   // estimate the extra delay that causes so we can separate it from
126   // the delay caused by an incorrect congestion window.
127   // (we cannot do this until we have a RTT measurement though)
128   if (baseRTT != (unsigned)-1) {
129     extraBuffer += delta;
130     consumed = msBetween(&lastUpdate, &now) * congWindow / baseRTT;
131     if (extraBuffer < consumed)
132       extraBuffer = 0;
133     else
134       extraBuffer -= consumed;
135   }
136 
137   lastPosition = pos;
138   lastUpdate = now;
139 }
140 
sentPing()141 void Congestion::sentPing()
142 {
143   struct RTTInfo rttInfo;
144 
145   memset(&rttInfo, 0, sizeof(struct RTTInfo));
146 
147   gettimeofday(&rttInfo.tv, NULL);
148   rttInfo.pos = lastPosition;
149   rttInfo.extra = getExtraBuffer();
150   rttInfo.congested = isCongested();
151 
152   pings.push_back(rttInfo);
153 }
154 
gotPong()155 void Congestion::gotPong()
156 {
157   struct timeval now;
158   struct RTTInfo rttInfo;
159   unsigned rtt, delay;
160 
161   if (pings.empty())
162     return;
163 
164   gettimeofday(&now, NULL);
165 
166   rttInfo = pings.front();
167   pings.pop_front();
168 
169   lastPong = rttInfo;
170   lastPongArrival = now;
171 
172   rtt = msBetween(&rttInfo.tv, &now);
173   if (rtt < 1)
174     rtt = 1;
175 
176   // Try to estimate wire latency by tracking lowest seen latency
177   if (rtt < baseRTT)
178     safeBaseRTT = baseRTT = rtt;
179 
180   // Pings sent before the last adjustment aren't interesting as they
181   // aren't a measurement of the current congestion window
182   if (isBefore(&rttInfo.tv, &lastAdjustment))
183     return;
184 
185   // Estimate added delay because of overtaxed buffers (see above)
186   delay = rttInfo.extra * baseRTT / congWindow;
187   if (delay < rtt)
188     rtt -= delay;
189   else
190     rtt = 1;
191 
192   // A latency less than the wire latency means that we've
193   // understimated the congestion window. We can't really determine
194   // how much, so pretend that we got no buffer latency at all.
195   if (rtt < baseRTT)
196     rtt = baseRTT;
197 
198   // Record the minimum seen delay (hopefully ignores jitter) and let
199   // the congestion control do its thing.
200   //
201   // Note: We are delay based rather than loss based, which means we
202   //       need to look at pongs even if they weren't limited by the
203   //       current window ("congested"). Otherwise we will fail to
204   //       detect increasing congestion until the application exceeds
205   //       the congestion window.
206   if (rtt < minRTT)
207     minRTT = rtt;
208   if (rttInfo.congested) {
209     if (rtt < minCongestedRTT)
210       minCongestedRTT = rtt;
211   }
212 
213   measurements++;
214   updateCongestion();
215 }
216 
isCongested()217 bool Congestion::isCongested()
218 {
219   if (getInFlight() < congWindow)
220     return false;
221 
222   return true;
223 }
224 
getUncongestedETA()225 int Congestion::getUncongestedETA()
226 {
227   unsigned targetAcked;
228 
229   const struct RTTInfo* prevPing;
230   unsigned eta, elapsed;
231   unsigned etaNext, delay;
232 
233   std::list<struct RTTInfo>::const_iterator iter;
234 
235   targetAcked = lastPosition - congWindow;
236 
237   // Simple case?
238   if (isAfter(lastPong.pos, targetAcked))
239     return 0;
240 
241   // No measurements yet?
242   if (baseRTT == (unsigned)-1)
243     return -1;
244 
245   prevPing = &lastPong;
246   eta = 0;
247   elapsed = msSince(&lastPongArrival);
248 
249   // Walk the ping queue and figure out which one we are waiting for to
250   // get to an uncongested state
251 
252   for (iter = pings.begin(); ;++iter) {
253     struct RTTInfo curPing;
254 
255     // If we aren't waiting for a pong that will clear the congested
256     // state then we have to estimate the final bit by pretending that
257     // we had a ping just after the last position update.
258     if (iter == pings.end()) {
259       curPing.tv = lastUpdate;
260       curPing.pos = lastPosition;
261       curPing.extra = extraBuffer;
262     } else {
263       curPing = *iter;
264     }
265 
266     etaNext = msBetween(&prevPing->tv, &curPing.tv);
267     // Compensate for buffering delays
268     delay = curPing.extra * baseRTT / congWindow;
269     etaNext += delay;
270     delay = prevPing->extra * baseRTT / congWindow;
271     if (delay >= etaNext)
272       etaNext = 0;
273     else
274       etaNext -= delay;
275 
276     // Found it?
277     if (isAfter(curPing.pos, targetAcked)) {
278       eta += etaNext * (curPing.pos - targetAcked) / (curPing.pos - prevPing->pos);
279       if (elapsed > eta)
280         return 0;
281       else
282         return eta - elapsed;
283     }
284 
285     assert(iter != pings.end());
286 
287     eta += etaNext;
288     prevPing = &*iter;
289   }
290 }
291 
getBandwidth()292 size_t Congestion::getBandwidth()
293 {
294   size_t bandwidth;
295 
296   // No measurements yet? Guess RTT of 60 ms
297   if (safeBaseRTT == (unsigned)-1)
298     bandwidth = congWindow * 1000 / 60;
299   else
300     bandwidth = congWindow * 1000 / safeBaseRTT;
301 
302   // We're still probing so guess actual bandwidth is halfway between
303   // the current guess and the next one (slow start doubles each time)
304   if (inSlowStart)
305     bandwidth = bandwidth + bandwidth / 2;
306 
307   return bandwidth;
308 }
309 
debugTrace(const char * filename,int fd)310 void Congestion::debugTrace(const char* filename, int fd)
311 {
312 #ifdef CONGESTION_TRACE
313 #ifdef __linux__
314   FILE *f;
315   f = fopen(filename, "ab");
316   if (f != NULL) {
317     struct tcp_info info;
318     int buffered;
319     socklen_t len;
320     len = sizeof(info);
321     if ((getsockopt(fd, IPPROTO_TCP,
322                     TCP_INFO, &info, &len) == 0) &&
323         (ioctl(fd, SIOCOUTQ, &buffered) == 0)) {
324       struct timeval now;
325       gettimeofday(&now, NULL);
326       fprintf(f, "%u.%06u,%u,%u,%u,%u\n",
327               (unsigned)now.tv_sec, (unsigned)now.tv_usec,
328               congWindow, info.tcpi_snd_cwnd * info.tcpi_snd_mss,
329               getInFlight(), buffered);
330     }
331     fclose(f);
332   }
333 #endif
334 #endif
335 }
336 
getExtraBuffer()337 unsigned Congestion::getExtraBuffer()
338 {
339   unsigned elapsed;
340   unsigned consumed;
341 
342   if (baseRTT == (unsigned)-1)
343     return 0;
344 
345   elapsed = msSince(&lastUpdate);
346   consumed = elapsed * congWindow / baseRTT;
347 
348   if (consumed >= extraBuffer)
349     return 0;
350   else
351     return extraBuffer - consumed;
352 }
353 
getInFlight()354 unsigned Congestion::getInFlight()
355 {
356   struct RTTInfo nextPong;
357   unsigned etaNext, delay, elapsed, acked;
358 
359   // Simple case?
360   if (lastPosition == lastPong.pos)
361     return 0;
362 
363   // No measurements yet?
364   if (baseRTT == (unsigned)-1) {
365     if (!pings.empty())
366       return lastPosition - pings.front().pos;
367     return 0;
368   }
369 
370   // If we aren't waiting for any pong then we have to estimate things
371   // by pretending that we had a ping just after the last position
372   // update.
373   if (pings.empty()) {
374     nextPong.tv = lastUpdate;
375     nextPong.pos = lastPosition;
376     nextPong.extra = extraBuffer;
377   } else {
378     nextPong = pings.front();
379   }
380 
381   // First we need to estimate how many bytes have made it through
382   // completely. Look at the next ping that should arrive and figure
383   // out how far behind it should be and interpolate the positions.
384 
385   etaNext = msBetween(&lastPong.tv, &nextPong.tv);
386   // Compensate for buffering delays
387   delay = nextPong.extra * baseRTT / congWindow;
388   etaNext += delay;
389   delay = lastPong.extra * baseRTT / congWindow;
390   if (delay >= etaNext)
391     etaNext = 0;
392   else
393     etaNext -= delay;
394 
395   elapsed = msSince(&lastPongArrival);
396 
397   // The pong should be here any second. Be optimistic and assume
398   // we can already use its value.
399   if (etaNext <= elapsed)
400     acked = nextPong.pos;
401   else {
402     acked = lastPong.pos;
403     acked += (nextPong.pos - lastPong.pos) * elapsed / etaNext;
404   }
405 
406   return lastPosition - acked;
407 }
408 
updateCongestion()409 void Congestion::updateCongestion()
410 {
411   unsigned diff;
412 
413   // We want at least three measurements to avoid noise
414   if (measurements < 3)
415     return;
416 
417   assert(minRTT >= baseRTT);
418   assert(minCongestedRTT >= baseRTT);
419 
420   // The goal is to have a slightly too large congestion window since
421   // a "perfect" one cannot be distinguished from a too small one. This
422   // translates to a goal of a few extra milliseconds of delay.
423 
424   diff = minRTT - baseRTT;
425 
426   if (diff > __rfbmax(100, baseRTT/2)) {
427     // We have no way of detecting loss, so assume massive latency
428     // spike means packet loss. Adjust the window and go directly
429     // to congestion avoidance.
430 #ifdef CONGESTION_DEBUG
431     vlog.debug("Latency spike! Backing off...");
432 #endif
433     congWindow = congWindow * baseRTT / minRTT;
434     inSlowStart = false;
435   }
436 
437   if (inSlowStart) {
438     // Slow start. Aggressive growth until we see congestion.
439 
440     if (diff > 25) {
441       // If we see an increased latency then we assume we've hit the
442       // limit and it's time to leave slow start and switch to
443       // congestion avoidance
444       congWindow = congWindow * baseRTT / minRTT;
445       inSlowStart = false;
446     } else {
447       // It's not safe to increase unless we actually used the entire
448       // congestion window, hence we look at minCongestedRTT and not
449       // minRTT
450 
451       diff = minCongestedRTT - baseRTT;
452       if (diff < 25)
453         congWindow *= 2;
454     }
455   } else {
456     // Congestion avoidance (VEGAS)
457 
458     if (diff > 50) {
459       // Slightly too fast
460       congWindow -= 4096;
461     } else {
462       // Only the "congested" pongs are checked to see if the
463       // window is too small.
464 
465       diff = minCongestedRTT - baseRTT;
466 
467       if (diff < 5) {
468         // Way too slow
469         congWindow += 8192;
470       } else if (diff < 25) {
471         // Too slow
472         congWindow += 4096;
473       }
474     }
475   }
476 
477   if (congWindow < MINIMUM_WINDOW)
478     congWindow = MINIMUM_WINDOW;
479   if (congWindow > MAXIMUM_WINDOW)
480     congWindow = MAXIMUM_WINDOW;
481 
482 #ifdef CONGESTION_DEBUG
483   vlog.debug("RTT: %d/%d ms (%d ms), Window: %d KiB, Bandwidth: %g Mbps%s",
484              minRTT, minCongestedRTT, baseRTT, congWindow / 1024,
485              congWindow * 8.0 / baseRTT / 1000.0,
486              inSlowStart ? " (slow start)" : "");
487 #endif
488 
489   measurements = 0;
490   gettimeofday(&lastAdjustment, NULL);
491   minRTT = minCongestedRTT = -1;
492 }
493 
494