1 /*
2     SPDX-FileCopyrightText: 2005 Joris Guisson <joris.guisson@gmail.com>
3 
4     SPDX-License-Identifier: GPL-2.0-or-later
5 */
6 #include "peermanager.h"
7 
8 #include <QDateTime>
9 #include <QFile>
10 #include <QList>
11 #include <QSet>
12 #include <QTextStream>
13 #include <QtAlgorithms>
14 #include <klocalizedstring.h>
15 
16 #include "authenticate.h"
17 #include "authenticationmonitor.h"
18 #include "chunkcounter.h"
19 #include "connectionlimit.h"
20 #include "peer.h"
21 #include "peerconnector.h"
22 #include <dht/dhtbase.h>
23 #include <mse/encryptedauthenticate.h>
24 #include <mse/encryptedpacketsocket.h>
25 #include <net/address.h>
26 #include <peer/accessmanager.h>
27 #include <peer/peer.h>
28 #include <peer/peerid.h>
29 #include <torrent/globals.h>
30 #include <torrent/server.h>
31 #include <torrent/torrent.h>
32 #include <util/bitset.h>
33 #include <util/error.h>
34 #include <util/file.h>
35 #include <util/functions.h>
36 #include <util/log.h>
37 #include <util/ptrmap.h>
38 
39 using namespace KNetwork;
40 
41 namespace bt
42 {
43 typedef std::map<net::Address, bool>::iterator PPItr;
44 typedef QMap<Uint32, Peer::Ptr> PeerMap;
45 
46 static ConnectionLimit climit;
47 
48 class PeerManager::Private
49 {
50 public:
51     Private(PeerManager *p, Torrent &tor);
52     ~Private();
53 
54     void updateAvailableChunks();
55     bool killBadPeer();
56     void createPeer(mse::EncryptedPacketSocket::Ptr sock, const PeerID &peer_id, Uint32 support, bool local, ConnectionLimit::Token::Ptr token);
57     bool connectedTo(const net::Address &addr) const;
58     void update();
59     void have(Peer *peer, Uint32 index);
60     void connectToPeers();
61 
62 public:
63     PeerManager *p;
64     QMap<Uint32, Peer::Ptr> peer_map;
65     Torrent &tor;
66     bool started;
67     BitSet available_chunks, wanted_chunks;
68     ChunkCounter cnt;
69     bool pex_on;
70     bool wanted_changed;
71     PieceHandler *piece_handler;
72     bool paused;
73     QSet<PeerConnector::Ptr> connectors;
74     QScopedPointer<SuperSeeder> superseeder;
75     std::map<net::Address, bool> potential_peers;
76     bool partial_seed;
77     Uint32 num_cleared;
78 };
79 
PeerManager(Torrent & tor)80 PeerManager::PeerManager(Torrent &tor)
81     : d(new Private(this, tor))
82 {
83 }
84 
~PeerManager()85 PeerManager::~PeerManager()
86 {
87     delete d;
88 }
89 
connectionLimits()90 ConnectionLimit &PeerManager::connectionLimits()
91 {
92     return climit;
93 }
94 
pause()95 void PeerManager::pause()
96 {
97     if (d->paused)
98         return;
99 
100     for (Peer::Ptr p : qAsConst(d->peer_map)) {
101         p->pause();
102     }
103     d->paused = true;
104 }
105 
unpause()106 void PeerManager::unpause()
107 {
108     if (!d->paused)
109         return;
110 
111     for (Peer::Ptr p : qAsConst(d->peer_map)) {
112         p->unpause();
113         if (p->hasWantedChunks(d->wanted_chunks)) // send interested when it has wanted chunks
114             p->sendInterested();
115     }
116     d->paused = false;
117 }
118 
update()119 void PeerManager::update()
120 {
121     d->update();
122 }
123 
124 #if 0
125 
126 void PeerManager::setMaxTotalConnections(Uint32 max)
127 {
128 #ifndef Q_WS_WIN
129     Uint32 sys_max = bt::MaxOpenFiles() - 50; // leave about 50 free for regular files
130 #else
131     Uint32 sys_max = 9999; // there isn't a real limit on windows
132 #endif
133     max_total_connections = max;
134     if (max == 0 || max_total_connections > sys_max)
135         max_total_connections = sys_max;
136 }
137 #endif
138 
setWantedChunks(const BitSet & bs)139 void PeerManager::setWantedChunks(const BitSet &bs)
140 {
141     d->wanted_chunks = bs;
142     d->wanted_changed = true;
143 }
144 
addPotentialPeer(const net::Address & addr,bool local)145 void PeerManager::addPotentialPeer(const net::Address &addr, bool local)
146 {
147     if (d->potential_peers.size() < 500) {
148         d->potential_peers[addr] = local;
149     }
150 }
151 
killSeeders()152 void PeerManager::killSeeders()
153 {
154     for (Peer::Ptr peer : qAsConst(d->peer_map)) {
155         if (peer->isSeeder())
156             peer->kill();
157     }
158 }
159 
killUninterested()160 void PeerManager::killUninterested()
161 {
162     QTime now = QTime::currentTime();
163     for (Peer::Ptr peer : qAsConst(d->peer_map)) {
164         if (!peer->isInterested() && (peer->getConnectTime().secsTo(now) > 30))
165             peer->kill();
166     }
167 }
168 
have(Peer * p,Uint32 index)169 void PeerManager::have(Peer *p, Uint32 index)
170 {
171     d->have(p, index);
172 }
173 
bitSetReceived(Peer * p,const BitSet & bs)174 void PeerManager::bitSetReceived(Peer *p, const BitSet &bs)
175 {
176     bool interested = false;
177     for (Uint32 i = 0; i < bs.getNumBits(); i++) {
178         if (bs.get(i)) {
179             if (d->wanted_chunks.get(i))
180                 interested = true;
181             d->available_chunks.set(i, true);
182             d->cnt.inc(i);
183         }
184     }
185 
186     if (interested && !d->paused)
187         p->sendInterested();
188 
189     if (d->superseeder)
190         d->superseeder->bitset(p, bs);
191 }
192 
newConnection(mse::EncryptedPacketSocket::Ptr sock,const PeerID & peer_id,Uint32 support)193 void PeerManager::newConnection(mse::EncryptedPacketSocket::Ptr sock, const PeerID &peer_id, Uint32 support)
194 {
195     if (!d->started)
196         return;
197 
198     ConnectionLimit::Token::Ptr token = climit.acquire(d->tor.getInfoHash());
199     if (!token) {
200         d->killBadPeer();
201         token = climit.acquire(d->tor.getInfoHash());
202     }
203 
204     if (token)
205         d->createPeer(sock, peer_id, support, false, token);
206 }
207 
peerAuthenticated(bt::Authenticate * auth,bt::PeerConnector::WPtr pcon,bool ok,bt::ConnectionLimit::Token::Ptr token)208 void PeerManager::peerAuthenticated(bt::Authenticate *auth, bt::PeerConnector::WPtr pcon, bool ok, bt::ConnectionLimit::Token::Ptr token)
209 {
210     if (d->started) {
211         if (ok && !connectedTo(auth->getPeerID()))
212             d->createPeer(auth->getSocket(), auth->getPeerID(), auth->supportedExtensions(), auth->isLocal(), token);
213     }
214 
215     PeerConnector::Ptr ptr = pcon.toStrongRef();
216     d->connectors.remove(ptr);
217 }
218 
connectedTo(const PeerID & peer_id)219 bool PeerManager::connectedTo(const PeerID &peer_id)
220 {
221     if (!d->started)
222         return false;
223 
224     for (const Peer::Ptr &p : qAsConst(d->peer_map)) {
225         if (p->getPeerID() == peer_id)
226             return true;
227     }
228     return false;
229 }
230 
closeAllConnections()231 void PeerManager::closeAllConnections()
232 {
233     d->peer_map.clear();
234 }
235 
savePeerList(const QString & file)236 void PeerManager::savePeerList(const QString &file)
237 {
238     // Lets save the entries line based
239     QFile fptr(file);
240     if (!fptr.open(QIODevice::WriteOnly))
241         return;
242 
243     try {
244         Out(SYS_GEN | LOG_DEBUG) << "Saving list of peers to " << file << endl;
245 
246         QTextStream out(&fptr);
247         // first the active peers
248         for (const Peer::Ptr &p : qAsConst(d->peer_map)) {
249             const net::Address &addr = p->getAddress();
250             out << addr.toString() << " " << (unsigned short)addr.port() << Qt::endl;
251         }
252 
253         // now the potential_peers
254         std::map<net::Address, bool>::const_iterator i = d->potential_peers.cbegin();
255         while (i != d->potential_peers.cend()) {
256             out << i->first.toString() << " " << i->first.port() << Qt::endl;
257             ++i;
258         }
259     } catch (bt::Error &err) {
260         Out(SYS_GEN | LOG_DEBUG) << "Error happened during saving of peer list : " << err.toString() << endl;
261     }
262 }
263 
loadPeerList(const QString & file)264 void PeerManager::loadPeerList(const QString &file)
265 {
266     QFile fptr(file);
267     if (!fptr.open(QIODevice::ReadOnly))
268         return;
269 
270     try {
271         Out(SYS_GEN | LOG_DEBUG) << "Loading list of peers from " << file << endl;
272 
273         while (!fptr.atEnd()) {
274             QStringList sl = QString(fptr.readLine()).split(" ");
275             if (sl.count() != 2)
276                 continue;
277 
278             bool ok = false;
279             net::Address addr(sl[0], sl[1].toInt(&ok));
280             if (ok)
281                 addPotentialPeer(addr, false);
282         }
283     } catch (bt::Error &err) {
284         Out(SYS_GEN | LOG_DEBUG) << "Error happened during saving of peer list : " << err.toString() << endl;
285     }
286 }
287 
start(bool superseed)288 void PeerManager::start(bool superseed)
289 {
290     d->started = true;
291     if (superseed && !d->superseeder)
292         d->superseeder.reset(new SuperSeeder(d->cnt.getNumChunks()));
293 
294     unpause();
295     ServerInterface::addPeerManager(this);
296 }
297 
stop()298 void PeerManager::stop()
299 {
300     d->cnt.reset();
301     d->available_chunks.clear();
302     d->started = false;
303     ServerInterface::removePeerManager(this);
304     d->connectors.clear();
305     d->superseeder.reset();
306     closeAllConnections();
307 }
308 
findPeer(Uint32 peer_id)309 Peer::Ptr PeerManager::findPeer(Uint32 peer_id)
310 {
311     PeerMap::iterator i = d->peer_map.find(peer_id);
312     if (i == d->peer_map.end())
313         return Peer::Ptr();
314     else
315         return *i;
316 }
317 
findPeer(PieceDownloader * pd)318 Peer::Ptr PeerManager::findPeer(PieceDownloader *pd)
319 {
320     for (Peer::Ptr p : qAsConst(d->peer_map)) {
321         if ((PieceDownloader *)p->getPeerDownloader() == pd)
322             return p;
323     }
324     return Peer::Ptr();
325 }
326 
rerunChoker()327 void PeerManager::rerunChoker()
328 {
329     d->num_cleared++;
330 }
331 
chokerNeedsToRun() const332 bool PeerManager::chokerNeedsToRun() const
333 {
334     return d->num_cleared > 0;
335 }
336 
peerSourceReady(PeerSource * ps)337 void PeerManager::peerSourceReady(PeerSource *ps)
338 {
339     net::Address addr;
340     bool local = false;
341     while (ps->takePeer(addr, local))
342         addPotentialPeer(addr, local);
343 }
344 
pex(const QByteArray & arr)345 void PeerManager::pex(const QByteArray &arr)
346 {
347     if (!d->pex_on)
348         return;
349 
350     Out(SYS_CON | LOG_NOTICE) << "PEX: found " << (arr.size() / 6) << " peers" << endl;
351     for (int i = 0; i + 6 <= arr.size(); i += 6) {
352         const Uint8 *tmp = (const Uint8 *)arr.data() + i;
353         addPotentialPeer(net::Address(ReadUint32(tmp, 0), ReadUint16(tmp, 4)), false);
354     }
355 }
356 
setPexEnabled(bool on)357 void PeerManager::setPexEnabled(bool on)
358 {
359     if (on && d->tor.isPrivate())
360         return;
361 
362     if (d->pex_on == on)
363         return;
364 
365     for (Peer::Ptr p : qAsConst(d->peer_map)) {
366         if (!p->isKilled()) {
367             p->setPexEnabled(on);
368             bt::Uint16 port = ServerInterface::getPort();
369             p->sendExtProtHandshake(port, d->tor.getMetaData().size(), d->partial_seed);
370         }
371     }
372     d->pex_on = on;
373 }
374 
setGroupIDs(Uint32 up,Uint32 down)375 void PeerManager::setGroupIDs(Uint32 up, Uint32 down)
376 {
377     for (Peer::Ptr p : qAsConst(d->peer_map))
378         p->setGroupIDs(up, down);
379 }
380 
portPacketReceived(const QString & ip,Uint16 port)381 void PeerManager::portPacketReceived(const QString &ip, Uint16 port)
382 {
383     if (Globals::instance().getDHT().isRunning() && !d->tor.isPrivate())
384         Globals::instance().getDHT().portReceived(ip, port);
385 }
386 
pieceReceived(const Piece & p)387 void PeerManager::pieceReceived(const Piece &p)
388 {
389     if (d->piece_handler)
390         d->piece_handler->pieceReceived(p);
391 }
392 
setPieceHandler(PieceHandler * ph)393 void PeerManager::setPieceHandler(PieceHandler *ph)
394 {
395     d->piece_handler = ph;
396 }
397 
killStalePeers()398 void PeerManager::killStalePeers()
399 {
400     for (Peer::Ptr p : qAsConst(d->peer_map)) {
401         if (p->getDownloadRate() == 0 && p->getUploadRate() == 0)
402             p->kill();
403     }
404 }
405 
setSuperSeeding(bool on,const BitSet & chunks)406 void PeerManager::setSuperSeeding(bool on, const BitSet &chunks)
407 {
408     Q_UNUSED(chunks);
409     if ((d->superseeder && on) || (!d->superseeder && !on))
410         return;
411 
412     d->superseeder.reset(on ? new SuperSeeder(d->cnt.getNumChunks()) : nullptr);
413 
414     // When entering or exiting superseeding mode kill all peers
415     // but first add the current list to the potential_peers list, so we can reconnect later.
416     for (Peer::Ptr p : qAsConst(d->peer_map)) {
417         const net::Address &addr = p->getAddress();
418         addPotentialPeer(addr, false);
419         p->kill();
420     }
421 }
422 
sendHave(Uint32 index)423 void PeerManager::sendHave(Uint32 index)
424 {
425     if (d->superseeder)
426         return;
427 
428     for (Peer::Ptr peer : qAsConst(d->peer_map)) {
429         peer->sendHave(index);
430     }
431 }
432 
getNumConnectedPeers() const433 Uint32 PeerManager::getNumConnectedPeers() const
434 {
435     return d->peer_map.count();
436 }
437 
getNumConnectedLeechers() const438 Uint32 PeerManager::getNumConnectedLeechers() const
439 {
440     Uint32 cnt = 0;
441     for (const Peer::Ptr &peer : qAsConst(d->peer_map)) {
442         if (!peer->isSeeder())
443             cnt++;
444     }
445 
446     return cnt;
447 }
448 
getNumConnectedSeeders() const449 Uint32 PeerManager::getNumConnectedSeeders() const
450 {
451     Uint32 cnt = 0;
452     for (const Peer::Ptr &peer : qAsConst(d->peer_map)) {
453         if (peer->isSeeder())
454             cnt++;
455     }
456 
457     return cnt;
458 }
459 
getNumPending() const460 Uint32 PeerManager::getNumPending() const
461 {
462     return d->connectors.size();
463 }
464 
getAvailableChunksBitSet() const465 const bt::BitSet &PeerManager::getAvailableChunksBitSet() const
466 {
467     return d->available_chunks;
468 }
469 
getChunkCounter()470 ChunkCounter &PeerManager::getChunkCounter()
471 {
472     return d->cnt;
473 }
474 
isPexEnabled() const475 bool PeerManager::isPexEnabled() const
476 {
477     return d->pex_on;
478 }
479 
getTorrent() const480 const bt::Torrent &PeerManager::getTorrent() const
481 {
482     return d->tor;
483 }
484 
isStarted() const485 bool PeerManager::isStarted() const
486 {
487     return d->started;
488 }
489 
visit(PeerManager::PeerVisitor & visitor)490 void PeerManager::visit(PeerManager::PeerVisitor &visitor)
491 {
492     for (const Peer::Ptr &p : qAsConst(d->peer_map)) {
493         visitor.visit(p);
494     }
495 }
496 
uploadRate() const497 Uint32 PeerManager::uploadRate() const
498 {
499     Uint32 rate = 0;
500     for (const Peer::Ptr &p : qAsConst(d->peer_map)) {
501         rate += p->getUploadRate();
502     }
503     return rate;
504 }
505 
getPeers() const506 QList<Peer::Ptr> PeerManager::getPeers() const
507 {
508     return d->peer_map.values();
509 }
510 
setPartialSeed(bool partial_seed)511 void PeerManager::setPartialSeed(bool partial_seed)
512 {
513     if (d->partial_seed != partial_seed) {
514         d->partial_seed = partial_seed;
515 
516         // If partial seeding status changes, update all peers
517         bt::Uint16 port = ServerInterface::getPort();
518         for (Peer::Ptr peer : qAsConst(d->peer_map)) {
519             peer->sendExtProtHandshake(port, d->tor.getMetaData().size(), d->partial_seed);
520         }
521     }
522 }
523 
isPartialSeed() const524 bool PeerManager::isPartialSeed() const
525 {
526     return d->partial_seed;
527 }
528 
529 //////////////////////////////////////////////////
530 
Private(PeerManager * p,Torrent & tor)531 PeerManager::Private::Private(PeerManager *p, Torrent &tor)
532     : p(p)
533     , tor(tor)
534     , available_chunks(tor.getNumChunks())
535     , wanted_chunks(tor.getNumChunks())
536     , cnt(tor.getNumChunks())
537     , partial_seed(false)
538     , num_cleared(0)
539 {
540     started = false;
541     wanted_chunks.setAll(true);
542     wanted_changed = false;
543     pex_on = !tor.isPrivate();
544     piece_handler = nullptr;
545     paused = false;
546 }
547 
~Private()548 PeerManager::Private::~Private()
549 {
550     ServerInterface::removePeerManager(p);
551     started = false;
552     connectors.clear();
553 }
554 
update()555 void PeerManager::Private::update()
556 {
557     if (!started)
558         return;
559 
560     num_cleared = 0;
561 
562     // update the speed of each peer,
563     // and get rid of some killed peers
564     for (PeerMap::iterator i = peer_map.begin(); i != peer_map.end();) {
565         Peer::Ptr peer = i.value();
566         if (!peer->isKilled() && peer->isStalled()) {
567             p->addPotentialPeer(peer->getAddress(), peer->getStats().local);
568             peer->kill();
569         }
570 
571         if (peer->isKilled()) {
572             cnt.decBitSet(peer->getBitSet());
573             updateAvailableChunks();
574             i = peer_map.erase(i);
575             p->peerKilled(peer.data());
576             if (superseeder)
577                 superseeder->peerRemoved(peer.data());
578             num_cleared++;
579         } else {
580             peer->update();
581             if (wanted_changed) {
582                 if (peer->hasWantedChunks(wanted_chunks))
583                     peer->sendInterested();
584                 else
585                     peer->sendNotInterested();
586             }
587             ++i;
588         }
589     }
590 
591     wanted_changed = false;
592     connectToPeers();
593 }
594 
have(Peer * peer,Uint32 index)595 void PeerManager::Private::have(Peer *peer, Uint32 index)
596 {
597     if (wanted_chunks.get(index) && !paused)
598         peer->sendInterested();
599     available_chunks.set(index, true);
600     cnt.inc(index);
601     if (superseeder)
602         superseeder->have(peer, index);
603 }
604 
updateAvailableChunks()605 void PeerManager::Private::updateAvailableChunks()
606 {
607     for (Uint32 i = 0; i < available_chunks.getNumBits(); i++) {
608         available_chunks.set(i, cnt.get(i) > 0);
609     }
610 }
611 
killBadPeer()612 bool PeerManager::Private::killBadPeer()
613 {
614     for (Peer::Ptr peer : qAsConst(peer_map)) {
615         if (peer->getStats().aca_score <= -5.0 && peer->getStats().aca_score > -50.0) {
616             Out(SYS_GEN | LOG_DEBUG) << "Killing bad peer, to make room for other peers" << endl;
617             peer->kill();
618             return true;
619         }
620     }
621     return false;
622 }
623 
createPeer(mse::EncryptedPacketSocket::Ptr sock,const bt::PeerID & peer_id,Uint32 support,bool local,bt::ConnectionLimit::Token::Ptr token)624 void PeerManager::Private::createPeer(mse::EncryptedPacketSocket::Ptr sock,
625                                       const bt::PeerID &peer_id,
626                                       Uint32 support,
627                                       bool local,
628                                       bt::ConnectionLimit::Token::Ptr token)
629 {
630     Peer::Ptr peer(new Peer(sock, peer_id, tor.getNumChunks(), tor.getChunkSize(), support, local, token, p));
631     peer_map.insert(peer->getID(), peer);
632     p->newPeer(peer.data());
633     peer->setPexEnabled(pex_on);
634     // send extension protocol handshake
635     bt::Uint16 port = ServerInterface::getPort();
636     peer->sendExtProtHandshake(port, tor.getMetaData().size(), partial_seed);
637 
638     if (superseeder)
639         superseeder->peerAdded(peer.data());
640 }
641 
connectedTo(const net::Address & addr) const642 bool PeerManager::Private::connectedTo(const net::Address &addr) const
643 {
644     PeerMap::const_iterator i = peer_map.begin();
645     while (i != peer_map.end()) {
646         if (i.value()->getAddress() == addr)
647             return true;
648         ++i;
649     }
650     return false;
651 }
652 
connectToPeers()653 void PeerManager::Private::connectToPeers()
654 {
655     if (paused || potential_peers.size() == 0 || (Uint32)connectors.size() > MAX_SIMULTANIOUS_AUTHS)
656         return;
657 
658     while (!potential_peers.empty()) {
659         if ((Uint32)connectors.size() > MAX_SIMULTANIOUS_AUTHS)
660             return;
661 
662         PPItr itr = potential_peers.begin();
663 
664         AccessManager &aman = AccessManager::instance();
665 
666         if (aman.allowed(itr->first) && !connectedTo(itr->first)) {
667             ConnectionLimit::Token::Ptr token = climit.acquire(tor.getInfoHash());
668             if (!token)
669                 break;
670 
671             PeerConnector::Ptr pcon(new PeerConnector(itr->first, itr->second, p, token));
672             pcon->setWeakPointer(PeerConnector::WPtr(pcon));
673             connectors.insert(pcon);
674             pcon->start();
675         }
676 
677         potential_peers.erase(itr);
678     }
679 }
680 
681 }
682