1 /*
2 * Copyright (C) 2009-2012 Jacek Sieka, arnetheduck on gmail point com
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #include "stdinc.h"
20
21 #include "ThrottleManager.h"
22
23 #include "DownloadManager.h"
24 #include "Singleton.h"
25 #include "Socket.h"
26 #include "Thread.h"
27 #include "TimerManager.h"
28 #include "UploadManager.h"
29 #include "ClientManager.h"
30
31 namespace dcpp {
32 /**
33 * Manager for throttling traffic flow.
34 * Inspired by Token Bucket algorithm: http://en.wikipedia.org/wiki/Token_bucket
35 */
36
37 /*
38 * Throttles traffic and reads a packet from the network
39 */
read(Socket * sock,void * buffer,size_t len)40 int ThrottleManager::read(Socket* sock, void* buffer, size_t len)
41 {
42 int64_t readSize = -1;
43 size_t downs = DownloadManager::getInstance()->getDownloadCount();
44 auto downLimit = getDownLimit(); // avoid even intra-function races
45 if(!BOOLSETTING(THROTTLE_ENABLE) || !getCurThrottling() || downLimit == 0 || downs == 0)
46 return sock->read(buffer, len);
47
48 {
49 Lock l(downCS);
50
51 if(downTokens > 0)
52 {
53 int64_t slice = (downLimit * 1024) / downs;
54 readSize = min(slice, min(static_cast<int64_t>(len), downTokens));
55
56 // read from socket
57 readSize = sock->read(buffer, static_cast<size_t>(readSize));
58
59 if(readSize > 0)
60 downTokens -= readSize;
61 }
62 }
63
64 if(readSize != -1)
65 {
66 Thread::yield(); // give a chance to other transfers to get a token
67 return readSize;
68 }
69
70 waitToken();
71 return -1; // from BufferedSocket: -1 = retry, 0 = connection close
72 }
73
74 /*
75 * Throttles traffic and writes a packet to the network
76 * Handle this a little bit differently than downloads due to OpenSSL stupidity
77 */
write(Socket * sock,void * buffer,size_t & len)78 int ThrottleManager::write(Socket* sock, void* buffer, size_t& len)
79 {
80 bool gotToken = false;
81 size_t ups = UploadManager::getInstance()->getUploadCount();
82 auto upLimit = getUpLimit(); // avoid even intra-function races
83 if(!BOOLSETTING(THROTTLE_ENABLE) || !getCurThrottling() || upLimit == 0 || ups == 0)
84 return sock->write(buffer, len);
85
86 {
87 Lock l(upCS);
88
89 if(upTokens > 0)
90 {
91 size_t slice = (upLimit * 1024) / ups;
92 len = min(slice, min(len, static_cast<size_t>(upTokens)));
93 upTokens -= len;
94
95 gotToken = true; // token successfuly assigned
96 }
97 }
98
99 if(gotToken)
100 {
101 // write to socket
102 int sent = sock->write(buffer, len);
103
104 Thread::yield(); // give a chance to other transfers get a token
105 return sent;
106 }
107
108 waitToken();
109 return 0; // from BufferedSocket: -1 = failed, 0 = retry
110 }
111
getCurSetting(SettingsManager::IntSetting setting)112 SettingsManager::IntSetting ThrottleManager::getCurSetting(SettingsManager::IntSetting setting) {
113 SettingsManager::IntSetting upLimit = SettingsManager::MAX_UPLOAD_SPEED_MAIN;
114 SettingsManager::IntSetting downLimit = SettingsManager::MAX_DOWNLOAD_SPEED_MAIN;
115 SettingsManager::IntSetting slots = SettingsManager::SLOTS_PRIMARY;
116
117 if(BOOLSETTING(TIME_DEPENDENT_THROTTLE)) {
118 time_t currentTime;
119 time(¤tTime);
120 int currentHour = localtime(¤tTime)->tm_hour;
121 if((SETTING(BANDWIDTH_LIMIT_START) < SETTING(BANDWIDTH_LIMIT_END) &&
122 currentHour >= SETTING(BANDWIDTH_LIMIT_START) && currentHour < SETTING(BANDWIDTH_LIMIT_END)) ||
123 (SETTING(BANDWIDTH_LIMIT_START) > SETTING(BANDWIDTH_LIMIT_END) &&
124 (currentHour >= SETTING(BANDWIDTH_LIMIT_START) || currentHour < SETTING(BANDWIDTH_LIMIT_END))))
125 {
126 upLimit = SettingsManager::MAX_UPLOAD_SPEED_ALTERNATE;
127 downLimit = SettingsManager::MAX_DOWNLOAD_SPEED_ALTERNATE;
128 slots = SettingsManager::SLOTS_ALTERNATE_LIMITING;
129 }
130 }
131
132 switch (setting) {
133 case SettingsManager::MAX_UPLOAD_SPEED_MAIN:
134 return upLimit;
135 case SettingsManager::MAX_DOWNLOAD_SPEED_MAIN:
136 return downLimit;
137 case SettingsManager::SLOTS:
138 return slots;
139 default:
140 return setting;
141 }
142 }
143
getUpLimit()144 int ThrottleManager::getUpLimit() {
145 return SettingsManager::getInstance()->get(getCurSetting(SettingsManager::MAX_UPLOAD_SPEED_MAIN));
146 }
147
getDownLimit()148 int ThrottleManager::getDownLimit() {
149 return SettingsManager::getInstance()->get(getCurSetting(SettingsManager::MAX_DOWNLOAD_SPEED_MAIN));
150 }
151
setSetting(SettingsManager::IntSetting setting,int value)152 void ThrottleManager::setSetting(SettingsManager::IntSetting setting, int value) {
153 SettingsManager::getInstance()->set(setting, value);
154 ClientManager::getInstance()->infoUpdated();
155 }
156
getCurThrottling()157 bool ThrottleManager::getCurThrottling() {
158 Lock l(stateCS);
159 return activeWaiter != -1;
160 }
161
waitToken()162 void ThrottleManager::waitToken() {
163 // no tokens, wait for them, so long as throttling still active
164 // avoid keeping stateCS lock on whole function
165 CriticalSection *curCS = 0;
166 {
167 Lock l(stateCS);
168 if (activeWaiter != -1)
169 curCS = &waitCS[activeWaiter];
170 }
171 // possible post-CS aW shifts: 0->1/1->0: lock lands in wrong place, will
172 // either fall through immediately or wait depending on whether in
173 // stateCS-protected transition elsewhere; 0/1-> -1: falls through. Both harmless.
174 if (curCS)
175 Lock l(*curCS);
176 }
177
~ThrottleManager(void)178 ThrottleManager::~ThrottleManager(void)
179 {
180 shutdown();
181 TimerManager::getInstance()->removeListener(this);
182 }
183
184 #ifdef _WIN32
185
shutdown()186 void ThrottleManager::shutdown() {
187 Lock l(stateCS);
188 if (activeWaiter != -1) {
189 waitCS[activeWaiter].unlock();
190 activeWaiter = -1;
191 }
192 }
193 #else //*nix
194
shutdown()195 void ThrottleManager::shutdown()
196 {
197 bool wait = false;
198 {
199 Lock l(stateCS);
200 if (activeWaiter != -1)
201 {
202 n_lock = activeWaiter;
203 activeWaiter = -1;
204 halt = 1;
205 wait = true;
206 }
207 }
208
209 // wait shutdown...
210 if (wait)
211 {
212 Lock l(shutdownCS);
213 }
214 }
215 #endif //*nix
216
217 // TimerManagerListener
on(TimerManagerListener::Second,uint64_t)218 void ThrottleManager::on(TimerManagerListener::Second, uint64_t /* aTick */) noexcept
219 {
220 int newSlots = SettingsManager::getInstance()->get(getCurSetting(SettingsManager::SLOTS));
221 if(newSlots != SETTING(SLOTS)) {
222 setSetting(SettingsManager::SLOTS, newSlots);
223 }
224
225 {
226 Lock l(stateCS);
227
228 #ifndef _WIN32 //*nix
229
230 if (halt == 1)
231 {
232 halt = -1;
233
234 // unlock shutdown and token wait
235 dcassert(n_lock == 0 || n_lock == 1);
236 waitCS[n_lock].unlock();
237 shutdownCS.unlock();
238
239 return;
240 }
241 else if (halt == -1)
242 {
243 return;
244 }
245 #endif
246 if (activeWaiter == -1)
247 {
248 // This will create slight weirdness for the read/write calls between
249 // here and the first activeWaiter-toggle below.
250 waitCS[activeWaiter = 0].lock();
251
252 #ifndef _WIN32 //*nix
253
254 // lock shutdown
255 shutdownCS.lock();
256 #endif
257 }
258 }
259
260 int downLimit = getDownLimit();
261 int upLimit = getUpLimit();
262
263 // readd tokens
264 {
265 Lock l(downCS);
266 downTokens = downLimit * 1024;
267 }
268
269 {
270 Lock l(upCS);
271 upTokens = upLimit * 1024;
272 }
273
274 // let existing events drain out (fairness).
275 // www.cse.wustl.edu/~schmidt/win32-cv-1.html documents various
276 // fairer strategies, but when only broadcasting, irrelevant
277 {
278 Lock l(stateCS);
279
280 dcassert(activeWaiter == 0 || activeWaiter == 1);
281 waitCS[1-activeWaiter].lock();
282 activeWaiter = 1-activeWaiter;
283 waitCS[1-activeWaiter].unlock();
284 }
285 }
286
287 } // namespace dcpp
288