1 /*
2 SPDX-FileCopyrightText: 2005 Joris Guisson <joris.guisson@gmail.com>
3
4 SPDX-License-Identifier: GPL-2.0-or-later
5 */
6 #include "peermanager.h"
7
8 #include <QDateTime>
9 #include <QFile>
10 #include <QList>
11 #include <QSet>
12 #include <QTextStream>
13 #include <QtAlgorithms>
14 #include <klocalizedstring.h>
15
16 #include "authenticate.h"
17 #include "authenticationmonitor.h"
18 #include "chunkcounter.h"
19 #include "connectionlimit.h"
20 #include "peer.h"
21 #include "peerconnector.h"
22 #include <dht/dhtbase.h>
23 #include <mse/encryptedauthenticate.h>
24 #include <mse/encryptedpacketsocket.h>
25 #include <net/address.h>
26 #include <peer/accessmanager.h>
27 #include <peer/peer.h>
28 #include <peer/peerid.h>
29 #include <torrent/globals.h>
30 #include <torrent/server.h>
31 #include <torrent/torrent.h>
32 #include <util/bitset.h>
33 #include <util/error.h>
34 #include <util/file.h>
35 #include <util/functions.h>
36 #include <util/log.h>
37 #include <util/ptrmap.h>
38
39 using namespace KNetwork;
40
41 namespace bt
42 {
43 typedef std::map<net::Address, bool>::iterator PPItr;
44 typedef QMap<Uint32, Peer::Ptr> PeerMap;
45
46 static ConnectionLimit climit;
47
48 class PeerManager::Private
49 {
50 public:
51 Private(PeerManager *p, Torrent &tor);
52 ~Private();
53
54 void updateAvailableChunks();
55 bool killBadPeer();
56 void createPeer(mse::EncryptedPacketSocket::Ptr sock, const PeerID &peer_id, Uint32 support, bool local, ConnectionLimit::Token::Ptr token);
57 bool connectedTo(const net::Address &addr) const;
58 void update();
59 void have(Peer *peer, Uint32 index);
60 void connectToPeers();
61
62 public:
63 PeerManager *p;
64 QMap<Uint32, Peer::Ptr> peer_map;
65 Torrent &tor;
66 bool started;
67 BitSet available_chunks, wanted_chunks;
68 ChunkCounter cnt;
69 bool pex_on;
70 bool wanted_changed;
71 PieceHandler *piece_handler;
72 bool paused;
73 QSet<PeerConnector::Ptr> connectors;
74 QScopedPointer<SuperSeeder> superseeder;
75 std::map<net::Address, bool> potential_peers;
76 bool partial_seed;
77 Uint32 num_cleared;
78 };
79
PeerManager(Torrent & tor)80 PeerManager::PeerManager(Torrent &tor)
81 : d(new Private(this, tor))
82 {
83 }
84
~PeerManager()85 PeerManager::~PeerManager()
86 {
87 delete d;
88 }
89
connectionLimits()90 ConnectionLimit &PeerManager::connectionLimits()
91 {
92 return climit;
93 }
94
pause()95 void PeerManager::pause()
96 {
97 if (d->paused)
98 return;
99
100 for (Peer::Ptr p : qAsConst(d->peer_map)) {
101 p->pause();
102 }
103 d->paused = true;
104 }
105
unpause()106 void PeerManager::unpause()
107 {
108 if (!d->paused)
109 return;
110
111 for (Peer::Ptr p : qAsConst(d->peer_map)) {
112 p->unpause();
113 if (p->hasWantedChunks(d->wanted_chunks)) // send interested when it has wanted chunks
114 p->sendInterested();
115 }
116 d->paused = false;
117 }
118
update()119 void PeerManager::update()
120 {
121 d->update();
122 }
123
// NOTE: everything between "#if 0" and the final "#endif" is compiled out.
#if 0

void PeerManager::setMaxTotalConnections(Uint32 max)
{
#ifndef Q_WS_WIN
    Uint32 sys_max = bt::MaxOpenFiles() - 50; // leave about 50 free for regular files
#else
    Uint32 sys_max = 9999; // there isn't a real limit on windows
#endif
    max_total_connections = max;
    if (max == 0 || max_total_connections > sys_max)
        max_total_connections = sys_max;
}
#endif
138
setWantedChunks(const BitSet & bs)139 void PeerManager::setWantedChunks(const BitSet &bs)
140 {
141 d->wanted_chunks = bs;
142 d->wanted_changed = true;
143 }
144
addPotentialPeer(const net::Address & addr,bool local)145 void PeerManager::addPotentialPeer(const net::Address &addr, bool local)
146 {
147 if (d->potential_peers.size() < 500) {
148 d->potential_peers[addr] = local;
149 }
150 }
151
killSeeders()152 void PeerManager::killSeeders()
153 {
154 for (Peer::Ptr peer : qAsConst(d->peer_map)) {
155 if (peer->isSeeder())
156 peer->kill();
157 }
158 }
159
killUninterested()160 void PeerManager::killUninterested()
161 {
162 QTime now = QTime::currentTime();
163 for (Peer::Ptr peer : qAsConst(d->peer_map)) {
164 if (!peer->isInterested() && (peer->getConnectTime().secsTo(now) > 30))
165 peer->kill();
166 }
167 }
168
have(Peer * p,Uint32 index)169 void PeerManager::have(Peer *p, Uint32 index)
170 {
171 d->have(p, index);
172 }
173
bitSetReceived(Peer * p,const BitSet & bs)174 void PeerManager::bitSetReceived(Peer *p, const BitSet &bs)
175 {
176 bool interested = false;
177 for (Uint32 i = 0; i < bs.getNumBits(); i++) {
178 if (bs.get(i)) {
179 if (d->wanted_chunks.get(i))
180 interested = true;
181 d->available_chunks.set(i, true);
182 d->cnt.inc(i);
183 }
184 }
185
186 if (interested && !d->paused)
187 p->sendInterested();
188
189 if (d->superseeder)
190 d->superseeder->bitset(p, bs);
191 }
192
newConnection(mse::EncryptedPacketSocket::Ptr sock,const PeerID & peer_id,Uint32 support)193 void PeerManager::newConnection(mse::EncryptedPacketSocket::Ptr sock, const PeerID &peer_id, Uint32 support)
194 {
195 if (!d->started)
196 return;
197
198 ConnectionLimit::Token::Ptr token = climit.acquire(d->tor.getInfoHash());
199 if (!token) {
200 d->killBadPeer();
201 token = climit.acquire(d->tor.getInfoHash());
202 }
203
204 if (token)
205 d->createPeer(sock, peer_id, support, false, token);
206 }
207
peerAuthenticated(bt::Authenticate * auth,bt::PeerConnector::WPtr pcon,bool ok,bt::ConnectionLimit::Token::Ptr token)208 void PeerManager::peerAuthenticated(bt::Authenticate *auth, bt::PeerConnector::WPtr pcon, bool ok, bt::ConnectionLimit::Token::Ptr token)
209 {
210 if (d->started) {
211 if (ok && !connectedTo(auth->getPeerID()))
212 d->createPeer(auth->getSocket(), auth->getPeerID(), auth->supportedExtensions(), auth->isLocal(), token);
213 }
214
215 PeerConnector::Ptr ptr = pcon.toStrongRef();
216 d->connectors.remove(ptr);
217 }
218
connectedTo(const PeerID & peer_id)219 bool PeerManager::connectedTo(const PeerID &peer_id)
220 {
221 if (!d->started)
222 return false;
223
224 for (const Peer::Ptr &p : qAsConst(d->peer_map)) {
225 if (p->getPeerID() == peer_id)
226 return true;
227 }
228 return false;
229 }
230
closeAllConnections()231 void PeerManager::closeAllConnections()
232 {
233 d->peer_map.clear();
234 }
235
savePeerList(const QString & file)236 void PeerManager::savePeerList(const QString &file)
237 {
238 // Lets save the entries line based
239 QFile fptr(file);
240 if (!fptr.open(QIODevice::WriteOnly))
241 return;
242
243 try {
244 Out(SYS_GEN | LOG_DEBUG) << "Saving list of peers to " << file << endl;
245
246 QTextStream out(&fptr);
247 // first the active peers
248 for (const Peer::Ptr &p : qAsConst(d->peer_map)) {
249 const net::Address &addr = p->getAddress();
250 out << addr.toString() << " " << (unsigned short)addr.port() << Qt::endl;
251 }
252
253 // now the potential_peers
254 std::map<net::Address, bool>::const_iterator i = d->potential_peers.cbegin();
255 while (i != d->potential_peers.cend()) {
256 out << i->first.toString() << " " << i->first.port() << Qt::endl;
257 ++i;
258 }
259 } catch (bt::Error &err) {
260 Out(SYS_GEN | LOG_DEBUG) << "Error happened during saving of peer list : " << err.toString() << endl;
261 }
262 }
263
loadPeerList(const QString & file)264 void PeerManager::loadPeerList(const QString &file)
265 {
266 QFile fptr(file);
267 if (!fptr.open(QIODevice::ReadOnly))
268 return;
269
270 try {
271 Out(SYS_GEN | LOG_DEBUG) << "Loading list of peers from " << file << endl;
272
273 while (!fptr.atEnd()) {
274 QStringList sl = QString(fptr.readLine()).split(" ");
275 if (sl.count() != 2)
276 continue;
277
278 bool ok = false;
279 net::Address addr(sl[0], sl[1].toInt(&ok));
280 if (ok)
281 addPotentialPeer(addr, false);
282 }
283 } catch (bt::Error &err) {
284 Out(SYS_GEN | LOG_DEBUG) << "Error happened during saving of peer list : " << err.toString() << endl;
285 }
286 }
287
start(bool superseed)288 void PeerManager::start(bool superseed)
289 {
290 d->started = true;
291 if (superseed && !d->superseeder)
292 d->superseeder.reset(new SuperSeeder(d->cnt.getNumChunks()));
293
294 unpause();
295 ServerInterface::addPeerManager(this);
296 }
297
stop()298 void PeerManager::stop()
299 {
300 d->cnt.reset();
301 d->available_chunks.clear();
302 d->started = false;
303 ServerInterface::removePeerManager(this);
304 d->connectors.clear();
305 d->superseeder.reset();
306 closeAllConnections();
307 }
308
findPeer(Uint32 peer_id)309 Peer::Ptr PeerManager::findPeer(Uint32 peer_id)
310 {
311 PeerMap::iterator i = d->peer_map.find(peer_id);
312 if (i == d->peer_map.end())
313 return Peer::Ptr();
314 else
315 return *i;
316 }
317
findPeer(PieceDownloader * pd)318 Peer::Ptr PeerManager::findPeer(PieceDownloader *pd)
319 {
320 for (Peer::Ptr p : qAsConst(d->peer_map)) {
321 if ((PieceDownloader *)p->getPeerDownloader() == pd)
322 return p;
323 }
324 return Peer::Ptr();
325 }
326
rerunChoker()327 void PeerManager::rerunChoker()
328 {
329 d->num_cleared++;
330 }
331
chokerNeedsToRun() const332 bool PeerManager::chokerNeedsToRun() const
333 {
334 return d->num_cleared > 0;
335 }
336
peerSourceReady(PeerSource * ps)337 void PeerManager::peerSourceReady(PeerSource *ps)
338 {
339 net::Address addr;
340 bool local = false;
341 while (ps->takePeer(addr, local))
342 addPotentialPeer(addr, local);
343 }
344
pex(const QByteArray & arr)345 void PeerManager::pex(const QByteArray &arr)
346 {
347 if (!d->pex_on)
348 return;
349
350 Out(SYS_CON | LOG_NOTICE) << "PEX: found " << (arr.size() / 6) << " peers" << endl;
351 for (int i = 0; i + 6 <= arr.size(); i += 6) {
352 const Uint8 *tmp = (const Uint8 *)arr.data() + i;
353 addPotentialPeer(net::Address(ReadUint32(tmp, 0), ReadUint16(tmp, 4)), false);
354 }
355 }
356
setPexEnabled(bool on)357 void PeerManager::setPexEnabled(bool on)
358 {
359 if (on && d->tor.isPrivate())
360 return;
361
362 if (d->pex_on == on)
363 return;
364
365 for (Peer::Ptr p : qAsConst(d->peer_map)) {
366 if (!p->isKilled()) {
367 p->setPexEnabled(on);
368 bt::Uint16 port = ServerInterface::getPort();
369 p->sendExtProtHandshake(port, d->tor.getMetaData().size(), d->partial_seed);
370 }
371 }
372 d->pex_on = on;
373 }
374
setGroupIDs(Uint32 up,Uint32 down)375 void PeerManager::setGroupIDs(Uint32 up, Uint32 down)
376 {
377 for (Peer::Ptr p : qAsConst(d->peer_map))
378 p->setGroupIDs(up, down);
379 }
380
portPacketReceived(const QString & ip,Uint16 port)381 void PeerManager::portPacketReceived(const QString &ip, Uint16 port)
382 {
383 if (Globals::instance().getDHT().isRunning() && !d->tor.isPrivate())
384 Globals::instance().getDHT().portReceived(ip, port);
385 }
386
pieceReceived(const Piece & p)387 void PeerManager::pieceReceived(const Piece &p)
388 {
389 if (d->piece_handler)
390 d->piece_handler->pieceReceived(p);
391 }
392
setPieceHandler(PieceHandler * ph)393 void PeerManager::setPieceHandler(PieceHandler *ph)
394 {
395 d->piece_handler = ph;
396 }
397
killStalePeers()398 void PeerManager::killStalePeers()
399 {
400 for (Peer::Ptr p : qAsConst(d->peer_map)) {
401 if (p->getDownloadRate() == 0 && p->getUploadRate() == 0)
402 p->kill();
403 }
404 }
405
setSuperSeeding(bool on,const BitSet & chunks)406 void PeerManager::setSuperSeeding(bool on, const BitSet &chunks)
407 {
408 Q_UNUSED(chunks);
409 if ((d->superseeder && on) || (!d->superseeder && !on))
410 return;
411
412 d->superseeder.reset(on ? new SuperSeeder(d->cnt.getNumChunks()) : nullptr);
413
414 // When entering or exiting superseeding mode kill all peers
415 // but first add the current list to the potential_peers list, so we can reconnect later.
416 for (Peer::Ptr p : qAsConst(d->peer_map)) {
417 const net::Address &addr = p->getAddress();
418 addPotentialPeer(addr, false);
419 p->kill();
420 }
421 }
422
sendHave(Uint32 index)423 void PeerManager::sendHave(Uint32 index)
424 {
425 if (d->superseeder)
426 return;
427
428 for (Peer::Ptr peer : qAsConst(d->peer_map)) {
429 peer->sendHave(index);
430 }
431 }
432
getNumConnectedPeers() const433 Uint32 PeerManager::getNumConnectedPeers() const
434 {
435 return d->peer_map.count();
436 }
437
getNumConnectedLeechers() const438 Uint32 PeerManager::getNumConnectedLeechers() const
439 {
440 Uint32 cnt = 0;
441 for (const Peer::Ptr &peer : qAsConst(d->peer_map)) {
442 if (!peer->isSeeder())
443 cnt++;
444 }
445
446 return cnt;
447 }
448
getNumConnectedSeeders() const449 Uint32 PeerManager::getNumConnectedSeeders() const
450 {
451 Uint32 cnt = 0;
452 for (const Peer::Ptr &peer : qAsConst(d->peer_map)) {
453 if (peer->isSeeder())
454 cnt++;
455 }
456
457 return cnt;
458 }
459
getNumPending() const460 Uint32 PeerManager::getNumPending() const
461 {
462 return d->connectors.size();
463 }
464
getAvailableChunksBitSet() const465 const bt::BitSet &PeerManager::getAvailableChunksBitSet() const
466 {
467 return d->available_chunks;
468 }
469
getChunkCounter()470 ChunkCounter &PeerManager::getChunkCounter()
471 {
472 return d->cnt;
473 }
474
isPexEnabled() const475 bool PeerManager::isPexEnabled() const
476 {
477 return d->pex_on;
478 }
479
getTorrent() const480 const bt::Torrent &PeerManager::getTorrent() const
481 {
482 return d->tor;
483 }
484
isStarted() const485 bool PeerManager::isStarted() const
486 {
487 return d->started;
488 }
489
visit(PeerManager::PeerVisitor & visitor)490 void PeerManager::visit(PeerManager::PeerVisitor &visitor)
491 {
492 for (const Peer::Ptr &p : qAsConst(d->peer_map)) {
493 visitor.visit(p);
494 }
495 }
496
uploadRate() const497 Uint32 PeerManager::uploadRate() const
498 {
499 Uint32 rate = 0;
500 for (const Peer::Ptr &p : qAsConst(d->peer_map)) {
501 rate += p->getUploadRate();
502 }
503 return rate;
504 }
505
getPeers() const506 QList<Peer::Ptr> PeerManager::getPeers() const
507 {
508 return d->peer_map.values();
509 }
510
setPartialSeed(bool partial_seed)511 void PeerManager::setPartialSeed(bool partial_seed)
512 {
513 if (d->partial_seed != partial_seed) {
514 d->partial_seed = partial_seed;
515
516 // If partial seeding status changes, update all peers
517 bt::Uint16 port = ServerInterface::getPort();
518 for (Peer::Ptr peer : qAsConst(d->peer_map)) {
519 peer->sendExtProtHandshake(port, d->tor.getMetaData().size(), d->partial_seed);
520 }
521 }
522 }
523
isPartialSeed() const524 bool PeerManager::isPartialSeed() const
525 {
526 return d->partial_seed;
527 }
528
529 //////////////////////////////////////////////////
530
Private(PeerManager * p,Torrent & tor)531 PeerManager::Private::Private(PeerManager *p, Torrent &tor)
532 : p(p)
533 , tor(tor)
534 , available_chunks(tor.getNumChunks())
535 , wanted_chunks(tor.getNumChunks())
536 , cnt(tor.getNumChunks())
537 , partial_seed(false)
538 , num_cleared(0)
539 {
540 started = false;
541 wanted_chunks.setAll(true);
542 wanted_changed = false;
543 pex_on = !tor.isPrivate();
544 piece_handler = nullptr;
545 paused = false;
546 }
547
~Private()548 PeerManager::Private::~Private()
549 {
550 ServerInterface::removePeerManager(p);
551 started = false;
552 connectors.clear();
553 }
554
update()555 void PeerManager::Private::update()
556 {
557 if (!started)
558 return;
559
560 num_cleared = 0;
561
562 // update the speed of each peer,
563 // and get rid of some killed peers
564 for (PeerMap::iterator i = peer_map.begin(); i != peer_map.end();) {
565 Peer::Ptr peer = i.value();
566 if (!peer->isKilled() && peer->isStalled()) {
567 p->addPotentialPeer(peer->getAddress(), peer->getStats().local);
568 peer->kill();
569 }
570
571 if (peer->isKilled()) {
572 cnt.decBitSet(peer->getBitSet());
573 updateAvailableChunks();
574 i = peer_map.erase(i);
575 p->peerKilled(peer.data());
576 if (superseeder)
577 superseeder->peerRemoved(peer.data());
578 num_cleared++;
579 } else {
580 peer->update();
581 if (wanted_changed) {
582 if (peer->hasWantedChunks(wanted_chunks))
583 peer->sendInterested();
584 else
585 peer->sendNotInterested();
586 }
587 ++i;
588 }
589 }
590
591 wanted_changed = false;
592 connectToPeers();
593 }
594
have(Peer * peer,Uint32 index)595 void PeerManager::Private::have(Peer *peer, Uint32 index)
596 {
597 if (wanted_chunks.get(index) && !paused)
598 peer->sendInterested();
599 available_chunks.set(index, true);
600 cnt.inc(index);
601 if (superseeder)
602 superseeder->have(peer, index);
603 }
604
updateAvailableChunks()605 void PeerManager::Private::updateAvailableChunks()
606 {
607 for (Uint32 i = 0; i < available_chunks.getNumBits(); i++) {
608 available_chunks.set(i, cnt.get(i) > 0);
609 }
610 }
611
killBadPeer()612 bool PeerManager::Private::killBadPeer()
613 {
614 for (Peer::Ptr peer : qAsConst(peer_map)) {
615 if (peer->getStats().aca_score <= -5.0 && peer->getStats().aca_score > -50.0) {
616 Out(SYS_GEN | LOG_DEBUG) << "Killing bad peer, to make room for other peers" << endl;
617 peer->kill();
618 return true;
619 }
620 }
621 return false;
622 }
623
createPeer(mse::EncryptedPacketSocket::Ptr sock,const bt::PeerID & peer_id,Uint32 support,bool local,bt::ConnectionLimit::Token::Ptr token)624 void PeerManager::Private::createPeer(mse::EncryptedPacketSocket::Ptr sock,
625 const bt::PeerID &peer_id,
626 Uint32 support,
627 bool local,
628 bt::ConnectionLimit::Token::Ptr token)
629 {
630 Peer::Ptr peer(new Peer(sock, peer_id, tor.getNumChunks(), tor.getChunkSize(), support, local, token, p));
631 peer_map.insert(peer->getID(), peer);
632 p->newPeer(peer.data());
633 peer->setPexEnabled(pex_on);
634 // send extension protocol handshake
635 bt::Uint16 port = ServerInterface::getPort();
636 peer->sendExtProtHandshake(port, tor.getMetaData().size(), partial_seed);
637
638 if (superseeder)
639 superseeder->peerAdded(peer.data());
640 }
641
connectedTo(const net::Address & addr) const642 bool PeerManager::Private::connectedTo(const net::Address &addr) const
643 {
644 PeerMap::const_iterator i = peer_map.begin();
645 while (i != peer_map.end()) {
646 if (i.value()->getAddress() == addr)
647 return true;
648 ++i;
649 }
650 return false;
651 }
652
connectToPeers()653 void PeerManager::Private::connectToPeers()
654 {
655 if (paused || potential_peers.size() == 0 || (Uint32)connectors.size() > MAX_SIMULTANIOUS_AUTHS)
656 return;
657
658 while (!potential_peers.empty()) {
659 if ((Uint32)connectors.size() > MAX_SIMULTANIOUS_AUTHS)
660 return;
661
662 PPItr itr = potential_peers.begin();
663
664 AccessManager &aman = AccessManager::instance();
665
666 if (aman.allowed(itr->first) && !connectedTo(itr->first)) {
667 ConnectionLimit::Token::Ptr token = climit.acquire(tor.getInfoHash());
668 if (!token)
669 break;
670
671 PeerConnector::Ptr pcon(new PeerConnector(itr->first, itr->second, p, token));
672 pcon->setWeakPointer(PeerConnector::WPtr(pcon));
673 connectors.insert(pcon);
674 pcon->start();
675 }
676
677 potential_peers.erase(itr);
678 }
679 }
680
681 }
682