1 /*
2  * Copyright (C) 2009-2012 Jacek Sieka, arnetheduck on gmail point com
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17  */
18 
19 #include "stdinc.h"
20 
21 #include "ThrottleManager.h"
22 
23 #include "DownloadManager.h"
24 #include "Singleton.h"
25 #include "Socket.h"
26 #include "Thread.h"
27 #include "TimerManager.h"
28 #include "UploadManager.h"
29 #include "ClientManager.h"
30 
31 namespace dcpp {
32 /**
33  * Manager for throttling traffic flow.
34  * Inspired by Token Bucket algorithm: http://en.wikipedia.org/wiki/Token_bucket
35  */
36 
37 /*
38  * Throttles traffic and reads a packet from the network
39  */
read(Socket * sock,void * buffer,size_t len)40 int ThrottleManager::read(Socket* sock, void* buffer, size_t len)
41 {
42     int64_t readSize = -1;
43     size_t downs = DownloadManager::getInstance()->getDownloadCount();
44     auto downLimit = getDownLimit(); // avoid even intra-function races
45     if(!BOOLSETTING(THROTTLE_ENABLE) || !getCurThrottling() || downLimit == 0 || downs == 0)
46         return sock->read(buffer, len);
47 
48     {
49         Lock l(downCS);
50 
51         if(downTokens > 0)
52         {
53             int64_t slice = (downLimit * 1024) / downs;
54             readSize = min(slice, min(static_cast<int64_t>(len), downTokens));
55 
56             // read from socket
57             readSize = sock->read(buffer, static_cast<size_t>(readSize));
58 
59             if(readSize > 0)
60                 downTokens -= readSize;
61         }
62     }
63 
64     if(readSize != -1)
65     {
66         Thread::yield(); // give a chance to other transfers to get a token
67         return readSize;
68     }
69 
70     waitToken();
71     return -1;  // from BufferedSocket: -1 = retry, 0 = connection close
72 }
73 
74 /*
75  * Throttles traffic and writes a packet to the network
76  * Handle this a little bit differently than downloads due to OpenSSL stupidity
77  */
write(Socket * sock,void * buffer,size_t & len)78 int ThrottleManager::write(Socket* sock, void* buffer, size_t& len)
79 {
80     bool gotToken = false;
81     size_t ups = UploadManager::getInstance()->getUploadCount();
82     auto upLimit = getUpLimit(); // avoid even intra-function races
83     if(!BOOLSETTING(THROTTLE_ENABLE) || !getCurThrottling() || upLimit == 0 || ups == 0)
84         return sock->write(buffer, len);
85 
86     {
87         Lock l(upCS);
88 
89         if(upTokens > 0)
90         {
91             size_t slice = (upLimit * 1024) / ups;
92             len = min(slice, min(len, static_cast<size_t>(upTokens)));
93             upTokens -= len;
94 
95             gotToken = true; // token successfuly assigned
96         }
97     }
98 
99     if(gotToken)
100     {
101         // write to socket
102         int sent = sock->write(buffer, len);
103 
104         Thread::yield(); // give a chance to other transfers get a token
105         return sent;
106     }
107 
108     waitToken();
109     return 0;   // from BufferedSocket: -1 = failed, 0 = retry
110 }
111 
getCurSetting(SettingsManager::IntSetting setting)112 SettingsManager::IntSetting ThrottleManager::getCurSetting(SettingsManager::IntSetting setting) {
113     SettingsManager::IntSetting upLimit   = SettingsManager::MAX_UPLOAD_SPEED_MAIN;
114     SettingsManager::IntSetting downLimit = SettingsManager::MAX_DOWNLOAD_SPEED_MAIN;
115     SettingsManager::IntSetting slots     = SettingsManager::SLOTS_PRIMARY;
116 
117     if(BOOLSETTING(TIME_DEPENDENT_THROTTLE)) {
118         time_t currentTime;
119         time(&currentTime);
120         int currentHour = localtime(&currentTime)->tm_hour;
121         if((SETTING(BANDWIDTH_LIMIT_START) < SETTING(BANDWIDTH_LIMIT_END) &&
122             currentHour >= SETTING(BANDWIDTH_LIMIT_START) && currentHour < SETTING(BANDWIDTH_LIMIT_END)) ||
123             (SETTING(BANDWIDTH_LIMIT_START) > SETTING(BANDWIDTH_LIMIT_END) &&
124             (currentHour >= SETTING(BANDWIDTH_LIMIT_START) || currentHour < SETTING(BANDWIDTH_LIMIT_END))))
125         {
126             upLimit   = SettingsManager::MAX_UPLOAD_SPEED_ALTERNATE;
127             downLimit = SettingsManager::MAX_DOWNLOAD_SPEED_ALTERNATE;
128             slots     = SettingsManager::SLOTS_ALTERNATE_LIMITING;
129         }
130     }
131 
132     switch (setting) {
133         case SettingsManager::MAX_UPLOAD_SPEED_MAIN:
134             return upLimit;
135         case SettingsManager::MAX_DOWNLOAD_SPEED_MAIN:
136             return downLimit;
137         case SettingsManager::SLOTS:
138             return slots;
139         default:
140             return setting;
141     }
142 }
143 
getUpLimit()144 int ThrottleManager::getUpLimit() {
145     return SettingsManager::getInstance()->get(getCurSetting(SettingsManager::MAX_UPLOAD_SPEED_MAIN));
146 }
147 
getDownLimit()148 int ThrottleManager::getDownLimit() {
149     return SettingsManager::getInstance()->get(getCurSetting(SettingsManager::MAX_DOWNLOAD_SPEED_MAIN));
150 }
151 
setSetting(SettingsManager::IntSetting setting,int value)152 void ThrottleManager::setSetting(SettingsManager::IntSetting setting, int value) {
153         SettingsManager::getInstance()->set(setting, value);
154         ClientManager::getInstance()->infoUpdated();
155 }
156 
getCurThrottling()157 bool ThrottleManager::getCurThrottling() {
158     Lock l(stateCS);
159     return activeWaiter != -1;
160 }
161 
waitToken()162 void ThrottleManager::waitToken() {
163     // no tokens, wait for them, so long as throttling still active
164     // avoid keeping stateCS lock on whole function
165     CriticalSection *curCS = 0;
166     {
167         Lock l(stateCS);
168         if (activeWaiter != -1)
169             curCS = &waitCS[activeWaiter];
170     }
171     // possible post-CS aW shifts: 0->1/1->0: lock lands in wrong place, will
172     // either fall through immediately or wait depending on whether in
173     // stateCS-protected transition elsewhere; 0/1-> -1: falls through. Both harmless.
174     if (curCS)
175         Lock l(*curCS);
176 }
177 
~ThrottleManager(void)178 ThrottleManager::~ThrottleManager(void)
179 {
180     shutdown();
181     TimerManager::getInstance()->removeListener(this);
182 }
183 
184 #ifdef _WIN32
185 
shutdown()186 void ThrottleManager::shutdown() {
187     Lock l(stateCS);
188     if (activeWaiter != -1) {
189         waitCS[activeWaiter].unlock();
190         activeWaiter = -1;
191     }
192 }
193 #else //*nix
194 
shutdown()195 void ThrottleManager::shutdown()
196 {
197     bool wait = false;
198     {
199         Lock l(stateCS);
200         if (activeWaiter != -1)
201         {
202             n_lock = activeWaiter;
203             activeWaiter = -1;
204             halt = 1;
205             wait = true;
206         }
207     }
208 
209     // wait shutdown...
210     if (wait)
211     {
212         Lock l(shutdownCS);
213     }
214 }
215 #endif //*nix
216 
217 // TimerManagerListener
on(TimerManagerListener::Second,uint64_t)218 void ThrottleManager::on(TimerManagerListener::Second, uint64_t /* aTick */) noexcept
219 {
220     int newSlots = SettingsManager::getInstance()->get(getCurSetting(SettingsManager::SLOTS));
221     if(newSlots != SETTING(SLOTS)) {
222         setSetting(SettingsManager::SLOTS, newSlots);
223     }
224 
225     {
226         Lock l(stateCS);
227 
228 #ifndef _WIN32 //*nix
229 
230         if (halt == 1)
231         {
232             halt = -1;
233 
234             // unlock shutdown and token wait
235             dcassert(n_lock == 0 || n_lock == 1);
236             waitCS[n_lock].unlock();
237             shutdownCS.unlock();
238 
239             return;
240         }
241         else if (halt == -1)
242         {
243             return;
244         }
245 #endif
246         if (activeWaiter == -1)
247         {
248             // This will create slight weirdness for the read/write calls between
249             // here and the first activeWaiter-toggle below.
250             waitCS[activeWaiter = 0].lock();
251 
252 #ifndef _WIN32 //*nix
253 
254             // lock shutdown
255             shutdownCS.lock();
256 #endif
257         }
258     }
259 
260     int downLimit = getDownLimit();
261     int upLimit   = getUpLimit();
262 
263     // readd tokens
264     {
265         Lock l(downCS);
266         downTokens = downLimit * 1024;
267     }
268 
269     {
270         Lock l(upCS);
271         upTokens = upLimit * 1024;
272     }
273 
274     // let existing events drain out (fairness).
275     // www.cse.wustl.edu/~schmidt/win32-cv-1.html documents various
276     // fairer strategies, but when only broadcasting, irrelevant
277     {
278         Lock l(stateCS);
279 
280         dcassert(activeWaiter == 0 || activeWaiter == 1);
281         waitCS[1-activeWaiter].lock();
282         activeWaiter = 1-activeWaiter;
283         waitCS[1-activeWaiter].unlock();
284     }
285 }
286 
287 }   // namespace dcpp
288