1 /*
2 * Copyright (C) 2001-2009 Jacek Sieka, arnetheduck on gmail point com
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 */
18
19 #include "stdinc.h"
20 #include "DCPlusPlus.h"
21
22 #include "ConnectionManager.h"
23
24 #include "DownloadManager.h"
25 #include "UploadManager.h"
26 #include "CryptoManager.h"
27 #include "ClientManager.h"
28 #include "QueueManager.h"
29 #include "LogManager.h"
30
31 #include "UserConnection.h"
32
33 namespace dcpp {
34
ConnectionManager()35 ConnectionManager::ConnectionManager() : floodCounter(0), server(0), secureServer(0), shuttingDown(false) {
36 TimerManager::getInstance()->addListener(this);
37
38 features.push_back(UserConnection::FEATURE_MINISLOTS);
39 features.push_back(UserConnection::FEATURE_XML_BZLIST);
40 features.push_back(UserConnection::FEATURE_ADCGET);
41 features.push_back(UserConnection::FEATURE_TTHL);
42 features.push_back(UserConnection::FEATURE_TTHF);
43
44 adcFeatures.push_back("AD" + UserConnection::FEATURE_ADC_BAS0);
45 adcFeatures.push_back("AD" + UserConnection::FEATURE_ADC_BASE);
46 adcFeatures.push_back("AD" + UserConnection::FEATURE_ADC_TIGR);
47 adcFeatures.push_back("AD" + UserConnection::FEATURE_ADC_BZIP);
48 }
49
listen()50 void ConnectionManager::listen() throw(SocketException){
51 disconnect();
52 uint16_t port = static_cast<uint16_t>(SETTING(TCP_PORT));
53
54 server = new Server(false, port, SETTING(BIND_ADDRESS));
55
56 if(!CryptoManager::getInstance()->TLSOk()) {
57 dcdebug("Skipping secure port: %d\n", SETTING(USE_TLS));
58 return;
59 }
60
61 port = static_cast<uint16_t>(SETTING(TLS_PORT));
62
63 secureServer = new Server(true, port, SETTING(BIND_ADDRESS));
64 }
65
66 /**
67 * Request a connection for downloading.
68 * DownloadManager::addConnection will be called as soon as the connection is ready
69 * for downloading.
70 * @param aUser The user to connect to.
71 */
getDownloadConnection(const UserPtr & aUser,const string & hubHint)72 void ConnectionManager::getDownloadConnection(const UserPtr& aUser, const string& hubHint) {
73 dcassert((bool)aUser);
74 {
75 Lock l(cs);
76 ConnectionQueueItem::Iter i = find(downloads.begin(), downloads.end(), aUser);
77 if(i == downloads.end()) {
78 getCQI(aUser, true, hubHint);
79 } else {
80 DownloadManager::getInstance()->checkIdle(aUser);
81 }
82 }
83 }
84
getCQI(const UserPtr & aUser,bool download,const string & hubHint)85 ConnectionQueueItem* ConnectionManager::getCQI(const UserPtr& aUser, bool download, const string& hubHint) {
86 ConnectionQueueItem* cqi = new ConnectionQueueItem(aUser, download, hubHint);
87 if(download) {
88 dcassert(find(downloads.begin(), downloads.end(), aUser) == downloads.end());
89 downloads.push_back(cqi);
90 } else {
91 dcassert(find(uploads.begin(), uploads.end(), aUser) == uploads.end());
92 uploads.push_back(cqi);
93 }
94
95 fire(ConnectionManagerListener::Added(), cqi);
96 return cqi;
97 }
98
putCQI(ConnectionQueueItem * cqi)99 void ConnectionManager::putCQI(ConnectionQueueItem* cqi) {
100 fire(ConnectionManagerListener::Removed(), cqi);
101 if(cqi->getDownload()) {
102 dcassert(find(downloads.begin(), downloads.end(), cqi) != downloads.end());
103 downloads.erase(remove(downloads.begin(), downloads.end(), cqi), downloads.end());
104 } else {
105 dcassert(find(uploads.begin(), uploads.end(), cqi) != uploads.end());
106 uploads.erase(remove(uploads.begin(), uploads.end(), cqi), uploads.end());
107 }
108 delete cqi;
109 }
110
getConnection(bool aNmdc,bool secure)111 UserConnection* ConnectionManager::getConnection(bool aNmdc, bool secure) throw() {
112 UserConnection* uc = new UserConnection(secure);
113 uc->addListener(this);
114 {
115 Lock l(cs);
116 userConnections.push_back(uc);
117 }
118 if(aNmdc)
119 uc->setFlag(UserConnection::FLAG_NMDC);
120 return uc;
121 }
122
putConnection(UserConnection * aConn)123 void ConnectionManager::putConnection(UserConnection* aConn) {
124 aConn->removeListener(this);
125 aConn->disconnect();
126
127 Lock l(cs);
128 userConnections.erase(remove(userConnections.begin(), userConnections.end(), aConn), userConnections.end());
129 }
130
on(TimerManagerListener::Second,uint32_t aTick)131 void ConnectionManager::on(TimerManagerListener::Second, uint32_t aTick) throw() {
132 UserList passiveUsers;
133 ConnectionQueueItem::List removed;
134
135 {
136 Lock l(cs);
137
138 bool attemptDone = false;
139
140 for(ConnectionQueueItem::Iter i = downloads.begin(); i != downloads.end(); ++i) {
141 ConnectionQueueItem* cqi = *i;
142
143 if(cqi->getState() != ConnectionQueueItem::ACTIVE) {
144 if(!cqi->getUser()->isOnline()) {
145 // Not online anymore...remove it from the pending...
146 removed.push_back(cqi);
147 continue;
148 }
149
150 if(cqi->getUser()->isSet(User::PASSIVE) && !ClientManager::getInstance()->isActive()) {
151 passiveUsers.push_back(cqi->getUser());
152 removed.push_back(cqi);
153 continue;
154 }
155
156 if(cqi->getLastAttempt() == 0 || (((cqi->getLastAttempt() + 60*1000) < aTick) && !attemptDone)) {
157 cqi->setLastAttempt(aTick);
158
159 QueueItem::Priority prio = QueueManager::getInstance()->hasDownload(cqi->getUser());
160
161 if(prio == QueueItem::PAUSED) {
162 removed.push_back(cqi);
163 continue;
164 }
165
166 bool startDown = DownloadManager::getInstance()->startDownload(prio);
167
168 if(cqi->getState() == ConnectionQueueItem::WAITING) {
169 if(startDown) {
170 cqi->setState(ConnectionQueueItem::CONNECTING);
171 ClientManager::getInstance()->connect(cqi->getUser(), cqi->getToken(), cqi->getHubHint());
172 fire(ConnectionManagerListener::StatusChanged(), cqi);
173 attemptDone = true;
174 } else {
175 cqi->setState(ConnectionQueueItem::NO_DOWNLOAD_SLOTS);
176 fire(ConnectionManagerListener::Failed(), cqi, _("All download slots taken"));
177 }
178 } else if(cqi->getState() == ConnectionQueueItem::NO_DOWNLOAD_SLOTS && startDown) {
179 cqi->setState(ConnectionQueueItem::WAITING);
180 }
181 } else if(((cqi->getLastAttempt() + 50*1000) < aTick) && (cqi->getState() == ConnectionQueueItem::CONNECTING)) {
182 fire(ConnectionManagerListener::Failed(), cqi, _("Connection timeout"));
183 cqi->setState(ConnectionQueueItem::WAITING);
184 }
185 }
186 }
187
188 for(ConnectionQueueItem::Iter m = removed.begin(); m != removed.end(); ++m) {
189 putCQI(*m);
190 }
191
192 }
193
194 for(UserList::iterator ui = passiveUsers.begin(); ui != passiveUsers.end(); ++ui) {
195 QueueManager::getInstance()->removeSource(*ui, QueueItem::Source::FLAG_PASSIVE);
196 }
197 }
198
on(TimerManagerListener::Minute,uint32_t aTick)199 void ConnectionManager::on(TimerManagerListener::Minute, uint32_t aTick) throw() {
200 Lock l(cs);
201
202 for(UserConnectionList::iterator j = userConnections.begin(); j != userConnections.end(); ++j) {
203 if(((*j)->getLastActivity() + 180*1000) < aTick) {
204 (*j)->disconnect(true);
205 }
206 }
207 }
208
// Flood accounting constants used by ConnectionManager::accept(), in the same
// unit as GET_TICK() (presumably milliseconds -- confirm).
// FLOOD_TRIGGER: how far floodCounter may run ahead of the current tick
// before the (currently disabled) flood check would fire.
static const uint32_t FLOOD_TRIGGER = 20 * 1000;
// FLOOD_ADD: amount added to floodCounter for each accepted connection.
static const uint32_t FLOOD_ADD = 2 * 1000;
211
Server(bool secure_,uint16_t aPort,const string & ip_)212 ConnectionManager::Server::Server(bool secure_, uint16_t aPort, const string& ip_ /* = "0.0.0.0" */) : port(0), secure(secure_), die(false) {
213 sock.create();
214 ip = ip_;
215 port = sock.bind(aPort, ip);
216 sock.listen();
217
218 start();
219 }
220
// Timeout passed to sock.wait() in Server::run(); it bounds how long the
// accept loop takes to notice `die` (presumably milliseconds -- confirm
// against Socket::wait).
static const uint32_t POLL_TIMEOUT = 250u;
222
run()223 int ConnectionManager::Server::run() throw() {
224 while(!die) {
225 try {
226 while(!die) {
227 if(sock.wait(POLL_TIMEOUT, Socket::WAIT_READ) == Socket::WAIT_READ) {
228 ConnectionManager::getInstance()->accept(sock, secure);
229 }
230 }
231 } catch(const Exception& e) {
232 dcdebug("ConnectionManager::Server::run Error: %s\n", e.getError().c_str());
233 }
234
235 bool failed = false;
236 while(!die) {
237 try {
238 sock.disconnect();
239 sock.create();
240 sock.bind(port, ip);
241 sock.listen();
242 if(failed) {
243 LogManager::getInstance()->message(_("Connectivity restored"));
244 failed = false;
245 }
246 } catch(const SocketException& e) {
247 dcdebug("ConnectionManager::Server::run Stopped listening: %s\n", e.getError().c_str());
248
249 if(!failed) {
250 LogManager::getInstance()->message(str(F_("Connectivity error: %1%") % e.getError()));
251 failed = true;
252 }
253
254 // Spin for 60 seconds
255 for(int i = 0; i < 60 && !die; ++i) {
256 Thread::sleep(1000);
257 }
258 }
259 }
260 }
261 return 0;
262 }
263
264 /**
265 * Someone's connecting, accept the connection and wait for identification...
266 * It's always the other fellow that starts sending if he made the connection.
267 */
accept(const Socket & sock,bool secure)268 void ConnectionManager::accept(const Socket& sock, bool secure) throw() {
269 uint64_t now = GET_TICK();
270
271 if(now > floodCounter) {
272 floodCounter = now + FLOOD_ADD;
273 } else {
274 if(false && now + FLOOD_TRIGGER < floodCounter) {
275 Socket s;
276 try {
277 s.accept(sock);
278 } catch(const SocketException&) {
279 // ...
280 }
281 dcdebug("Connection flood detected!\n");
282 return;
283 } else {
284 floodCounter += FLOOD_ADD;
285 }
286 }
287 UserConnection* uc = getConnection(false, secure);
288 uc->setFlag(UserConnection::FLAG_INCOMING);
289 uc->setState(UserConnection::STATE_SUPNICK);
290 uc->setLastActivity(GET_TICK());
291 try {
292 uc->accept(sock);
293 } catch(const Exception&) {
294 putConnection(uc);
295 delete uc;
296 }
297 }
298
nmdcConnect(const string & aServer,uint16_t aPort,const string & aNick,const string & hubUrl,const string & encoding)299 void ConnectionManager::nmdcConnect(const string& aServer, uint16_t aPort, const string& aNick, const string& hubUrl, const string& encoding) {
300 if(shuttingDown)
301 return;
302
303 UserConnection* uc = getConnection(true, false);
304 uc->setToken(aNick);
305 uc->setHubUrl(hubUrl);
306 uc->setEncoding(encoding);
307 uc->setState(UserConnection::STATE_CONNECT);
308 uc->setFlag(UserConnection::FLAG_NMDC);
309 try {
310 uc->connect(aServer, aPort);
311 } catch(const Exception&) {
312 putConnection(uc);
313 delete uc;
314 }
315 }
316
adcConnect(const OnlineUser & aUser,uint16_t aPort,const string & aToken,bool secure)317 void ConnectionManager::adcConnect(const OnlineUser& aUser, uint16_t aPort, const string& aToken, bool secure) {
318 if(shuttingDown)
319 return;
320
321 UserConnection* uc = getConnection(false, secure);
322 uc->setToken(aToken);
323 uc->setEncoding(Text::utf8);
324 uc->setState(UserConnection::STATE_CONNECT);
325 uc->setHubUrl(aUser.getClient().getHubUrl());
326 if(aUser.getIdentity().isOp()) {
327 uc->setFlag(UserConnection::FLAG_OP);
328 }
329 try {
330 uc->connect(aUser.getIdentity().getIp(), aPort);
331 } catch(const Exception&) {
332 putConnection(uc);
333 delete uc;
334 }
335 }
336
disconnect()337 void ConnectionManager::disconnect() throw() {
338 delete server;
339 delete secureServer;
340
341 server = secureServer = 0;
342 }
343
on(AdcCommand::SUP,UserConnection * aSource,const AdcCommand & cmd)344 void ConnectionManager::on(AdcCommand::SUP, UserConnection* aSource, const AdcCommand& cmd) throw() {
345 if(aSource->getState() != UserConnection::STATE_SUPNICK) {
346 // Already got this once, ignore...@todo fix support updates
347 dcdebug("CM::onSUP %p sent sup twice\n", (void*)aSource);
348 return;
349 }
350
351 bool baseOk = false;
352 bool tigrOk = false;
353
354 for(StringIterC i = cmd.getParameters().begin(); i != cmd.getParameters().end(); ++i) {
355 if(i->compare(0, 2, "AD") == 0) {
356 string feat = i->substr(2);
357 if(feat == UserConnection::FEATURE_ADC_BASE || feat == UserConnection::FEATURE_ADC_BAS0) {
358 baseOk = true;
359 // For bas0 tiger is implicit
360 if(feat == UserConnection::FEATURE_ADC_BAS0) {
361 tigrOk = true;
362 }
363 // ADC clients must support all these...
364 aSource->setFlag(UserConnection::FLAG_SUPPORTS_ADCGET);
365 aSource->setFlag(UserConnection::FLAG_SUPPORTS_MINISLOTS);
366 aSource->setFlag(UserConnection::FLAG_SUPPORTS_TTHF);
367 aSource->setFlag(UserConnection::FLAG_SUPPORTS_TTHL);
368 // For compatibility with older clients...
369 aSource->setFlag(UserConnection::FLAG_SUPPORTS_XML_BZLIST);
370 } else if(feat == UserConnection::FEATURE_ZLIB_GET) {
371 aSource->setFlag(UserConnection::FLAG_SUPPORTS_ZLIB_GET);
372 } else if(feat == UserConnection::FEATURE_ADC_BZIP) {
373 aSource->setFlag(UserConnection::FLAG_SUPPORTS_XML_BZLIST);
374 } else if(feat == UserConnection::FEATURE_ADC_TIGR) {
375 tigrOk = true;
376 }
377 }
378 }
379
380 if(!baseOk) {
381 aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_PROTOCOL_GENERIC, "Invalid SUP"));
382 aSource->disconnect();
383 return;
384 }
385
386 if(aSource->isSet(UserConnection::FLAG_INCOMING)) {
387 StringList defFeatures = adcFeatures;
388 if(BOOLSETTING(COMPRESS_TRANSFERS)) {
389 defFeatures.push_back("AD" + UserConnection::FEATURE_ZLIB_GET);
390 }
391 aSource->sup(defFeatures);
392 aSource->inf(false);
393 } else {
394 aSource->inf(true);
395 }
396 aSource->setState(UserConnection::STATE_INF);
397 }
398
on(AdcCommand::STA,UserConnection *,const AdcCommand & cmd)399 void ConnectionManager::on(AdcCommand::STA, UserConnection*, const AdcCommand& cmd) throw() {
400
401 }
402
on(UserConnectionListener::Connected,UserConnection * aSource)403 void ConnectionManager::on(UserConnectionListener::Connected, UserConnection* aSource) throw() {
404 if(aSource->isSecure() && !aSource->isTrusted() && !BOOLSETTING(ALLOW_UNTRUSTED_CLIENTS)) {
405 putConnection(aSource);
406 QueueManager::getInstance()->removeSource(aSource->getUser(), QueueItem::Source::FLAG_UNTRUSTED);
407 return;
408 }
409
410 dcassert(aSource->getState() == UserConnection::STATE_CONNECT);
411 if(aSource->isSet(UserConnection::FLAG_NMDC)) {
412 aSource->myNick(aSource->getToken());
413 aSource->lock(CryptoManager::getInstance()->getLock(), CryptoManager::getInstance()->getPk() + "Ref=" + aSource->getHubUrl());
414 } else {
415 StringList defFeatures = adcFeatures;
416 if(BOOLSETTING(COMPRESS_TRANSFERS)) {
417 defFeatures.push_back("AD" + UserConnection::FEATURE_ZLIB_GET);
418 }
419 aSource->sup(defFeatures);
420 aSource->send(AdcCommand(AdcCommand::SEV_SUCCESS, AdcCommand::SUCCESS, Util::emptyString).addParam("RF", aSource->getHubUrl()));
421 }
422 aSource->setState(UserConnection::STATE_SUPNICK);
423 }
424
on(UserConnectionListener::MyNick,UserConnection * aSource,const string & aNick)425 void ConnectionManager::on(UserConnectionListener::MyNick, UserConnection* aSource, const string& aNick) throw() {
426 if(aSource->getState() != UserConnection::STATE_SUPNICK) {
427 // Already got this once, ignore...
428 dcdebug("CM::onMyNick %p sent nick twice\n", (void*)aSource);
429 return;
430 }
431
432 dcassert(aNick.size() > 0);
433 dcdebug("ConnectionManager::onMyNick %p, %s\n", (void*)aSource, aNick.c_str());
434 dcassert(!aSource->getUser());
435
436 if(aSource->isSet(UserConnection::FLAG_INCOMING)) {
437 // Try to guess where this came from...
438 pair<string, string> i = expectedConnections.remove(aNick);
439 if(i.second.empty()) {
440 dcassert(i.first.empty());
441 dcdebug("Unknown incoming connection from %s\n", aNick.c_str());
442 putConnection(aSource);
443 return;
444 }
445 aSource->setToken(i.first);
446 aSource->setHubUrl(i.second);
447 aSource->setEncoding(ClientManager::getInstance()->findHubEncoding(i.second));
448 }
449
450 string nick = Text::toUtf8(aNick, aSource->getEncoding());
451 CID cid = ClientManager::getInstance()->makeCid(nick, aSource->getHubUrl());
452
453 // First, we try looking in the pending downloads...hopefully it's one of them...
454 {
455 Lock l(cs);
456 for(ConnectionQueueItem::Iter i = downloads.begin(); i != downloads.end(); ++i) {
457 ConnectionQueueItem* cqi = *i;
458 if((cqi->getState() == ConnectionQueueItem::CONNECTING || cqi->getState() == ConnectionQueueItem::WAITING) && cqi->getUser()->getCID() == cid) {
459 aSource->setUser(cqi->getUser());
460 // Indicate that we're interested in this file...
461 aSource->setFlag(UserConnection::FLAG_DOWNLOAD);
462 break;
463 }
464 }
465 }
466
467 if(!aSource->getUser()) {
468 // Make sure we know who it is, i e that he/she is connected...
469
470 aSource->setUser(ClientManager::getInstance()->findUser(cid));
471 if(!aSource->getUser() || !ClientManager::getInstance()->isOnline(aSource->getUser())) {
472 dcdebug("CM::onMyNick Incoming connection from unknown user %s\n", nick.c_str());
473 putConnection(aSource);
474 return;
475 }
476 // We don't need this connection for downloading...make it an upload connection instead...
477 aSource->setFlag(UserConnection::FLAG_UPLOAD);
478 }
479
480 if(ClientManager::getInstance()->isOp(aSource->getUser(), aSource->getHubUrl()))
481 aSource->setFlag(UserConnection::FLAG_OP);
482
483 if( aSource->isSet(UserConnection::FLAG_INCOMING) ) {
484 aSource->myNick(aSource->getToken());
485 aSource->lock(CryptoManager::getInstance()->getLock(), CryptoManager::getInstance()->getPk());
486 }
487
488 aSource->setState(UserConnection::STATE_LOCK);
489 }
490
on(UserConnectionListener::CLock,UserConnection * aSource,const string & aLock,const string & aPk)491 void ConnectionManager::on(UserConnectionListener::CLock, UserConnection* aSource, const string& aLock, const string& aPk) throw() {
492 if(aSource->getState() != UserConnection::STATE_LOCK) {
493 dcdebug("CM::onLock %p received lock twice, ignoring\n", (void*)aSource);
494 return;
495 }
496
497 if( CryptoManager::getInstance()->isExtended(aLock) ) {
498 // Alright, we have an extended protocol, set a user flag for this user and refresh his info...
499 if( (aPk.find("DCPLUSPLUS") != string::npos) && aSource->getUser() && !aSource->getUser()->isSet(User::DCPLUSPLUS)) {
500 aSource->getUser()->setFlag(User::DCPLUSPLUS);
501 }
502 StringList defFeatures = features;
503 if(BOOLSETTING(COMPRESS_TRANSFERS)) {
504 defFeatures.push_back(UserConnection::FEATURE_ZLIB_GET);
505 }
506
507 aSource->supports(defFeatures);
508 }
509
510 aSource->setState(UserConnection::STATE_DIRECTION);
511 aSource->direction(aSource->getDirectionString(), aSource->getNumber());
512 aSource->key(CryptoManager::getInstance()->makeKey(aLock));
513 }
514
on(UserConnectionListener::Direction,UserConnection * aSource,const string & dir,const string & num)515 void ConnectionManager::on(UserConnectionListener::Direction, UserConnection* aSource, const string& dir, const string& num) throw() {
516 if(aSource->getState() != UserConnection::STATE_DIRECTION) {
517 dcdebug("CM::onDirection %p received direction twice, ignoring\n", (void*)aSource);
518 return;
519 }
520
521 dcassert(aSource->isSet(UserConnection::FLAG_DOWNLOAD) ^ aSource->isSet(UserConnection::FLAG_UPLOAD));
522 if(dir == "Upload") {
523 // Fine, the other fellow want's to send us data...make sure we really want that...
524 if(aSource->isSet(UserConnection::FLAG_UPLOAD)) {
525 // Huh? Strange...disconnect...
526 putConnection(aSource);
527 return;
528 }
529 } else {
530 if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
531 int number = Util::toInt(num);
532 // Damn, both want to download...the one with the highest number wins...
533 if(aSource->getNumber() < number) {
534 // Damn! We lost!
535 aSource->unsetFlag(UserConnection::FLAG_DOWNLOAD);
536 aSource->setFlag(UserConnection::FLAG_UPLOAD);
537 } else if(aSource->getNumber() == number) {
538 putConnection(aSource);
539 return;
540 }
541 }
542 }
543
544 dcassert(aSource->isSet(UserConnection::FLAG_DOWNLOAD) ^ aSource->isSet(UserConnection::FLAG_UPLOAD));
545
546 aSource->setState(UserConnection::STATE_KEY);
547 }
548
addDownloadConnection(UserConnection * uc)549 void ConnectionManager::addDownloadConnection(UserConnection* uc) {
550 dcassert(uc->isSet(UserConnection::FLAG_DOWNLOAD));
551 bool addConn = false;
552 {
553 Lock l(cs);
554
555 ConnectionQueueItem::Iter i = find(downloads.begin(), downloads.end(), uc->getUser());
556 if(i != downloads.end()) {
557 ConnectionQueueItem* cqi = *i;
558 if(cqi->getState() == ConnectionQueueItem::WAITING || cqi->getState() == ConnectionQueueItem::CONNECTING) {
559 cqi->setState(ConnectionQueueItem::ACTIVE);
560 uc->setFlag(UserConnection::FLAG_ASSOCIATED);
561
562 fire(ConnectionManagerListener::Connected(), cqi);
563
564 dcdebug("ConnectionManager::addDownloadConnection, leaving to downloadmanager\n");
565 addConn = true;
566 }
567 }
568 }
569
570 if(addConn) {
571 DownloadManager::getInstance()->addConnection(uc);
572 } else {
573 putConnection(uc);
574 }
575 }
576
addUploadConnection(UserConnection * uc)577 void ConnectionManager::addUploadConnection(UserConnection* uc) {
578 dcassert(uc->isSet(UserConnection::FLAG_UPLOAD));
579
580 bool addConn = false;
581 {
582 Lock l(cs);
583
584 ConnectionQueueItem::Iter i = find(uploads.begin(), uploads.end(), uc->getUser());
585 if(i == uploads.end()) {
586 ConnectionQueueItem* cqi = getCQI(uc->getUser(), false, Util::emptyString);
587
588 cqi->setState(ConnectionQueueItem::ACTIVE);
589 uc->setFlag(UserConnection::FLAG_ASSOCIATED);
590
591 fire(ConnectionManagerListener::Connected(), cqi);
592
593 dcdebug("ConnectionManager::addUploadConnection, leaving to uploadmanager\n");
594 addConn = true;
595 }
596 }
597
598 if(addConn) {
599 UploadManager::getInstance()->addConnection(uc);
600 } else {
601 putConnection(uc);
602 }
603 }
604
on(UserConnectionListener::Key,UserConnection * aSource,const string &)605 void ConnectionManager::on(UserConnectionListener::Key, UserConnection* aSource, const string&/* aKey*/) throw() {
606 if(aSource->getState() != UserConnection::STATE_KEY) {
607 dcdebug("CM::onKey Bad state, ignoring");
608 return;
609 }
610
611 dcassert(aSource->getUser());
612
613 if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
614 addDownloadConnection(aSource);
615 } else {
616 addUploadConnection(aSource);
617 }
618 }
619
on(AdcCommand::INF,UserConnection * aSource,const AdcCommand & cmd)620 void ConnectionManager::on(AdcCommand::INF, UserConnection* aSource, const AdcCommand& cmd) throw() {
621 if(aSource->getState() != UserConnection::STATE_INF) {
622 aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_PROTOCOL_GENERIC, "Expecting INF"));
623 aSource->disconnect();
624 return;
625 }
626
627 string cid;
628 if(!cmd.getParam("ID", 0, cid)) {
629 aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_INF_MISSING, "ID missing").addParam("FL", "ID"));
630 dcdebug("CM::onINF missing ID\n");
631 aSource->disconnect();
632 return;
633 }
634
635 aSource->setUser(ClientManager::getInstance()->findUser(CID(cid)));
636
637 if(!aSource->getUser()) {
638 dcdebug("CM::onINF: User not found");
639 aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_GENERIC, "User not found"));
640 putConnection(aSource);
641 return;
642 }
643
644 string token;
645 if(aSource->isSet(UserConnection::FLAG_INCOMING)) {
646 if(!cmd.getParam("TO", 0, token)) {
647 aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_GENERIC, "TO missing"));
648 putConnection(aSource);
649 return;
650 }
651 } else {
652 token = aSource->getToken();
653 }
654
655 bool down = false;
656 {
657 Lock l(cs);
658 ConnectionQueueItem::Iter i = find(downloads.begin(), downloads.end(), aSource->getUser());
659
660 if(i != downloads.end()) {
661 const string& to = (*i)->getToken();
662
663 // 0.698 would send an empty token in some cases...remove this bugfix at some point
664 if(to == token || token.empty()) {
665 down = true;
666 }
667 }
668 /** @todo check tokens for upload connections */
669 }
670
671 if(down) {
672 aSource->setFlag(UserConnection::FLAG_DOWNLOAD);
673 addDownloadConnection(aSource);
674 } else {
675 aSource->setFlag(UserConnection::FLAG_UPLOAD);
676 addUploadConnection(aSource);
677 }
678 }
679
force(const UserPtr & aUser)680 void ConnectionManager::force(const UserPtr& aUser) {
681 Lock l(cs);
682
683 ConnectionQueueItem::Iter i = find(downloads.begin(), downloads.end(), aUser);
684 if(i == downloads.end()) {
685 return;
686 }
687
688 (*i)->setLastAttempt(0);
689 }
690
on(UserConnectionListener::Failed,UserConnection * aSource,const string & aError)691 void ConnectionManager::on(UserConnectionListener::Failed, UserConnection* aSource, const string& aError) throw() {
692 Lock l(cs);
693
694 if(aSource->isSet(UserConnection::FLAG_ASSOCIATED)) {
695 if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
696 ConnectionQueueItem::Iter i = find(downloads.begin(), downloads.end(), aSource->getUser());
697 dcassert(i != downloads.end());
698 ConnectionQueueItem* cqi = *i;
699 cqi->setState(ConnectionQueueItem::WAITING);
700 cqi->setLastAttempt(GET_TICK());
701 fire(ConnectionManagerListener::Failed(), cqi, aError);
702 } else if(aSource->isSet(UserConnection::FLAG_UPLOAD)) {
703 ConnectionQueueItem::Iter i = find(uploads.begin(), uploads.end(), aSource->getUser());
704 dcassert(i != uploads.end());
705 ConnectionQueueItem* cqi = *i;
706 putCQI(cqi);
707 }
708 }
709 putConnection(aSource);
710 }
711
disconnect(const UserPtr & aUser)712 void ConnectionManager::disconnect(const UserPtr& aUser) {
713 Lock l(cs);
714 for(UserConnectionList::iterator i = userConnections.begin(); i != userConnections.end(); ++i) {
715 UserConnection* uc = *i;
716 if(uc->getUser() == aUser)
717 uc->disconnect(true);
718 }
719 }
720
disconnect(const UserPtr & aUser,int isDownload)721 void ConnectionManager::disconnect(const UserPtr& aUser, int isDownload) {
722 Lock l(cs);
723 for(UserConnectionList::iterator i = userConnections.begin(); i != userConnections.end(); ++i) {
724 UserConnection* uc = *i;
725 if(uc->getUser() == aUser && uc->isSet(isDownload ? UserConnection::FLAG_DOWNLOAD : UserConnection::FLAG_UPLOAD)) {
726 uc->disconnect(true);
727 break;
728 }
729 }
730 }
731
shutdown()732 void ConnectionManager::shutdown() {
733 TimerManager::getInstance()->removeListener(this);
734 shuttingDown = true;
735 disconnect();
736 {
737 Lock l(cs);
738 for(UserConnectionList::iterator j = userConnections.begin(); j != userConnections.end(); ++j) {
739 (*j)->disconnect(true);
740 }
741 }
742 // Wait until all connections have died out...
743 while(true) {
744 {
745 Lock l(cs);
746 if(userConnections.empty()) {
747 break;
748 }
749 }
750 Thread::sleep(50);
751 }
752 }
753
754 // UserConnectionListener
on(UserConnectionListener::Supports,UserConnection * conn,const StringList & feat)755 void ConnectionManager::on(UserConnectionListener::Supports, UserConnection* conn, const StringList& feat) throw() {
756 for(StringList::const_iterator i = feat.begin(); i != feat.end(); ++i) {
757 if(*i == UserConnection::FEATURE_MINISLOTS) {
758 conn->setFlag(UserConnection::FLAG_SUPPORTS_MINISLOTS);
759 } else if(*i == UserConnection::FEATURE_XML_BZLIST) {
760 conn->setFlag(UserConnection::FLAG_SUPPORTS_XML_BZLIST);
761 } else if(*i == UserConnection::FEATURE_ADCGET) {
762 conn->setFlag(UserConnection::FLAG_SUPPORTS_ADCGET);
763 } else if(*i == UserConnection::FEATURE_ZLIB_GET) {
764 conn->setFlag(UserConnection::FLAG_SUPPORTS_ZLIB_GET);
765 } else if(*i == UserConnection::FEATURE_TTHL) {
766 conn->setFlag(UserConnection::FLAG_SUPPORTS_TTHL);
767 } else if(*i == UserConnection::FEATURE_TTHF) {
768 conn->setFlag(UserConnection::FLAG_SUPPORTS_TTHF);
769 }
770 }
771 }
772
773 } // namespace dcpp
774