1 /*
2 * Copyright (C) 2001-2012 Jacek Sieka, arnetheduck on gmail point com
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #include "stdinc.h"
20
21 #include "ConnectionManager.h"
22
23 #include "DownloadManager.h"
24 #include "UploadManager.h"
25 #include "CryptoManager.h"
26 #include "ClientManager.h"
27 #include "QueueManager.h"
28 #include "LogManager.h"
29
30 #include "UserConnection.h"
31
32 namespace dcpp {
33
ConnectionManager()34 ConnectionManager::ConnectionManager() : floodCounter(0), server(0), secureServer(0), shuttingDown(false) {
35 TimerManager::getInstance()->addListener(this);
36
37 features.push_back(UserConnection::FEATURE_MINISLOTS);
38 features.push_back(UserConnection::FEATURE_XML_BZLIST);
39 features.push_back(UserConnection::FEATURE_ADCGET);
40 features.push_back(UserConnection::FEATURE_TTHL);
41 features.push_back(UserConnection::FEATURE_TTHF);
42
43 adcFeatures.push_back("AD" + UserConnection::FEATURE_ADC_BAS0);
44 adcFeatures.push_back("AD" + UserConnection::FEATURE_ADC_BASE);
45 adcFeatures.push_back("AD" + UserConnection::FEATURE_ADC_TIGR);
46 adcFeatures.push_back("AD" + UserConnection::FEATURE_ADC_BZIP);
47 }
48
listen()49 void ConnectionManager::listen() {
50 disconnect();
51
52 server = new Server(false, static_cast<uint16_t>(SETTING(TCP_PORT)), SETTING(BIND_ADDRESS));
53
54 if(!CryptoManager::getInstance()->TLSOk()) {
55 dcdebug("Skipping secure port: %d\n", SETTING(TLS_PORT));
56 return;
57 }
58
59 secureServer = new Server(true, static_cast<uint16_t>(SETTING(TLS_PORT)), SETTING(BIND_ADDRESS));
60 }
61
62 /**
63 * Request a connection for downloading.
64 * DownloadManager::addConnection will be called as soon as the connection is ready
65 * for downloading.
66 * @param aUser The user to connect to.
67 */
getDownloadConnection(const HintedUser & aUser)68 void ConnectionManager::getDownloadConnection(const HintedUser& aUser) {
69 dcassert((bool)aUser.user);
70 {
71 Lock l(cs);
72 ConnectionQueueItem::Iter i = find(downloads.begin(), downloads.end(), aUser.user);
73 if(i == downloads.end()) {
74 getCQI(aUser, true);
75 } else {
76 DownloadManager::getInstance()->checkIdle(aUser.user);
77 }
78 }
79 }
80
getCQI(const HintedUser & aUser,bool download)81 ConnectionQueueItem* ConnectionManager::getCQI(const HintedUser& aUser, bool download) {
82 ConnectionQueueItem* cqi = new ConnectionQueueItem(aUser, download);
83 if(download) {
84 dcassert(find(downloads.begin(), downloads.end(), aUser.user) == downloads.end());
85 downloads.push_back(cqi);
86 } else {
87 dcassert(find(uploads.begin(), uploads.end(), aUser.user) == uploads.end());
88 uploads.push_back(cqi);
89 }
90
91 fire(ConnectionManagerListener::Added(), cqi);
92 return cqi;
93 }
94
putCQI(ConnectionQueueItem * cqi)95 void ConnectionManager::putCQI(ConnectionQueueItem* cqi) {
96 fire(ConnectionManagerListener::Removed(), cqi);
97 if(cqi->getDownload()) {
98 dcassert(find(downloads.begin(), downloads.end(), cqi) != downloads.end());
99 downloads.erase(remove(downloads.begin(), downloads.end(), cqi), downloads.end());
100 } else {
101 dcassert(find(uploads.begin(), uploads.end(), cqi) != uploads.end());
102 uploads.erase(remove(uploads.begin(), uploads.end(), cqi), uploads.end());
103 }
104 delete cqi;
105 }
106
getConnection(bool aNmdc,bool secure)107 UserConnection* ConnectionManager::getConnection(bool aNmdc, bool secure) noexcept {
108 UserConnection* uc = new UserConnection(secure);
109 uc->addListener(this);
110 {
111 Lock l(cs);
112 userConnections.push_back(uc);
113 }
114 if(aNmdc)
115 uc->setFlag(UserConnection::FLAG_NMDC);
116 return uc;
117 }
118
putConnection(UserConnection * aConn)119 void ConnectionManager::putConnection(UserConnection* aConn) {
120 aConn->removeListener(this);
121 aConn->disconnect();
122
123 Lock l(cs);
124 userConnections.erase(remove(userConnections.begin(), userConnections.end(), aConn), userConnections.end());
125 }
126
on(TimerManagerListener::Second,uint64_t aTick)127 void ConnectionManager::on(TimerManagerListener::Second, uint64_t aTick) noexcept {
128 UserList passiveUsers;
129 ConnectionQueueItem::List removed;
130
131 {
132 Lock l(cs);
133
134 bool attemptDone = false;
135
136 for(auto i = downloads.begin(); i != downloads.end(); ++i) {
137 ConnectionQueueItem* cqi = *i;
138
139 if(cqi->getState() != ConnectionQueueItem::ACTIVE) {
140 if(!cqi->getUser().user->isOnline()) {
141 // Not online anymore...remove it from the pending...
142 removed.push_back(cqi);
143 continue;
144 }
145
146 if(cqi->getUser().user->isSet(User::PASSIVE) && !ClientManager::getInstance()->isActive()) {
147 passiveUsers.push_back(cqi->getUser());
148 removed.push_back(cqi);
149 continue;
150 }
151
152 if(cqi->getErrors() == -1 && cqi->getLastAttempt() != 0) {
153 // protocol error, don't reconnect except after a forced attempt
154 continue;
155 }
156
157 if(cqi->getLastAttempt() == 0 || (!attemptDone &&
158 cqi->getLastAttempt() + 60 * 1000 * max(1, cqi->getErrors()) < aTick))
159 {
160 cqi->setLastAttempt(aTick);
161
162 QueueItem::Priority prio = QueueManager::getInstance()->hasDownload(cqi->getUser());
163
164 if(prio == QueueItem::PAUSED) {
165 removed.push_back(cqi);
166 continue;
167 }
168
169 bool startDown = DownloadManager::getInstance()->startDownload(prio);
170
171 if(cqi->getState() == ConnectionQueueItem::WAITING) {
172 if(startDown) {
173 cqi->setState(ConnectionQueueItem::CONNECTING);
174 ClientManager::getInstance()->connect(cqi->getUser(), cqi->getToken());
175 fire(ConnectionManagerListener::StatusChanged(), cqi);
176 attemptDone = true;
177 } else {
178 cqi->setState(ConnectionQueueItem::NO_DOWNLOAD_SLOTS);
179 fire(ConnectionManagerListener::Failed(), cqi, _("All download slots taken"));
180 }
181 } else if(cqi->getState() == ConnectionQueueItem::NO_DOWNLOAD_SLOTS && startDown) {
182 cqi->setState(ConnectionQueueItem::WAITING);
183 }
184 } else if(cqi->getState() == ConnectionQueueItem::CONNECTING && cqi->getLastAttempt() + 50 * 1000 < aTick) {
185 cqi->setErrors(cqi->getErrors() + 1);
186 fire(ConnectionManagerListener::Failed(), cqi, _("Connection timeout"));
187 cqi->setState(ConnectionQueueItem::WAITING);
188 }
189 }
190 }
191
192 for(auto m = removed.begin(); m != removed.end(); ++m) {
193 putCQI(*m);
194 }
195
196 }
197
198 for(auto ui = passiveUsers.begin(); ui != passiveUsers.end(); ++ui) {
199 QueueManager::getInstance()->removeSource(*ui, QueueItem::Source::FLAG_PASSIVE);
200 }
201 }
202
on(TimerManagerListener::Minute,uint64_t aTick)203 void ConnectionManager::on(TimerManagerListener::Minute, uint64_t aTick) noexcept {
204 Lock l(cs);
205
206 for(auto j = userConnections.begin(); j != userConnections.end(); ++j) {
207 if(((*j)->getLastActivity() + 180*1000) < aTick) {
208 (*j)->disconnect(true);
209 }
210 }
211 }
212
213 static const uint32_t FLOOD_TRIGGER = 20000;
214 static const uint32_t FLOOD_ADD = 2000;
215
Server(bool secure_,uint16_t aPort,const string & ip_)216 ConnectionManager::Server::Server(bool secure_, uint16_t aPort, const string& ip_ /* = "0.0.0.0" */) : port(0), secure(secure_), die(false) {
217 sock.create();
218 sock.setSocketOpt(SO_REUSEADDR, 1);
219 ip = SETTING(BIND_IFACE)? sock.getIfaceI4(SETTING(BIND_IFACE_NAME)).c_str() : ip_;
220 port = sock.bind(aPort, ip);
221 sock.listen();
222
223 start();
224 }
225
226 static const uint32_t POLL_TIMEOUT = 250;
227
run()228 int ConnectionManager::Server::run() noexcept {
229 {
230 char threadName[17];
231 snprintf(threadName, sizeof threadName, "Server_%u", port);
232 setThreadName(threadName);
233 }
234 while(!die) {
235 try {
236 while(!die) {
237 if(sock.wait(POLL_TIMEOUT, Socket::WAIT_READ) == Socket::WAIT_READ) {
238 ConnectionManager::getInstance()->accept(sock, secure);
239 }
240 }
241 } catch(const Exception& e) {
242 dcdebug("ConnectionManager::Server::run Error: %s\n", e.getError().c_str());
243 }
244
245 bool failed = false;
246 while(!die) {
247 try {
248 sock.disconnect();
249 sock.create();
250 sock.bind(port, ip);
251 sock.listen();
252 if(failed) {
253 LogManager::getInstance()->message(_("Connectivity restored"));
254 failed = false;
255 }
256 break;
257 } catch(const SocketException& e) {
258 dcdebug("ConnectionManager::Server::run Stopped listening: %s\n", e.getError().c_str());
259
260 if(!failed) {
261 LogManager::getInstance()->message(str(F_("Connectivity error: %1%") % e.getError()));
262 failed = true;
263 }
264
265 // Spin for 60 seconds
266 for(int i = 0; i < 60 && !die; ++i) {
267 Thread::sleep(1000);
268 }
269 }
270 }
271 }
272 return 0;
273 }
274
275 /**
276 * Someone's connecting, accept the connection and wait for identification...
277 * It's always the other fellow that starts sending if he made the connection.
278 */
accept(const Socket & sock,bool secure)279 void ConnectionManager::accept(const Socket& sock, bool secure) noexcept {
280 uint64_t now = GET_TICK();
281
282 if(now > floodCounter) {
283 floodCounter = now + FLOOD_ADD;
284 } else {
285 if(false && now + FLOOD_TRIGGER < floodCounter) {
286 Socket s;
287 try {
288 s.accept(sock);
289 } catch(const SocketException&) {
290 // ...
291 }
292 dcdebug("Connection flood detected!\n");
293 return;
294 } else {
295 floodCounter += FLOOD_ADD;
296 }
297 }
298 UserConnection* uc = getConnection(false, secure);
299 uc->setFlag(UserConnection::FLAG_INCOMING);
300 uc->setState(UserConnection::STATE_SUPNICK);
301 uc->setLastActivity(GET_TICK());
302 try {
303 uc->accept(sock);
304 } catch(const Exception&) {
305 putConnection(uc);
306 delete uc;
307 }
308 }
309
addCTM2HUB(const string & server,const string & port)310 void ConnectionManager::addCTM2HUB(const string &server, const string &port)
311 {
312 Lock l(cs);
313 const string key = server + ':' + port;
314 ddosctm2hub.insert(key);
315 }
316
nmdcConnect(const string & aServer,uint16_t aPort,const string & aNick,const string & hubUrl,const string & encoding,bool secure)317 void ConnectionManager::nmdcConnect(const string& aServer, uint16_t aPort, const string& aNick, const string& hubUrl, const string& encoding, bool secure) {
318 nmdcConnect(aServer, aPort, 0, BufferedSocket::NAT_NONE, aNick, hubUrl, encoding, secure);
319 }
320
nmdcConnect(const string & aServer,uint16_t aPort,uint16_t localPort,BufferedSocket::NatRoles natRole,const string & aNick,const string & hubUrl,const string & encoding,bool secure)321 void ConnectionManager::nmdcConnect(const string& aServer, uint16_t aPort, uint16_t localPort, BufferedSocket::NatRoles natRole, const string& aNick, const string& hubUrl, const string& encoding, bool secure) {
322 if(shuttingDown)
323 return;
324 {
325 Lock l(cs);
326 if (!ddosctm2hub.empty() && ddosctm2hub.find(aServer + ":" + Util::toString(aPort)) != ddosctm2hub.end()) {
327 return;
328 }
329 }
330 UserConnection* uc = getConnection(true, secure);
331 uc->setToken(aNick);
332 uc->setHubUrl(hubUrl);
333 uc->setEncoding(encoding);
334 uc->setState(UserConnection::STATE_CONNECT);
335 uc->setFlag(UserConnection::FLAG_NMDC);
336 try {
337 uc->connect(aServer, aPort, localPort, natRole);
338 } catch(const Exception&) {
339 putConnection(uc);
340 delete uc;
341 }
342 }
343
adcConnect(const OnlineUser & aUser,uint16_t aPort,const string & aToken,bool secure)344 void ConnectionManager::adcConnect(const OnlineUser& aUser, uint16_t aPort, const string& aToken, bool secure) {
345 adcConnect(aUser, aPort, 0, BufferedSocket::NAT_NONE, aToken, secure);
346 }
347
adcConnect(const OnlineUser & aUser,uint16_t aPort,uint16_t localPort,BufferedSocket::NatRoles natRole,const string & aToken,bool secure)348 void ConnectionManager::adcConnect(const OnlineUser& aUser, uint16_t aPort, uint16_t localPort, BufferedSocket::NatRoles natRole, const string& aToken, bool secure) {
349 if(shuttingDown)
350 return;
351
352 UserConnection* uc = getConnection(false, secure);
353 uc->setToken(aToken);
354 uc->setEncoding(Text::utf8);
355 uc->setState(UserConnection::STATE_CONNECT);
356 #ifdef WITH_DHT
357 uc->setHubUrl(&aUser.getClient() == NULL ? "DHT" : aUser.getClient().getHubUrl());
358 #else
359 uc->setHubUrl(aUser.getClient().getHubUrl());
360 #endif
361 if(aUser.getIdentity().isOp()) {
362 uc->setFlag(UserConnection::FLAG_OP);
363 }
364 try {
365 uc->connect(aUser.getIdentity().getIp(), aPort, localPort, natRole);
366 } catch(const Exception&) {
367 putConnection(uc);
368 delete uc;
369 }
370 }
371
disconnect()372 void ConnectionManager::disconnect() noexcept {
373 delete server;
374 delete secureServer;
375
376 server = secureServer = 0;
377 }
378
on(AdcCommand::SUP,UserConnection * aSource,const AdcCommand & cmd)379 void ConnectionManager::on(AdcCommand::SUP, UserConnection* aSource, const AdcCommand& cmd) noexcept {
380 if(aSource->getState() != UserConnection::STATE_SUPNICK) {
381 // Already got this once, ignore...@todo fix support updates
382 dcdebug("CM::onSUP %p sent sup twice\n", (void*)aSource);
383 return;
384 }
385
386 bool baseOk = false;
387
388 for(auto i = cmd.getParameters().begin(); i != cmd.getParameters().end(); ++i) {
389 if(i->compare(0, 2, "AD") == 0) {
390 string feat = i->substr(2);
391 if(feat == UserConnection::FEATURE_ADC_BASE || feat == UserConnection::FEATURE_ADC_BAS0) {
392 baseOk = true;
393 // ADC clients must support all these...
394 aSource->setFlag(UserConnection::FLAG_SUPPORTS_ADCGET);
395 aSource->setFlag(UserConnection::FLAG_SUPPORTS_MINISLOTS);
396 aSource->setFlag(UserConnection::FLAG_SUPPORTS_TTHF);
397 aSource->setFlag(UserConnection::FLAG_SUPPORTS_TTHL);
398 // For compatibility with older clients...
399 aSource->setFlag(UserConnection::FLAG_SUPPORTS_XML_BZLIST);
400 } else if(feat == UserConnection::FEATURE_ZLIB_GET) {
401 aSource->setFlag(UserConnection::FLAG_SUPPORTS_ZLIB_GET);
402 } else if(feat == UserConnection::FEATURE_ADC_BZIP) {
403 aSource->setFlag(UserConnection::FLAG_SUPPORTS_XML_BZLIST);
404 }
405 }
406 }
407
408 if(!baseOk) {
409 aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_PROTOCOL_GENERIC, "Invalid SUP"));
410 aSource->disconnect();
411 return;
412 }
413
414 if(aSource->isSet(UserConnection::FLAG_INCOMING)) {
415 StringList defFeatures = adcFeatures;
416 if(BOOLSETTING(COMPRESS_TRANSFERS)) {
417 defFeatures.push_back("AD" + UserConnection::FEATURE_ZLIB_GET);
418 }
419 aSource->sup(defFeatures);
420 aSource->inf(false);
421 } else {
422 aSource->inf(true);
423 }
424 aSource->setState(UserConnection::STATE_INF);
425 }
426
on(AdcCommand::STA,UserConnection *,const AdcCommand & cmd)427 void ConnectionManager::on(AdcCommand::STA, UserConnection*, const AdcCommand& cmd) noexcept {
428
429 }
430
on(UserConnectionListener::Connected,UserConnection * aSource)431 void ConnectionManager::on(UserConnectionListener::Connected, UserConnection* aSource) noexcept {
432 // incorrect check because aSource->getUser().get() == nullptr
433 if(aSource->isSecure() && !aSource->isTrusted() && !BOOLSETTING(ALLOW_UNTRUSTED_CLIENTS)) {
434 putConnection(aSource);
435 // QueueManager::getInstance()->removeSource(aSource->getUser(), QueueItem::Source::FLAG_UNTRUSTED);
436 return;
437 }
438
439 dcassert(aSource->getState() == UserConnection::STATE_CONNECT);
440 if(aSource->isSet(UserConnection::FLAG_NMDC)) {
441 aSource->myNick(aSource->getToken());
442 aSource->lock(CryptoManager::getInstance()->getLock(), CryptoManager::getInstance()->getPk() + "Ref=" + aSource->getHubUrl());
443 } else {
444 StringList defFeatures = adcFeatures;
445 if(BOOLSETTING(COMPRESS_TRANSFERS)) {
446 defFeatures.push_back("AD" + UserConnection::FEATURE_ZLIB_GET);
447 }
448 aSource->sup(defFeatures);
449 aSource->send(AdcCommand(AdcCommand::SEV_SUCCESS, AdcCommand::SUCCESS, Util::emptyString).addParam("RF", aSource->getHubUrl()));
450 }
451 aSource->setState(UserConnection::STATE_SUPNICK);
452 }
453
on(UserConnectionListener::MyNick,UserConnection * aSource,const string & aNick)454 void ConnectionManager::on(UserConnectionListener::MyNick, UserConnection* aSource, const string& aNick) noexcept {
455 if(aSource->getState() != UserConnection::STATE_SUPNICK) {
456 // Already got this once, ignore...
457 dcdebug("CM::onMyNick %p sent nick twice\n", (void*)aSource);
458 return;
459 }
460
461 dcassert(!aNick.empty());
462 dcdebug("ConnectionManager::onMyNick %p, %s\n", (void*)aSource, aNick.c_str());
463 dcassert(!aSource->getUser());
464
465 if(aSource->isSet(UserConnection::FLAG_INCOMING)) {
466 // Try to guess where this came from...
467 pair<string, string> i = expectedConnections.remove(aNick);
468 if(i.second.empty()) {
469 dcassert(i.first.empty());
470 dcdebug("Unknown incoming connection from %s\n", aNick.c_str());
471 putConnection(aSource);
472 return;
473 }
474 aSource->setToken(i.first);
475 aSource->setHubUrl(i.second);
476 aSource->setEncoding(ClientManager::getInstance()->findHubEncoding(i.second));
477 }
478
479 string nick = Text::toUtf8(aNick, aSource->getEncoding());
480 CID cid = ClientManager::getInstance()->makeCid(nick, aSource->getHubUrl());
481
482 // First, we try looking in the pending downloads...hopefully it's one of them...
483 {
484 Lock l(cs);
485 for(auto i = downloads.begin(); i != downloads.end(); ++i) {
486 ConnectionQueueItem* cqi = *i;
487 cqi->setErrors(0);
488 if((cqi->getState() == ConnectionQueueItem::CONNECTING || cqi->getState() == ConnectionQueueItem::WAITING) &&
489 cqi->getUser().user->getCID() == cid)
490 {
491 aSource->setUser(cqi->getUser());
492 // Indicate that we're interested in this file...
493 aSource->setFlag(UserConnection::FLAG_DOWNLOAD);
494 break;
495 }
496 }
497 }
498
499 if(!aSource->getUser()) {
500 // Make sure we know who it is, i e that he/she is connected...
501
502 aSource->setUser(ClientManager::getInstance()->findUser(cid));
503 if(!aSource->getUser() || !ClientManager::getInstance()->isOnline(aSource->getUser())) {
504 dcdebug("CM::onMyNick Incoming connection from unknown user %s\n", nick.c_str());
505 putConnection(aSource);
506 return;
507 }
508 // We don't need this connection for downloading...make it an upload connection instead...
509 aSource->setFlag(UserConnection::FLAG_UPLOAD);
510 }
511
512 if(ClientManager::getInstance()->isOp(aSource->getUser(), aSource->getHubUrl()))
513 aSource->setFlag(UserConnection::FLAG_OP);
514
515 ClientManager::getInstance()->setIPUser(aSource->getUser(), aSource->getRemoteIp());
516
517 if( aSource->isSet(UserConnection::FLAG_INCOMING) ) {
518 aSource->myNick(aSource->getToken());
519 aSource->lock(CryptoManager::getInstance()->getLock(), CryptoManager::getInstance()->getPk());
520 }
521
522 aSource->setState(UserConnection::STATE_LOCK);
523 }
524
on(UserConnectionListener::CLock,UserConnection * aSource,const string & aLock,const string & aPk)525 void ConnectionManager::on(UserConnectionListener::CLock, UserConnection* aSource, const string& aLock, const string& aPk) noexcept {
526 if(aSource->getState() != UserConnection::STATE_LOCK) {
527 dcdebug("CM::onLock %p received lock twice, ignoring\n", (void*)aSource);
528 return;
529 }
530
531 if( CryptoManager::getInstance()->isExtended(aLock) ) {
532 StringList defFeatures = features;
533 if(BOOLSETTING(COMPRESS_TRANSFERS)) {
534 defFeatures.push_back(UserConnection::FEATURE_ZLIB_GET);
535 }
536
537 aSource->supports(defFeatures);
538 }
539
540 aSource->setState(UserConnection::STATE_DIRECTION);
541 aSource->direction(aSource->getDirectionString(), aSource->getNumber());
542 aSource->key(CryptoManager::getInstance()->makeKey(aLock));
543 }
544
on(UserConnectionListener::Direction,UserConnection * aSource,const string & dir,const string & num)545 void ConnectionManager::on(UserConnectionListener::Direction, UserConnection* aSource, const string& dir, const string& num) noexcept {
546 if(aSource->getState() != UserConnection::STATE_DIRECTION) {
547 dcdebug("CM::onDirection %p received direction twice, ignoring\n", (void*)aSource);
548 return;
549 }
550
551 dcassert(aSource->isSet(UserConnection::FLAG_DOWNLOAD) ^ aSource->isSet(UserConnection::FLAG_UPLOAD));
552 if(dir == "Upload") {
553 // Fine, the other fellow want's to send us data...make sure we really want that...
554 if(aSource->isSet(UserConnection::FLAG_UPLOAD)) {
555 // Huh? Strange...disconnect...
556 putConnection(aSource);
557 return;
558 }
559 } else {
560 if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
561 int number = Util::toInt(num);
562 // Damn, both want to download...the one with the highest number wins...
563 if(aSource->getNumber() < number) {
564 // Damn! We lost!
565 aSource->unsetFlag(UserConnection::FLAG_DOWNLOAD);
566 aSource->setFlag(UserConnection::FLAG_UPLOAD);
567 } else if(aSource->getNumber() == number) {
568 putConnection(aSource);
569 return;
570 }
571 }
572 }
573
574 dcassert(aSource->isSet(UserConnection::FLAG_DOWNLOAD) ^ aSource->isSet(UserConnection::FLAG_UPLOAD));
575
576 aSource->setState(UserConnection::STATE_KEY);
577 }
578
addDownloadConnection(UserConnection * uc)579 void ConnectionManager::addDownloadConnection(UserConnection* uc) {
580 dcassert(uc->isSet(UserConnection::FLAG_DOWNLOAD));
581 bool addConn = false;
582 {
583 Lock l(cs);
584
585 auto i = find(downloads.begin(), downloads.end(), uc->getUser());
586 if(i != downloads.end()) {
587 ConnectionQueueItem* cqi = *i;
588 if(cqi->getState() == ConnectionQueueItem::WAITING || cqi->getState() == ConnectionQueueItem::CONNECTING) {
589 cqi->setState(ConnectionQueueItem::ACTIVE);
590 uc->setFlag(UserConnection::FLAG_ASSOCIATED);
591
592 fire(ConnectionManagerListener::Connected(), cqi);
593
594 dcdebug("ConnectionManager::addDownloadConnection, leaving to downloadmanager\n");
595 addConn = true;
596 }
597 }
598 }
599
600 if(addConn) {
601 DownloadManager::getInstance()->addConnection(uc);
602 } else {
603 putConnection(uc);
604 }
605 }
606
addUploadConnection(UserConnection * uc)607 void ConnectionManager::addUploadConnection(UserConnection* uc) {
608 dcassert(uc->isSet(UserConnection::FLAG_UPLOAD));
609
610 bool addConn = false;
611 {
612 Lock l(cs);
613
614 auto i = find(uploads.begin(), uploads.end(), uc->getUser());
615 if(i == uploads.end()) {
616 ConnectionQueueItem* cqi = getCQI(uc->getHintedUser(), false);
617
618 cqi->setState(ConnectionQueueItem::ACTIVE);
619 uc->setFlag(UserConnection::FLAG_ASSOCIATED);
620
621 fire(ConnectionManagerListener::Connected(), cqi);
622
623 dcdebug("ConnectionManager::addUploadConnection, leaving to uploadmanager\n");
624 addConn = true;
625 }
626 }
627
628 if(addConn) {
629 UploadManager::getInstance()->addConnection(uc);
630 } else {
631 putConnection(uc);
632 }
633 }
634
on(UserConnectionListener::Key,UserConnection * aSource,const string &)635 void ConnectionManager::on(UserConnectionListener::Key, UserConnection* aSource, const string&/* aKey*/) noexcept {
636 if(aSource->getState() != UserConnection::STATE_KEY) {
637 dcdebug("CM::onKey Bad state, ignoring");
638 return;
639 }
640
641 dcassert(aSource->getUser());
642
643 if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
644 addDownloadConnection(aSource);
645 } else {
646 addUploadConnection(aSource);
647 }
648 }
649
on(AdcCommand::INF,UserConnection * aSource,const AdcCommand & cmd)650 void ConnectionManager::on(AdcCommand::INF, UserConnection* aSource, const AdcCommand& cmd) noexcept {
651 if(aSource->getState() != UserConnection::STATE_INF) {
652 aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_PROTOCOL_GENERIC, "Expecting INF"));
653 aSource->disconnect();
654 return;
655 }
656
657 string cid;
658 if(!cmd.getParam("ID", 0, cid)) {
659 aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_INF_MISSING, "ID missing").addParam("FL", "ID"));
660 dcdebug("CM::onINF missing ID\n");
661 aSource->disconnect();
662 return;
663 }
664
665 aSource->setUser(ClientManager::getInstance()->findUser(CID(cid)));
666
667 if(!aSource->getUser()) {
668 dcdebug("CM::onINF: User not found");
669 aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_GENERIC, "User not found"));
670 putConnection(aSource);
671 return;
672 }
673
674 if(!checkKeyprint(aSource)) {
675 putConnection(aSource);
676 return;
677 }
678
679 string token;
680 if(aSource->isSet(UserConnection::FLAG_INCOMING)) {
681 if(!cmd.getParam("TO", 0, token)) {
682 aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_GENERIC, "TO missing"));
683 putConnection(aSource);
684 return;
685 }
686 } else {
687 token = aSource->getToken();
688 }
689
690 bool down = false;
691 {
692 Lock l(cs);
693 auto i = find(downloads.begin(), downloads.end(), aSource->getUser());
694
695 if(i != downloads.end()) {
696 (*i)->setErrors(0);
697
698 const string& to = (*i)->getToken();
699
700 if(to == token) {
701 down = true;
702 }
703 }
704 /** @todo check tokens for upload connections */
705 }
706
707 if(down) {
708 aSource->setFlag(UserConnection::FLAG_DOWNLOAD);
709 addDownloadConnection(aSource);
710 } else {
711 aSource->setFlag(UserConnection::FLAG_UPLOAD);
712 addUploadConnection(aSource);
713 }
714 }
715
force(const UserPtr & aUser)716 void ConnectionManager::force(const UserPtr& aUser) {
717 Lock l(cs);
718
719 auto i = find(downloads.begin(), downloads.end(), aUser);
720 if(i == downloads.end()) {
721 return;
722 }
723
724 (*i)->setLastAttempt(0);
725 }
726
checkKeyprint(UserConnection * aSource)727 bool ConnectionManager::checkKeyprint(UserConnection *aSource) {
728 dcassert(aSource->getUser());
729
730 auto kp = aSource->getKeyprint();
731 if(kp.empty()) {
732 return true;
733 }
734
735 auto kp2 = ClientManager::getInstance()->getField(aSource->getUser()->getCID(), aSource->getHubUrl(), "KP");
736 if(kp2.empty()) {
737 // TODO false probably
738 return true;
739 }
740
741 if(kp2.compare(0, 7, "SHA256/") != 0) {
742 // Unsupported hash
743 return true;
744 }
745
746 dcdebug("Keyprint: %s vs %s\n", Encoder::toBase32(&kp[0], kp.size()).c_str(), kp2.c_str() + 7);
747
748 vector<uint8_t> kp2v(kp.size());
749 Encoder::fromBase32(&kp2[7], &kp2v[0], kp2v.size());
750 if(!std::equal(kp.begin(), kp.end(), kp2v.begin())) {
751 dcdebug("Not equal...\n");
752 return false;
753 }
754
755 return true;
756 }
757
failed(UserConnection * aSource,const string & aError,bool protocolError)758 void ConnectionManager::failed(UserConnection* aSource, const string& aError, bool protocolError) {
759 Lock l(cs);
760
761 if(aSource->isSet(UserConnection::FLAG_ASSOCIATED)) {
762 if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
763 auto i = find(downloads.begin(), downloads.end(), aSource->getUser());
764 dcassert(i != downloads.end());
765 ConnectionQueueItem* cqi = *i;
766 cqi->setState(ConnectionQueueItem::WAITING);
767 cqi->setLastAttempt(GET_TICK());
768 cqi->setErrors(protocolError ? -1 : (cqi->getErrors() + 1));
769 fire(ConnectionManagerListener::Failed(), cqi, aError);
770 } else if(aSource->isSet(UserConnection::FLAG_UPLOAD)) {
771 auto i = find(uploads.begin(), uploads.end(), aSource->getUser());
772 dcassert(i != uploads.end());
773 ConnectionQueueItem* cqi = *i;
774 putCQI(cqi);
775 }
776 }
777 putConnection(aSource);
778 }
779
on(UserConnectionListener::Failed,UserConnection * aSource,const string & aError)780 void ConnectionManager::on(UserConnectionListener::Failed, UserConnection* aSource, const string& aError) noexcept {
781 failed(aSource, aError, false);
782 }
783
on(UserConnectionListener::ProtocolError,UserConnection * aSource,const string & aError)784 void ConnectionManager::on(UserConnectionListener::ProtocolError, UserConnection* aSource, const string& aError) noexcept {
785 failed(aSource, aError, true);
786 }
787
disconnect(const UserPtr & aUser)788 void ConnectionManager::disconnect(const UserPtr& aUser) {
789 Lock l(cs);
790 for(auto i = userConnections.begin(); i != userConnections.end(); ++i) {
791 UserConnection* uc = *i;
792 if(uc->getUser() == aUser)
793 uc->disconnect(true);
794 }
795 }
796
disconnect(const UserPtr & aUser,int isDownload)797 void ConnectionManager::disconnect(const UserPtr& aUser, int isDownload) {
798 Lock l(cs);
799 for(auto i = userConnections.begin(); i != userConnections.end(); ++i) {
800 UserConnection* uc = *i;
801 if(uc->getUser() == aUser && uc->isSet(isDownload ? UserConnection::FLAG_DOWNLOAD : UserConnection::FLAG_UPLOAD)) {
802 uc->disconnect(true);
803 break;
804 }
805 }
806 }
807
shutdown()808 void ConnectionManager::shutdown() {
809 TimerManager::getInstance()->removeListener(this);
810 shuttingDown = true;
811 disconnect();
812 {
813 Lock l(cs);
814 for(auto j = userConnections.begin(); j != userConnections.end(); ++j) {
815 (*j)->disconnect(true);
816 }
817 }
818 // Wait until all connections have died out...
819 while(true) {
820 {
821 Lock l(cs);
822 if(userConnections.empty()) {
823 break;
824 }
825 }
826 Thread::sleep(50);
827 }
828 }
829
830 // UserConnectionListener
on(UserConnectionListener::Supports,UserConnection * conn,const StringList & feat)831 void ConnectionManager::on(UserConnectionListener::Supports, UserConnection* conn, const StringList& feat) noexcept {
832 for(auto i = feat.begin(); i != feat.end(); ++i) {
833 if(*i == UserConnection::FEATURE_MINISLOTS) {
834 conn->setFlag(UserConnection::FLAG_SUPPORTS_MINISLOTS);
835 } else if(*i == UserConnection::FEATURE_XML_BZLIST) {
836 conn->setFlag(UserConnection::FLAG_SUPPORTS_XML_BZLIST);
837 } else if(*i == UserConnection::FEATURE_ADCGET) {
838 conn->setFlag(UserConnection::FLAG_SUPPORTS_ADCGET);
839 } else if(*i == UserConnection::FEATURE_ZLIB_GET) {
840 conn->setFlag(UserConnection::FLAG_SUPPORTS_ZLIB_GET);
841 } else if(*i == UserConnection::FEATURE_TTHL) {
842 conn->setFlag(UserConnection::FLAG_SUPPORTS_TTHL);
843 } else if(*i == UserConnection::FEATURE_TTHF) {
844 conn->setFlag(UserConnection::FLAG_SUPPORTS_TTHF);
845 }
846 }
847 }
848
849 } // namespace dcpp
850