1 /*
2  * Copyright (C) 2001-2012 Jacek Sieka, arnetheduck on gmail point com
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17  */
18 
19 #include "stdinc.h"
20 
21 #include "ConnectionManager.h"
22 
23 #include "DownloadManager.h"
24 #include "UploadManager.h"
25 #include "CryptoManager.h"
26 #include "ClientManager.h"
27 #include "QueueManager.h"
28 #include "LogManager.h"
29 
30 #include "UserConnection.h"
31 
32 namespace dcpp {
33 
ConnectionManager()34 ConnectionManager::ConnectionManager() : floodCounter(0), server(0), secureServer(0), shuttingDown(false) {
35     TimerManager::getInstance()->addListener(this);
36 
37     features.push_back(UserConnection::FEATURE_MINISLOTS);
38     features.push_back(UserConnection::FEATURE_XML_BZLIST);
39     features.push_back(UserConnection::FEATURE_ADCGET);
40     features.push_back(UserConnection::FEATURE_TTHL);
41     features.push_back(UserConnection::FEATURE_TTHF);
42 
43     adcFeatures.push_back("AD" + UserConnection::FEATURE_ADC_BAS0);
44     adcFeatures.push_back("AD" + UserConnection::FEATURE_ADC_BASE);
45     adcFeatures.push_back("AD" + UserConnection::FEATURE_ADC_TIGR);
46     adcFeatures.push_back("AD" + UserConnection::FEATURE_ADC_BZIP);
47 }
48 
listen()49 void ConnectionManager::listen() {
50     disconnect();
51 
52     server = new Server(false, static_cast<uint16_t>(SETTING(TCP_PORT)), SETTING(BIND_ADDRESS));
53 
54     if(!CryptoManager::getInstance()->TLSOk()) {
55         dcdebug("Skipping secure port: %d\n", SETTING(TLS_PORT));
56         return;
57     }
58 
59     secureServer = new Server(true, static_cast<uint16_t>(SETTING(TLS_PORT)), SETTING(BIND_ADDRESS));
60 }
61 
62 /**
63  * Request a connection for downloading.
64  * DownloadManager::addConnection will be called as soon as the connection is ready
65  * for downloading.
66  * @param aUser The user to connect to.
67  */
getDownloadConnection(const HintedUser & aUser)68 void ConnectionManager::getDownloadConnection(const HintedUser& aUser) {
69     dcassert((bool)aUser.user);
70     {
71         Lock l(cs);
72         ConnectionQueueItem::Iter i = find(downloads.begin(), downloads.end(), aUser.user);
73         if(i == downloads.end()) {
74             getCQI(aUser, true);
75         } else {
76             DownloadManager::getInstance()->checkIdle(aUser.user);
77         }
78     }
79 }
80 
getCQI(const HintedUser & aUser,bool download)81 ConnectionQueueItem* ConnectionManager::getCQI(const HintedUser& aUser, bool download) {
82     ConnectionQueueItem* cqi = new ConnectionQueueItem(aUser, download);
83     if(download) {
84         dcassert(find(downloads.begin(), downloads.end(), aUser.user) == downloads.end());
85         downloads.push_back(cqi);
86     } else {
87         dcassert(find(uploads.begin(), uploads.end(), aUser.user) == uploads.end());
88         uploads.push_back(cqi);
89     }
90 
91     fire(ConnectionManagerListener::Added(), cqi);
92     return cqi;
93 }
94 
putCQI(ConnectionQueueItem * cqi)95 void ConnectionManager::putCQI(ConnectionQueueItem* cqi) {
96     fire(ConnectionManagerListener::Removed(), cqi);
97     if(cqi->getDownload()) {
98         dcassert(find(downloads.begin(), downloads.end(), cqi) != downloads.end());
99         downloads.erase(remove(downloads.begin(), downloads.end(), cqi), downloads.end());
100     } else {
101         dcassert(find(uploads.begin(), uploads.end(), cqi) != uploads.end());
102         uploads.erase(remove(uploads.begin(), uploads.end(), cqi), uploads.end());
103     }
104     delete cqi;
105 }
106 
getConnection(bool aNmdc,bool secure)107 UserConnection* ConnectionManager::getConnection(bool aNmdc, bool secure) noexcept {
108     UserConnection* uc = new UserConnection(secure);
109     uc->addListener(this);
110     {
111         Lock l(cs);
112         userConnections.push_back(uc);
113     }
114     if(aNmdc)
115         uc->setFlag(UserConnection::FLAG_NMDC);
116     return uc;
117 }
118 
putConnection(UserConnection * aConn)119 void ConnectionManager::putConnection(UserConnection* aConn) {
120     aConn->removeListener(this);
121     aConn->disconnect();
122 
123     Lock l(cs);
124     userConnections.erase(remove(userConnections.begin(), userConnections.end(), aConn), userConnections.end());
125 }
126 
on(TimerManagerListener::Second,uint64_t aTick)127 void ConnectionManager::on(TimerManagerListener::Second, uint64_t aTick) noexcept {
128     UserList passiveUsers;
129     ConnectionQueueItem::List removed;
130 
131     {
132         Lock l(cs);
133 
134         bool attemptDone = false;
135 
136         for(auto i = downloads.begin(); i != downloads.end(); ++i) {
137             ConnectionQueueItem* cqi = *i;
138 
139             if(cqi->getState() != ConnectionQueueItem::ACTIVE) {
140                 if(!cqi->getUser().user->isOnline()) {
141                     // Not online anymore...remove it from the pending...
142                     removed.push_back(cqi);
143                     continue;
144                 }
145 
146                 if(cqi->getUser().user->isSet(User::PASSIVE) && !ClientManager::getInstance()->isActive()) {
147                     passiveUsers.push_back(cqi->getUser());
148                     removed.push_back(cqi);
149                     continue;
150                 }
151 
152                 if(cqi->getErrors() == -1 && cqi->getLastAttempt() != 0) {
153                         // protocol error, don't reconnect except after a forced attempt
154                         continue;
155                 }
156 
157                 if(cqi->getLastAttempt() == 0 || (!attemptDone &&
158                     cqi->getLastAttempt() + 60 * 1000 * max(1, cqi->getErrors()) < aTick))
159                 {
160                     cqi->setLastAttempt(aTick);
161 
162                     QueueItem::Priority prio = QueueManager::getInstance()->hasDownload(cqi->getUser());
163 
164                     if(prio == QueueItem::PAUSED) {
165                         removed.push_back(cqi);
166                         continue;
167                     }
168 
169                     bool startDown = DownloadManager::getInstance()->startDownload(prio);
170 
171                     if(cqi->getState() == ConnectionQueueItem::WAITING) {
172                         if(startDown) {
173                             cqi->setState(ConnectionQueueItem::CONNECTING);
174                             ClientManager::getInstance()->connect(cqi->getUser(), cqi->getToken());
175                             fire(ConnectionManagerListener::StatusChanged(), cqi);
176                             attemptDone = true;
177                         } else {
178                             cqi->setState(ConnectionQueueItem::NO_DOWNLOAD_SLOTS);
179                             fire(ConnectionManagerListener::Failed(), cqi, _("All download slots taken"));
180                         }
181                     } else if(cqi->getState() == ConnectionQueueItem::NO_DOWNLOAD_SLOTS && startDown) {
182                         cqi->setState(ConnectionQueueItem::WAITING);
183                     }
184                 } else if(cqi->getState() == ConnectionQueueItem::CONNECTING && cqi->getLastAttempt() + 50 * 1000 < aTick) {
185                     cqi->setErrors(cqi->getErrors() + 1);
186                     fire(ConnectionManagerListener::Failed(), cqi, _("Connection timeout"));
187                     cqi->setState(ConnectionQueueItem::WAITING);
188                 }
189             }
190         }
191 
192         for(auto m = removed.begin(); m != removed.end(); ++m) {
193             putCQI(*m);
194         }
195 
196     }
197 
198     for(auto ui = passiveUsers.begin(); ui != passiveUsers.end(); ++ui) {
199         QueueManager::getInstance()->removeSource(*ui, QueueItem::Source::FLAG_PASSIVE);
200     }
201 }
202 
on(TimerManagerListener::Minute,uint64_t aTick)203 void ConnectionManager::on(TimerManagerListener::Minute, uint64_t aTick) noexcept {
204     Lock l(cs);
205 
206     for(auto j = userConnections.begin(); j != userConnections.end(); ++j) {
207         if(((*j)->getLastActivity() + 180*1000) < aTick) {
208             (*j)->disconnect(true);
209         }
210     }
211 }
212 
213 static const uint32_t FLOOD_TRIGGER = 20000;
214 static const uint32_t FLOOD_ADD = 2000;
215 
Server(bool secure_,uint16_t aPort,const string & ip_)216 ConnectionManager::Server::Server(bool secure_, uint16_t aPort, const string& ip_ /* = "0.0.0.0" */) : port(0), secure(secure_), die(false) {
217     sock.create();
218     sock.setSocketOpt(SO_REUSEADDR, 1);
219     ip = SETTING(BIND_IFACE)? sock.getIfaceI4(SETTING(BIND_IFACE_NAME)).c_str() : ip_;
220     port = sock.bind(aPort, ip);
221     sock.listen();
222 
223     start();
224 }
225 
226 static const uint32_t POLL_TIMEOUT = 250;
227 
run()228 int ConnectionManager::Server::run() noexcept {
229     {
230         char threadName[17];
231         snprintf(threadName, sizeof threadName, "Server_%u", port);
232         setThreadName(threadName);
233     }
234     while(!die) {
235         try {
236             while(!die) {
237                 if(sock.wait(POLL_TIMEOUT, Socket::WAIT_READ) == Socket::WAIT_READ) {
238                     ConnectionManager::getInstance()->accept(sock, secure);
239                 }
240             }
241         } catch(const Exception& e) {
242             dcdebug("ConnectionManager::Server::run Error: %s\n", e.getError().c_str());
243         }
244 
245         bool failed = false;
246         while(!die) {
247             try {
248                 sock.disconnect();
249                 sock.create();
250                 sock.bind(port, ip);
251                 sock.listen();
252                 if(failed) {
253                     LogManager::getInstance()->message(_("Connectivity restored"));
254                     failed = false;
255                 }
256                 break;
257             } catch(const SocketException& e) {
258                 dcdebug("ConnectionManager::Server::run Stopped listening: %s\n", e.getError().c_str());
259 
260                 if(!failed) {
261                     LogManager::getInstance()->message(str(F_("Connectivity error: %1%") % e.getError()));
262                     failed = true;
263                 }
264 
265                 // Spin for 60 seconds
266                 for(int i = 0; i < 60 && !die; ++i) {
267                     Thread::sleep(1000);
268                 }
269             }
270         }
271     }
272     return 0;
273 }
274 
275 /**
276  * Someone's connecting, accept the connection and wait for identification...
277  * It's always the other fellow that starts sending if he made the connection.
278  */
accept(const Socket & sock,bool secure)279 void ConnectionManager::accept(const Socket& sock, bool secure) noexcept {
280     uint64_t now = GET_TICK();
281 
282     if(now > floodCounter) {
283         floodCounter = now + FLOOD_ADD;
284     } else {
285         if(false && now + FLOOD_TRIGGER < floodCounter) {
286             Socket s;
287             try {
288                 s.accept(sock);
289             } catch(const SocketException&) {
290                 // ...
291             }
292             dcdebug("Connection flood detected!\n");
293             return;
294         } else {
295             floodCounter += FLOOD_ADD;
296         }
297     }
298     UserConnection* uc = getConnection(false, secure);
299     uc->setFlag(UserConnection::FLAG_INCOMING);
300     uc->setState(UserConnection::STATE_SUPNICK);
301     uc->setLastActivity(GET_TICK());
302     try {
303         uc->accept(sock);
304     } catch(const Exception&) {
305         putConnection(uc);
306         delete uc;
307     }
308 }
309 
addCTM2HUB(const string & server,const string & port)310 void ConnectionManager::addCTM2HUB(const string &server, const string &port)
311 {
312     Lock l(cs);
313     const string key = server + ':' + port;
314     ddosctm2hub.insert(key);
315 }
316 
nmdcConnect(const string & aServer,uint16_t aPort,const string & aNick,const string & hubUrl,const string & encoding,bool secure)317 void ConnectionManager::nmdcConnect(const string& aServer, uint16_t aPort, const string& aNick, const string& hubUrl, const string& encoding, bool secure) {
318     nmdcConnect(aServer, aPort, 0, BufferedSocket::NAT_NONE, aNick, hubUrl, encoding, secure);
319 }
320 
nmdcConnect(const string & aServer,uint16_t aPort,uint16_t localPort,BufferedSocket::NatRoles natRole,const string & aNick,const string & hubUrl,const string & encoding,bool secure)321 void ConnectionManager::nmdcConnect(const string& aServer, uint16_t aPort, uint16_t localPort, BufferedSocket::NatRoles natRole, const string& aNick, const string& hubUrl, const string& encoding, bool secure) {
322     if(shuttingDown)
323         return;
324     {
325         Lock l(cs);
326         if (!ddosctm2hub.empty() && ddosctm2hub.find(aServer + ":" + Util::toString(aPort)) != ddosctm2hub.end()) {
327             return;
328         }
329     }
330     UserConnection* uc = getConnection(true, secure);
331     uc->setToken(aNick);
332     uc->setHubUrl(hubUrl);
333     uc->setEncoding(encoding);
334     uc->setState(UserConnection::STATE_CONNECT);
335     uc->setFlag(UserConnection::FLAG_NMDC);
336     try {
337         uc->connect(aServer, aPort, localPort, natRole);
338     } catch(const Exception&) {
339         putConnection(uc);
340         delete uc;
341     }
342 }
343 
adcConnect(const OnlineUser & aUser,uint16_t aPort,const string & aToken,bool secure)344 void ConnectionManager::adcConnect(const OnlineUser& aUser, uint16_t aPort, const string& aToken, bool secure) {
345         adcConnect(aUser, aPort, 0, BufferedSocket::NAT_NONE, aToken, secure);
346 }
347 
adcConnect(const OnlineUser & aUser,uint16_t aPort,uint16_t localPort,BufferedSocket::NatRoles natRole,const string & aToken,bool secure)348 void ConnectionManager::adcConnect(const OnlineUser& aUser, uint16_t aPort, uint16_t localPort, BufferedSocket::NatRoles natRole, const string& aToken, bool secure) {
349     if(shuttingDown)
350         return;
351 
352     UserConnection* uc = getConnection(false, secure);
353     uc->setToken(aToken);
354     uc->setEncoding(Text::utf8);
355     uc->setState(UserConnection::STATE_CONNECT);
356 #ifdef WITH_DHT
357     uc->setHubUrl(&aUser.getClient() == NULL ? "DHT" : aUser.getClient().getHubUrl());
358 #else
359     uc->setHubUrl(aUser.getClient().getHubUrl());
360 #endif
361     if(aUser.getIdentity().isOp()) {
362         uc->setFlag(UserConnection::FLAG_OP);
363     }
364     try {
365         uc->connect(aUser.getIdentity().getIp(), aPort, localPort, natRole);
366     } catch(const Exception&) {
367         putConnection(uc);
368         delete uc;
369     }
370 }
371 
disconnect()372 void ConnectionManager::disconnect() noexcept {
373     delete server;
374     delete secureServer;
375 
376     server = secureServer = 0;
377 }
378 
on(AdcCommand::SUP,UserConnection * aSource,const AdcCommand & cmd)379 void ConnectionManager::on(AdcCommand::SUP, UserConnection* aSource, const AdcCommand& cmd) noexcept {
380     if(aSource->getState() != UserConnection::STATE_SUPNICK) {
381         // Already got this once, ignore...@todo fix support updates
382         dcdebug("CM::onSUP %p sent sup twice\n", (void*)aSource);
383         return;
384     }
385 
386     bool baseOk = false;
387 
388     for(auto i = cmd.getParameters().begin(); i != cmd.getParameters().end(); ++i) {
389         if(i->compare(0, 2, "AD") == 0) {
390             string feat = i->substr(2);
391             if(feat == UserConnection::FEATURE_ADC_BASE || feat == UserConnection::FEATURE_ADC_BAS0) {
392                 baseOk = true;
393                 // ADC clients must support all these...
394                 aSource->setFlag(UserConnection::FLAG_SUPPORTS_ADCGET);
395                 aSource->setFlag(UserConnection::FLAG_SUPPORTS_MINISLOTS);
396                 aSource->setFlag(UserConnection::FLAG_SUPPORTS_TTHF);
397                 aSource->setFlag(UserConnection::FLAG_SUPPORTS_TTHL);
398                 // For compatibility with older clients...
399                 aSource->setFlag(UserConnection::FLAG_SUPPORTS_XML_BZLIST);
400             } else if(feat == UserConnection::FEATURE_ZLIB_GET) {
401                 aSource->setFlag(UserConnection::FLAG_SUPPORTS_ZLIB_GET);
402             } else if(feat == UserConnection::FEATURE_ADC_BZIP) {
403                 aSource->setFlag(UserConnection::FLAG_SUPPORTS_XML_BZLIST);
404             }
405         }
406     }
407 
408     if(!baseOk) {
409         aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_PROTOCOL_GENERIC, "Invalid SUP"));
410         aSource->disconnect();
411         return;
412     }
413 
414     if(aSource->isSet(UserConnection::FLAG_INCOMING)) {
415         StringList defFeatures = adcFeatures;
416         if(BOOLSETTING(COMPRESS_TRANSFERS)) {
417             defFeatures.push_back("AD" + UserConnection::FEATURE_ZLIB_GET);
418         }
419         aSource->sup(defFeatures);
420         aSource->inf(false);
421     } else {
422         aSource->inf(true);
423     }
424     aSource->setState(UserConnection::STATE_INF);
425 }
426 
on(AdcCommand::STA,UserConnection *,const AdcCommand & cmd)427 void ConnectionManager::on(AdcCommand::STA, UserConnection*, const AdcCommand& cmd) noexcept {
428 
429 }
430 
on(UserConnectionListener::Connected,UserConnection * aSource)431 void ConnectionManager::on(UserConnectionListener::Connected, UserConnection* aSource) noexcept {
432     // incorrect check because aSource->getUser().get() == nullptr
433     if(aSource->isSecure() && !aSource->isTrusted() && !BOOLSETTING(ALLOW_UNTRUSTED_CLIENTS)) {
434         putConnection(aSource);
435 //        QueueManager::getInstance()->removeSource(aSource->getUser(), QueueItem::Source::FLAG_UNTRUSTED);
436         return;
437     }
438 
439     dcassert(aSource->getState() == UserConnection::STATE_CONNECT);
440     if(aSource->isSet(UserConnection::FLAG_NMDC)) {
441         aSource->myNick(aSource->getToken());
442         aSource->lock(CryptoManager::getInstance()->getLock(), CryptoManager::getInstance()->getPk() + "Ref=" + aSource->getHubUrl());
443     } else {
444         StringList defFeatures = adcFeatures;
445         if(BOOLSETTING(COMPRESS_TRANSFERS)) {
446             defFeatures.push_back("AD" + UserConnection::FEATURE_ZLIB_GET);
447         }
448         aSource->sup(defFeatures);
449         aSource->send(AdcCommand(AdcCommand::SEV_SUCCESS, AdcCommand::SUCCESS, Util::emptyString).addParam("RF", aSource->getHubUrl()));
450     }
451     aSource->setState(UserConnection::STATE_SUPNICK);
452 }
453 
on(UserConnectionListener::MyNick,UserConnection * aSource,const string & aNick)454 void ConnectionManager::on(UserConnectionListener::MyNick, UserConnection* aSource, const string& aNick) noexcept {
455     if(aSource->getState() != UserConnection::STATE_SUPNICK) {
456         // Already got this once, ignore...
457         dcdebug("CM::onMyNick %p sent nick twice\n", (void*)aSource);
458         return;
459     }
460 
461     dcassert(!aNick.empty());
462     dcdebug("ConnectionManager::onMyNick %p, %s\n", (void*)aSource, aNick.c_str());
463     dcassert(!aSource->getUser());
464 
465     if(aSource->isSet(UserConnection::FLAG_INCOMING)) {
466         // Try to guess where this came from...
467         pair<string, string> i = expectedConnections.remove(aNick);
468         if(i.second.empty()) {
469             dcassert(i.first.empty());
470             dcdebug("Unknown incoming connection from %s\n", aNick.c_str());
471             putConnection(aSource);
472             return;
473         }
474         aSource->setToken(i.first);
475         aSource->setHubUrl(i.second);
476         aSource->setEncoding(ClientManager::getInstance()->findHubEncoding(i.second));
477     }
478 
479     string nick = Text::toUtf8(aNick, aSource->getEncoding());
480     CID cid = ClientManager::getInstance()->makeCid(nick, aSource->getHubUrl());
481 
482     // First, we try looking in the pending downloads...hopefully it's one of them...
483     {
484         Lock l(cs);
485         for(auto i = downloads.begin(); i != downloads.end(); ++i) {
486             ConnectionQueueItem* cqi = *i;
487             cqi->setErrors(0);
488             if((cqi->getState() == ConnectionQueueItem::CONNECTING || cqi->getState() == ConnectionQueueItem::WAITING) &&
489                     cqi->getUser().user->getCID() == cid)
490             {
491                 aSource->setUser(cqi->getUser());
492                 // Indicate that we're interested in this file...
493                 aSource->setFlag(UserConnection::FLAG_DOWNLOAD);
494                 break;
495             }
496         }
497     }
498 
499     if(!aSource->getUser()) {
500         // Make sure we know who it is, i e that he/she is connected...
501 
502         aSource->setUser(ClientManager::getInstance()->findUser(cid));
503         if(!aSource->getUser() || !ClientManager::getInstance()->isOnline(aSource->getUser())) {
504             dcdebug("CM::onMyNick Incoming connection from unknown user %s\n", nick.c_str());
505             putConnection(aSource);
506             return;
507         }
508         // We don't need this connection for downloading...make it an upload connection instead...
509         aSource->setFlag(UserConnection::FLAG_UPLOAD);
510     }
511 
512     if(ClientManager::getInstance()->isOp(aSource->getUser(), aSource->getHubUrl()))
513         aSource->setFlag(UserConnection::FLAG_OP);
514 
515     ClientManager::getInstance()->setIPUser(aSource->getUser(), aSource->getRemoteIp());
516 
517     if( aSource->isSet(UserConnection::FLAG_INCOMING) ) {
518         aSource->myNick(aSource->getToken());
519         aSource->lock(CryptoManager::getInstance()->getLock(), CryptoManager::getInstance()->getPk());
520     }
521 
522     aSource->setState(UserConnection::STATE_LOCK);
523 }
524 
on(UserConnectionListener::CLock,UserConnection * aSource,const string & aLock,const string & aPk)525 void ConnectionManager::on(UserConnectionListener::CLock, UserConnection* aSource, const string& aLock, const string& aPk) noexcept {
526     if(aSource->getState() != UserConnection::STATE_LOCK) {
527         dcdebug("CM::onLock %p received lock twice, ignoring\n", (void*)aSource);
528         return;
529     }
530 
531     if( CryptoManager::getInstance()->isExtended(aLock) ) {
532         StringList defFeatures = features;
533         if(BOOLSETTING(COMPRESS_TRANSFERS)) {
534             defFeatures.push_back(UserConnection::FEATURE_ZLIB_GET);
535         }
536 
537         aSource->supports(defFeatures);
538     }
539 
540     aSource->setState(UserConnection::STATE_DIRECTION);
541     aSource->direction(aSource->getDirectionString(), aSource->getNumber());
542     aSource->key(CryptoManager::getInstance()->makeKey(aLock));
543 }
544 
on(UserConnectionListener::Direction,UserConnection * aSource,const string & dir,const string & num)545 void ConnectionManager::on(UserConnectionListener::Direction, UserConnection* aSource, const string& dir, const string& num) noexcept {
546     if(aSource->getState() != UserConnection::STATE_DIRECTION) {
547         dcdebug("CM::onDirection %p received direction twice, ignoring\n", (void*)aSource);
548         return;
549     }
550 
551     dcassert(aSource->isSet(UserConnection::FLAG_DOWNLOAD) ^ aSource->isSet(UserConnection::FLAG_UPLOAD));
552     if(dir == "Upload") {
553         // Fine, the other fellow want's to send us data...make sure we really want that...
554         if(aSource->isSet(UserConnection::FLAG_UPLOAD)) {
555             // Huh? Strange...disconnect...
556             putConnection(aSource);
557             return;
558         }
559     } else {
560         if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
561             int number = Util::toInt(num);
562             // Damn, both want to download...the one with the highest number wins...
563             if(aSource->getNumber() < number) {
564                 // Damn! We lost!
565                 aSource->unsetFlag(UserConnection::FLAG_DOWNLOAD);
566                 aSource->setFlag(UserConnection::FLAG_UPLOAD);
567             } else if(aSource->getNumber() == number) {
568                 putConnection(aSource);
569                 return;
570             }
571         }
572     }
573 
574     dcassert(aSource->isSet(UserConnection::FLAG_DOWNLOAD) ^ aSource->isSet(UserConnection::FLAG_UPLOAD));
575 
576     aSource->setState(UserConnection::STATE_KEY);
577 }
578 
addDownloadConnection(UserConnection * uc)579 void ConnectionManager::addDownloadConnection(UserConnection* uc) {
580     dcassert(uc->isSet(UserConnection::FLAG_DOWNLOAD));
581     bool addConn = false;
582     {
583         Lock l(cs);
584 
585         auto i = find(downloads.begin(), downloads.end(), uc->getUser());
586         if(i != downloads.end()) {
587             ConnectionQueueItem* cqi = *i;
588             if(cqi->getState() == ConnectionQueueItem::WAITING || cqi->getState() == ConnectionQueueItem::CONNECTING) {
589                 cqi->setState(ConnectionQueueItem::ACTIVE);
590                 uc->setFlag(UserConnection::FLAG_ASSOCIATED);
591 
592                 fire(ConnectionManagerListener::Connected(), cqi);
593 
594                 dcdebug("ConnectionManager::addDownloadConnection, leaving to downloadmanager\n");
595                 addConn = true;
596             }
597         }
598     }
599 
600     if(addConn) {
601         DownloadManager::getInstance()->addConnection(uc);
602     } else {
603         putConnection(uc);
604     }
605 }
606 
addUploadConnection(UserConnection * uc)607 void ConnectionManager::addUploadConnection(UserConnection* uc) {
608     dcassert(uc->isSet(UserConnection::FLAG_UPLOAD));
609 
610     bool addConn = false;
611     {
612         Lock l(cs);
613 
614         auto i = find(uploads.begin(), uploads.end(), uc->getUser());
615         if(i == uploads.end()) {
616             ConnectionQueueItem* cqi = getCQI(uc->getHintedUser(), false);
617 
618             cqi->setState(ConnectionQueueItem::ACTIVE);
619             uc->setFlag(UserConnection::FLAG_ASSOCIATED);
620 
621             fire(ConnectionManagerListener::Connected(), cqi);
622 
623             dcdebug("ConnectionManager::addUploadConnection, leaving to uploadmanager\n");
624             addConn = true;
625         }
626     }
627 
628     if(addConn) {
629         UploadManager::getInstance()->addConnection(uc);
630     } else {
631         putConnection(uc);
632     }
633 }
634 
on(UserConnectionListener::Key,UserConnection * aSource,const string &)635 void ConnectionManager::on(UserConnectionListener::Key, UserConnection* aSource, const string&/* aKey*/) noexcept {
636     if(aSource->getState() != UserConnection::STATE_KEY) {
637         dcdebug("CM::onKey Bad state, ignoring");
638         return;
639     }
640 
641     dcassert(aSource->getUser());
642 
643     if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
644         addDownloadConnection(aSource);
645     } else {
646         addUploadConnection(aSource);
647     }
648 }
649 
on(AdcCommand::INF,UserConnection * aSource,const AdcCommand & cmd)650 void ConnectionManager::on(AdcCommand::INF, UserConnection* aSource, const AdcCommand& cmd) noexcept {
651     if(aSource->getState() != UserConnection::STATE_INF) {
652         aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_PROTOCOL_GENERIC, "Expecting INF"));
653         aSource->disconnect();
654         return;
655     }
656 
657     string cid;
658     if(!cmd.getParam("ID", 0, cid)) {
659         aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_INF_MISSING, "ID missing").addParam("FL", "ID"));
660         dcdebug("CM::onINF missing ID\n");
661         aSource->disconnect();
662         return;
663     }
664 
665     aSource->setUser(ClientManager::getInstance()->findUser(CID(cid)));
666 
667     if(!aSource->getUser()) {
668         dcdebug("CM::onINF: User not found");
669         aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_GENERIC, "User not found"));
670         putConnection(aSource);
671         return;
672     }
673 
674     if(!checkKeyprint(aSource)) {
675         putConnection(aSource);
676         return;
677     }
678 
679     string token;
680     if(aSource->isSet(UserConnection::FLAG_INCOMING)) {
681         if(!cmd.getParam("TO", 0, token)) {
682             aSource->send(AdcCommand(AdcCommand::SEV_FATAL, AdcCommand::ERROR_GENERIC, "TO missing"));
683             putConnection(aSource);
684             return;
685         }
686     } else {
687         token = aSource->getToken();
688     }
689 
690     bool down = false;
691     {
692         Lock l(cs);
693         auto i = find(downloads.begin(), downloads.end(), aSource->getUser());
694 
695         if(i != downloads.end()) {
696             (*i)->setErrors(0);
697 
698             const string& to = (*i)->getToken();
699 
700             if(to == token) {
701                 down = true;
702             }
703         }
704         /** @todo check tokens for upload connections */
705     }
706 
707     if(down) {
708         aSource->setFlag(UserConnection::FLAG_DOWNLOAD);
709         addDownloadConnection(aSource);
710     } else {
711         aSource->setFlag(UserConnection::FLAG_UPLOAD);
712         addUploadConnection(aSource);
713     }
714 }
715 
force(const UserPtr & aUser)716 void ConnectionManager::force(const UserPtr& aUser) {
717     Lock l(cs);
718 
719     auto i = find(downloads.begin(), downloads.end(), aUser);
720     if(i == downloads.end()) {
721         return;
722     }
723 
724     (*i)->setLastAttempt(0);
725 }
726 
checkKeyprint(UserConnection * aSource)727 bool ConnectionManager::checkKeyprint(UserConnection *aSource) {
728     dcassert(aSource->getUser());
729 
730     auto kp = aSource->getKeyprint();
731     if(kp.empty()) {
732         return true;
733     }
734 
735     auto kp2 = ClientManager::getInstance()->getField(aSource->getUser()->getCID(), aSource->getHubUrl(), "KP");
736     if(kp2.empty()) {
737         // TODO false probably
738         return true;
739     }
740 
741     if(kp2.compare(0, 7, "SHA256/") != 0) {
742         // Unsupported hash
743         return true;
744     }
745 
746     dcdebug("Keyprint: %s vs %s\n", Encoder::toBase32(&kp[0], kp.size()).c_str(), kp2.c_str() + 7);
747 
748     vector<uint8_t> kp2v(kp.size());
749     Encoder::fromBase32(&kp2[7], &kp2v[0], kp2v.size());
750     if(!std::equal(kp.begin(), kp.end(), kp2v.begin())) {
751         dcdebug("Not equal...\n");
752         return false;
753     }
754 
755     return true;
756 }
757 
failed(UserConnection * aSource,const string & aError,bool protocolError)758 void ConnectionManager::failed(UserConnection* aSource, const string& aError, bool protocolError) {
759     Lock l(cs);
760 
761     if(aSource->isSet(UserConnection::FLAG_ASSOCIATED)) {
762         if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
763             auto i = find(downloads.begin(), downloads.end(), aSource->getUser());
764             dcassert(i != downloads.end());
765             ConnectionQueueItem* cqi = *i;
766             cqi->setState(ConnectionQueueItem::WAITING);
767             cqi->setLastAttempt(GET_TICK());
768             cqi->setErrors(protocolError ? -1 : (cqi->getErrors() + 1));
769             fire(ConnectionManagerListener::Failed(), cqi, aError);
770         } else if(aSource->isSet(UserConnection::FLAG_UPLOAD)) {
771             auto i = find(uploads.begin(), uploads.end(), aSource->getUser());
772             dcassert(i != uploads.end());
773             ConnectionQueueItem* cqi = *i;
774             putCQI(cqi);
775         }
776     }
777     putConnection(aSource);
778 }
779 
on(UserConnectionListener::Failed,UserConnection * aSource,const string & aError)780 void ConnectionManager::on(UserConnectionListener::Failed, UserConnection* aSource, const string& aError) noexcept {
781     failed(aSource, aError, false);
782 }
783 
on(UserConnectionListener::ProtocolError,UserConnection * aSource,const string & aError)784 void ConnectionManager::on(UserConnectionListener::ProtocolError, UserConnection* aSource, const string& aError) noexcept {
785     failed(aSource, aError, true);
786 }
787 
disconnect(const UserPtr & aUser)788 void ConnectionManager::disconnect(const UserPtr& aUser) {
789     Lock l(cs);
790     for(auto i = userConnections.begin(); i != userConnections.end(); ++i) {
791         UserConnection* uc = *i;
792         if(uc->getUser() == aUser)
793             uc->disconnect(true);
794     }
795 }
796 
disconnect(const UserPtr & aUser,int isDownload)797 void ConnectionManager::disconnect(const UserPtr& aUser, int isDownload) {
798     Lock l(cs);
799     for(auto i = userConnections.begin(); i != userConnections.end(); ++i) {
800         UserConnection* uc = *i;
801         if(uc->getUser() == aUser && uc->isSet(isDownload ? UserConnection::FLAG_DOWNLOAD : UserConnection::FLAG_UPLOAD)) {
802             uc->disconnect(true);
803             break;
804         }
805     }
806 }
807 
shutdown()808 void ConnectionManager::shutdown() {
809     TimerManager::getInstance()->removeListener(this);
810     shuttingDown = true;
811     disconnect();
812     {
813         Lock l(cs);
814         for(auto j = userConnections.begin(); j != userConnections.end(); ++j) {
815             (*j)->disconnect(true);
816         }
817     }
818     // Wait until all connections have died out...
819     while(true) {
820         {
821             Lock l(cs);
822             if(userConnections.empty()) {
823                 break;
824             }
825         }
826         Thread::sleep(50);
827     }
828 }
829 
830 // UserConnectionListener
on(UserConnectionListener::Supports,UserConnection * conn,const StringList & feat)831 void ConnectionManager::on(UserConnectionListener::Supports, UserConnection* conn, const StringList& feat) noexcept {
832     for(auto i = feat.begin(); i != feat.end(); ++i) {
833         if(*i == UserConnection::FEATURE_MINISLOTS) {
834             conn->setFlag(UserConnection::FLAG_SUPPORTS_MINISLOTS);
835         } else if(*i == UserConnection::FEATURE_XML_BZLIST) {
836             conn->setFlag(UserConnection::FLAG_SUPPORTS_XML_BZLIST);
837         } else if(*i == UserConnection::FEATURE_ADCGET) {
838             conn->setFlag(UserConnection::FLAG_SUPPORTS_ADCGET);
839         } else if(*i == UserConnection::FEATURE_ZLIB_GET) {
840             conn->setFlag(UserConnection::FLAG_SUPPORTS_ZLIB_GET);
841         } else if(*i == UserConnection::FEATURE_TTHL) {
842             conn->setFlag(UserConnection::FLAG_SUPPORTS_TTHL);
843         } else if(*i == UserConnection::FEATURE_TTHF) {
844             conn->setFlag(UserConnection::FLAG_SUPPORTS_TTHF);
845         }
846     }
847 }
848 
849 } // namespace dcpp
850