1 /*
2  * This file Copyright (C) 2007-2014 Mnemosyne LLC
3  *
4  * It may be used under the GNU GPL versions 2 or 3
5  * or any future license endorsed by Mnemosyne LLC.
6  *
7  */
8 
9 #include <errno.h> /* error codes ERANGE, ... */
10 #include <limits.h> /* INT_MAX */
11 #include <string.h> /* memcpy, memcmp, strstr */
12 #include <stdlib.h> /* qsort */
13 
14 #include <event2/event.h>
15 
16 #include <libutp/utp.h>
17 
18 #include "transmission.h"
19 #include "announcer.h"
20 #include "bandwidth.h"
21 #include "blocklist.h"
22 #include "cache.h"
23 #include "clients.h"
24 #include "completion.h"
25 #include "crypto-utils.h"
26 #include "handshake.h"
27 #include "log.h"
28 #include "net.h"
29 #include "peer-io.h"
30 #include "peer-mgr.h"
31 #include "peer-msgs.h"
32 #include "ptrarray.h"
33 #include "session.h"
34 #include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
35 #include "torrent.h"
36 #include "tr-assert.h"
37 #include "tr-utp.h"
38 #include "utils.h"
39 #include "webseed.h"
40 
41 enum
42 {
43     /* how frequently to cull old atoms */
44     ATOM_PERIOD_MSEC = (60 * 1000),
45     /* how frequently to change which peers are choked */
46     RECHOKE_PERIOD_MSEC = (10 * 1000),
47     /* an optimistically unchoked peer is immune from rechoking
48        for this many calls to rechokeUploads(). */
49     OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
50     /* how frequently to reallocate bandwidth */
51     BANDWIDTH_PERIOD_MSEC = 500,
52     /* how frequently to age out old piece request lists */
53     REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),
54     /* how frequently to decide which peers live and die */
55     RECONNECT_PERIOD_MSEC = 500,
56     /* when many peers are available, keep idle ones this long */
57     MIN_UPLOAD_IDLE_SECS = (60),
58     /* when few peers are available, keep idle ones this long */
59     MAX_UPLOAD_IDLE_SECS = (60 * 5),
60     /* max number of peers to ask for per second overall.
61      * this throttle is to avoid overloading the router */
62     MAX_CONNECTIONS_PER_SECOND = 12,
63     /* */
64     MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC / 1000.0)),
65     /* number of bad pieces a peer is allowed to send before we ban them */
66     MAX_BAD_PIECES_PER_PEER = 5,
67     /* amount of time to keep a list of request pieces lying around
68        before it's considered too old and needs to be rebuilt */
69     PIECE_LIST_SHELF_LIFE_SECS = 60,
70     /* use for bitwise operations w/peer_atom.flags2 */
71     MYFLAG_BANNED = 1,
72     /* use for bitwise operations w/peer_atom.flags2 */
73     /* unreachable for now... but not banned.
74      * if they try to connect to us it's okay */
75     MYFLAG_UNREACHABLE = 2,
76     /* the minimum we'll wait before attempting to reconnect to a peer */
77     MINIMUM_RECONNECT_INTERVAL_SECS = 5,
78     /** how long we'll let requests we've made linger before we cancel them */
79     REQUEST_TTL_SECS = 90,
80     /* */
81     NO_BLOCKS_CANCEL_HISTORY = 120,
82     /* */
83     CANCEL_HISTORY_SEC = 60
84 };
85 
86 tr_peer_event const TR_PEER_EVENT_INIT =
87 {
88     .eventType = TR_PEER_CLIENT_GOT_BLOCK,
89     .pieceIndex = 0,
90     .bitfield = NULL,
91     .offset = 0,
92     .length = 0,
93     .err = 0,
94     .port = 0
95 };
96 
97 tr_swarm_stats const TR_SWARM_STATS_INIT =
98 {
99     .activePeerCount = { 0, 0 },
100     .activeWebseedCount = 0,
101     .peerCount = 0,
102     .peerFromCount = { 0, 0, 0, 0, 0, 0, 0 }
103 };
104 
105 /**
106 ***
107 **/
108 
109 /**
110  * Peer information that should be kept even before we've connected and
111  * after we've disconnected. These are kept in a pool of peer_atoms to decide
112  * which ones would make good candidates for connecting to, and to watch out
113  * for banned peers.
114  *
115  * @see tr_peer
116  * @see tr_peerMsgs
117  */
118 struct peer_atom
119 {
120     uint8_t fromFirst; /* where the peer was first found */
121     uint8_t fromBest; /* the "best" value of where the peer has been found */
122     uint8_t flags; /* these match the added_f flags */
123     uint8_t flags2; /* flags that aren't defined in added_f */
124     int8_t seedProbability; /* how likely is this to be a seed... [0..100] or -1 for unknown */
125     int8_t blocklisted; /* -1 for unknown, true for blocklisted, false for not blocklisted */
126 
127     tr_port port;
128     bool utp_failed; /* We recently failed to connect over uTP */
129     uint16_t numFails;
130     time_t time; /* when the peer's connection status last changed */
131     time_t piece_data_time;
132 
133     time_t lastConnectionAttemptAt;
134     time_t lastConnectionAt;
135 
136     /* similar to a TTL field, but less rigid --
137      * if the swarm is small, the atom will be kept past this date. */
138     time_t shelf_date;
139     tr_peer* peer; /* will be NULL if not connected */
140     tr_address addr;
141 };
142 
143 #ifndef TR_ENABLE_ASSERTS
144 
145 #define tr_isAtom(a) (true)
146 
147 #else
148 
tr_isAtom(struct peer_atom const * atom)149 static bool tr_isAtom(struct peer_atom const* atom)
150 {
151     return atom != NULL && atom->fromFirst < TR_PEER_FROM__MAX && atom->fromBest < TR_PEER_FROM__MAX &&
152         tr_address_is_valid(&atom->addr);
153 }
154 
155 #endif
156 
tr_atomAddrStr(struct peer_atom const * atom)157 static char const* tr_atomAddrStr(struct peer_atom const* atom)
158 {
159     return atom != NULL ? tr_peerIoAddrStr(&atom->addr, atom->port) : "[no atom]";
160 }
161 
162 struct block_request
163 {
164     tr_block_index_t block;
165     tr_peer* peer;
166     time_t sentAt;
167 };
168 
169 struct weighted_piece
170 {
171     tr_piece_index_t index;
172     int16_t salt;
173     int16_t requestCount;
174 };
175 
176 enum piece_sort_state
177 {
178     PIECES_UNSORTED,
179     PIECES_SORTED_BY_INDEX,
180     PIECES_SORTED_BY_WEIGHT
181 };
182 
183 /** @brief Opaque, per-torrent data structure for peer connection information */
184 typedef struct tr_swarm
185 {
186     tr_swarm_stats stats;
187 
188     tr_ptrArray outgoingHandshakes; /* tr_handshake */
189     tr_ptrArray pool; /* struct peer_atom */
190     tr_ptrArray peers; /* tr_peerMsgs */
191     tr_ptrArray webseeds; /* tr_webseed */
192 
193     tr_torrent* tor;
194     struct tr_peerMgr* manager;
195 
196     tr_peerMsgs* optimistic; /* the optimistic peer, or NULL if none */
197     int optimisticUnchokeTimeScaler;
198 
199     bool isRunning;
200     bool needsCompletenessCheck;
201 
202     struct block_request* requests;
203     int requestCount;
204     int requestAlloc;
205 
206     struct weighted_piece* pieces;
207     int pieceCount;
208     enum piece_sort_state pieceSortState;
209 
210     /* An array of pieceCount items stating how many peers have each piece.
211        This is used to help us for downloading pieces "rarest first."
212        This may be NULL if we don't have metainfo yet, or if we're not
213        downloading and don't care about rarity */
214     uint16_t* pieceReplication;
215     size_t pieceReplicationSize;
216 
217     int interestedCount;
218     int maxPeers;
219     time_t lastCancel;
220 
221     /* Before the endgame this should be 0. In endgame, is contains the average
222      * number of pending requests per peer. Only peers which have more pending
223      * requests are considered 'fast' are allowed to request a block that's
224      * already been requested from another (slower?) peer. */
225     int endgame;
226 }
227 tr_swarm;
228 
229 struct tr_peerMgr
230 {
231     tr_session* session;
232     tr_ptrArray incomingHandshakes; /* tr_handshake */
233     struct event* bandwidthTimer;
234     struct event* rechokeTimer;
235     struct event* refillUpkeepTimer;
236     struct event* atomTimer;
237 };
238 
239 #define tordbg(t, ...) tr_logAddDeepNamed(tr_torrentName((t)->tor), __VA_ARGS__)
240 
241 #define dbgmsg(...) tr_logAddDeepNamed(NULL, __VA_ARGS__)
242 
243 /**
244 *** tr_peer virtual functions
245 **/
246 
tr_peerIsTransferringPieces(tr_peer const * peer,uint64_t now,tr_direction direction,unsigned int * Bps)247 static bool tr_peerIsTransferringPieces(tr_peer const* peer, uint64_t now, tr_direction direction, unsigned int* Bps)
248 {
249     TR_ASSERT(peer != NULL);
250     TR_ASSERT(peer->funcs != NULL);
251 
252     return (*peer->funcs->is_transferring_pieces)(peer, now, direction, Bps);
253 }
254 
tr_peerGetPieceSpeed_Bps(tr_peer const * peer,uint64_t now,tr_direction direction)255 unsigned int tr_peerGetPieceSpeed_Bps(tr_peer const* peer, uint64_t now, tr_direction direction)
256 {
257     unsigned int Bps = 0;
258     tr_peerIsTransferringPieces(peer, now, direction, &Bps);
259     return Bps;
260 }
261 
tr_peerFree(tr_peer * peer)262 static void tr_peerFree(tr_peer* peer)
263 {
264     TR_ASSERT(peer != NULL);
265     TR_ASSERT(peer->funcs != NULL);
266 
267     (*peer->funcs->destruct)(peer);
268 
269     tr_free(peer);
270 }
271 
tr_peerConstruct(tr_peer * peer,tr_torrent const * tor)272 void tr_peerConstruct(tr_peer* peer, tr_torrent const* tor)
273 {
274     TR_ASSERT(peer != NULL);
275     TR_ASSERT(tr_isTorrent(tor));
276 
277     memset(peer, 0, sizeof(tr_peer));
278 
279     peer->client = TR_KEY_NONE;
280     peer->swarm = tor->swarm;
281     tr_bitfieldConstruct(&peer->have, tor->info.pieceCount);
282     tr_bitfieldConstruct(&peer->blame, tor->blockCount);
283 }
284 
285 static void peerDeclinedAllRequests(tr_swarm*, tr_peer const*);
286 
tr_peerDestruct(tr_peer * peer)287 void tr_peerDestruct(tr_peer* peer)
288 {
289     TR_ASSERT(peer != NULL);
290 
291     if (peer->swarm != NULL)
292     {
293         peerDeclinedAllRequests(peer->swarm, peer);
294     }
295 
296     tr_bitfieldDestruct(&peer->have);
297     tr_bitfieldDestruct(&peer->blame);
298 
299     if (peer->atom != NULL)
300     {
301         peer->atom->peer = NULL;
302     }
303 }
304 
305 /**
306 ***
307 **/
308 
managerLock(struct tr_peerMgr const * manager)309 static inline void managerLock(struct tr_peerMgr const* manager)
310 {
311     tr_sessionLock(manager->session);
312 }
313 
managerUnlock(struct tr_peerMgr const * manager)314 static inline void managerUnlock(struct tr_peerMgr const* manager)
315 {
316     tr_sessionUnlock(manager->session);
317 }
318 
swarmLock(tr_swarm * swarm)319 static inline void swarmLock(tr_swarm* swarm)
320 {
321     managerLock(swarm->manager);
322 }
323 
swarmUnlock(tr_swarm * swarm)324 static inline void swarmUnlock(tr_swarm* swarm)
325 {
326     managerUnlock(swarm->manager);
327 }
328 
329 #ifdef TR_ENABLE_ASSERTS
330 
swarmIsLocked(tr_swarm const * swarm)331 static inline bool swarmIsLocked(tr_swarm const* swarm)
332 {
333     return tr_sessionIsLocked(swarm->manager->session);
334 }
335 
336 #endif /* TR_ENABLE_ASSERTS */
337 
338 /**
339 ***
340 **/
341 
handshakeCompareToAddr(void const * va,void const * vb)342 static int handshakeCompareToAddr(void const* va, void const* vb)
343 {
344     tr_handshake const* a = va;
345 
346     return tr_address_compare(tr_handshakeGetAddr(a, NULL), vb);
347 }
348 
handshakeCompare(void const * a,void const * b)349 static int handshakeCompare(void const* a, void const* b)
350 {
351     return handshakeCompareToAddr(a, tr_handshakeGetAddr(b, NULL));
352 }
353 
getExistingHandshake(tr_ptrArray * handshakes,tr_address const * addr)354 static inline tr_handshake* getExistingHandshake(tr_ptrArray* handshakes, tr_address const* addr)
355 {
356     if (tr_ptrArrayEmpty(handshakes))
357     {
358         return NULL;
359     }
360 
361     return tr_ptrArrayFindSorted(handshakes, addr, handshakeCompareToAddr);
362 }
363 
comparePeerAtomToAddress(void const * va,void const * vb)364 static int comparePeerAtomToAddress(void const* va, void const* vb)
365 {
366     struct peer_atom const* a = va;
367 
368     return tr_address_compare(&a->addr, vb);
369 }
370 
compareAtomsByAddress(void const * va,void const * vb)371 static int compareAtomsByAddress(void const* va, void const* vb)
372 {
373     struct peer_atom const* b = vb;
374 
375     TR_ASSERT(tr_isAtom(b));
376 
377     return comparePeerAtomToAddress(va, &b->addr);
378 }
379 
380 /**
381 ***
382 **/
383 
tr_peerAddress(tr_peer const * peer)384 tr_address const* tr_peerAddress(tr_peer const* peer)
385 {
386     return &peer->atom->addr;
387 }
388 
getExistingSwarm(tr_peerMgr * manager,uint8_t const * hash)389 static tr_swarm* getExistingSwarm(tr_peerMgr* manager, uint8_t const* hash)
390 {
391     tr_torrent* tor = tr_torrentFindFromHash(manager->session, hash);
392 
393     return tor == NULL ? NULL : tor->swarm;
394 }
395 
peerCompare(void const * a,void const * b)396 static int peerCompare(void const* a, void const* b)
397 {
398     return tr_address_compare(tr_peerAddress(a), tr_peerAddress(b));
399 }
400 
getExistingAtom(tr_swarm const * cswarm,tr_address const * addr)401 static struct peer_atom* getExistingAtom(tr_swarm const* cswarm, tr_address const* addr)
402 {
403     tr_swarm* swarm = (tr_swarm*)cswarm;
404     return tr_ptrArrayFindSorted(&swarm->pool, addr, comparePeerAtomToAddress);
405 }
406 
peerIsInUse(tr_swarm const * cs,struct peer_atom const * atom)407 static bool peerIsInUse(tr_swarm const* cs, struct peer_atom const* atom)
408 {
409     tr_swarm* s = (tr_swarm*)cs;
410 
411     TR_ASSERT(swarmIsLocked(s));
412 
413     return atom->peer != NULL || getExistingHandshake(&s->outgoingHandshakes, &atom->addr) != NULL ||
414         getExistingHandshake(&s->manager->incomingHandshakes, &atom->addr) != NULL;
415 }
416 
replicationExists(tr_swarm const * s)417 static inline bool replicationExists(tr_swarm const* s)
418 {
419     return s->pieceReplication != NULL;
420 }
421 
replicationFree(tr_swarm * s)422 static void replicationFree(tr_swarm* s)
423 {
424     tr_free(s->pieceReplication);
425     s->pieceReplication = NULL;
426     s->pieceReplicationSize = 0;
427 }
428 
replicationNew(tr_swarm * s)429 static void replicationNew(tr_swarm* s)
430 {
431     TR_ASSERT(!replicationExists(s));
432 
433     tr_piece_index_t const piece_count = s->tor->info.pieceCount;
434     int const n = tr_ptrArraySize(&s->peers);
435 
436     s->pieceReplicationSize = piece_count;
437     s->pieceReplication = tr_new0(uint16_t, piece_count);
438 
439     for (tr_piece_index_t piece_i = 0; piece_i < piece_count; ++piece_i)
440     {
441         uint16_t r = 0;
442 
443         for (int peer_i = 0; peer_i < n; ++peer_i)
444         {
445             tr_peer* peer = tr_ptrArrayNth(&s->peers, peer_i);
446 
447             if (tr_bitfieldHas(&peer->have, piece_i))
448             {
449                 ++r;
450             }
451         }
452 
453         s->pieceReplication[piece_i] = r;
454     }
455 }
456 
swarmFree(void * vs)457 static void swarmFree(void* vs)
458 {
459     tr_swarm* s = vs;
460 
461     TR_ASSERT(s != NULL);
462     TR_ASSERT(!s->isRunning);
463     TR_ASSERT(swarmIsLocked(s));
464     TR_ASSERT(tr_ptrArrayEmpty(&s->outgoingHandshakes));
465     TR_ASSERT(tr_ptrArrayEmpty(&s->peers));
466 
467     tr_ptrArrayDestruct(&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
468     tr_ptrArrayDestruct(&s->pool, (PtrArrayForeachFunc)tr_free);
469     tr_ptrArrayDestruct(&s->outgoingHandshakes, NULL);
470     tr_ptrArrayDestruct(&s->peers, NULL);
471     s->stats = TR_SWARM_STATS_INIT;
472 
473     replicationFree(s);
474 
475     tr_free(s->requests);
476     tr_free(s->pieces);
477     tr_free(s);
478 }
479 
480 static void peerCallbackFunc(tr_peer*, tr_peer_event const*, void*);
481 
rebuildWebseedArray(tr_swarm * s,tr_torrent * tor)482 static void rebuildWebseedArray(tr_swarm* s, tr_torrent* tor)
483 {
484     tr_info const* inf = &tor->info;
485 
486     /* clear the array */
487     tr_ptrArrayDestruct(&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
488     s->webseeds = TR_PTR_ARRAY_INIT;
489     s->stats.activeWebseedCount = 0;
490 
491     /* repopulate it */
492     for (unsigned int i = 0; i < inf->webseedCount; ++i)
493     {
494         tr_webseed* w = tr_webseedNew(tor, inf->webseeds[i], peerCallbackFunc, s);
495         tr_ptrArrayAppend(&s->webseeds, w);
496     }
497 }
498 
swarmNew(tr_peerMgr * manager,tr_torrent * tor)499 static tr_swarm* swarmNew(tr_peerMgr* manager, tr_torrent* tor)
500 {
501     tr_swarm* s;
502 
503     s = tr_new0(tr_swarm, 1);
504     s->manager = manager;
505     s->tor = tor;
506     s->pool = TR_PTR_ARRAY_INIT;
507     s->peers = TR_PTR_ARRAY_INIT;
508     s->webseeds = TR_PTR_ARRAY_INIT;
509     s->outgoingHandshakes = TR_PTR_ARRAY_INIT;
510 
511     rebuildWebseedArray(s, tor);
512 
513     return s;
514 }
515 
516 static void ensureMgrTimersExist(struct tr_peerMgr* m);
517 
tr_peerMgrNew(tr_session * session)518 tr_peerMgr* tr_peerMgrNew(tr_session* session)
519 {
520     tr_peerMgr* m = tr_new0(tr_peerMgr, 1);
521     m->session = session;
522     m->incomingHandshakes = TR_PTR_ARRAY_INIT;
523     ensureMgrTimersExist(m);
524     return m;
525 }
526 
deleteTimer(struct event ** t)527 static void deleteTimer(struct event** t)
528 {
529     if (*t != NULL)
530     {
531         event_free(*t);
532         *t = NULL;
533     }
534 }
535 
deleteTimers(struct tr_peerMgr * m)536 static void deleteTimers(struct tr_peerMgr* m)
537 {
538     deleteTimer(&m->atomTimer);
539     deleteTimer(&m->bandwidthTimer);
540     deleteTimer(&m->rechokeTimer);
541     deleteTimer(&m->refillUpkeepTimer);
542 }
543 
tr_peerMgrFree(tr_peerMgr * manager)544 void tr_peerMgrFree(tr_peerMgr* manager)
545 {
546     managerLock(manager);
547 
548     deleteTimers(manager);
549 
550     /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
551      * the item from manager->handshakes, so this is a little roundabout... */
552     while (!tr_ptrArrayEmpty(&manager->incomingHandshakes))
553     {
554         tr_handshakeAbort(tr_ptrArrayNth(&manager->incomingHandshakes, 0));
555     }
556 
557     tr_ptrArrayDestruct(&manager->incomingHandshakes, NULL);
558 
559     managerUnlock(manager);
560     tr_free(manager);
561 }
562 
563 /***
564 ****
565 ***/
566 
tr_peerMgrOnBlocklistChanged(tr_peerMgr * mgr)567 void tr_peerMgrOnBlocklistChanged(tr_peerMgr* mgr)
568 {
569     tr_torrent* tor = NULL;
570     tr_session* session = mgr->session;
571 
572     /* we cache whether or not a peer is blocklisted...
573        since the blocklist has changed, erase that cached value */
574     while ((tor = tr_torrentNext(session, tor)) != NULL)
575     {
576         tr_swarm* s = tor->swarm;
577 
578         for (int i = 0, n = tr_ptrArraySize(&s->pool); i < n; ++i)
579         {
580             struct peer_atom* atom = tr_ptrArrayNth(&s->pool, i);
581             atom->blocklisted = -1;
582         }
583     }
584 }
585 
isAtomBlocklisted(tr_session * session,struct peer_atom * atom)586 static bool isAtomBlocklisted(tr_session* session, struct peer_atom* atom)
587 {
588     if (atom->blocklisted < 0)
589     {
590         atom->blocklisted = (int8_t)tr_sessionIsAddressBlocked(session, &atom->addr);
591     }
592 
593     return atom->blocklisted != 0;
594 }
595 
596 /***
597 ****
598 ***/
599 
atomSetSeedProbability(struct peer_atom * atom,int seedProbability)600 static void atomSetSeedProbability(struct peer_atom* atom, int seedProbability)
601 {
602     TR_ASSERT(atom != NULL);
603     TR_ASSERT(seedProbability >= -1);
604     TR_ASSERT(seedProbability <= 100);
605 
606     atom->seedProbability = seedProbability;
607 
608     if (seedProbability == 100)
609     {
610         atom->flags |= ADDED_F_SEED_FLAG;
611     }
612     else if (seedProbability != -1)
613     {
614         atom->flags &= ~ADDED_F_SEED_FLAG;
615     }
616 }
617 
atomIsSeed(struct peer_atom const * atom)618 static inline bool atomIsSeed(struct peer_atom const* atom)
619 {
620     return atom->seedProbability == 100;
621 }
622 
atomSetSeed(tr_swarm const * s,struct peer_atom * atom)623 static void atomSetSeed(tr_swarm const* s, struct peer_atom* atom)
624 {
625     if (!atomIsSeed(atom))
626     {
627         tordbg(s, "marking peer %s as a seed", tr_atomAddrStr(atom));
628 
629         atomSetSeedProbability(atom, 100);
630     }
631 }
632 
tr_peerMgrPeerIsSeed(tr_torrent const * tor,tr_address const * addr)633 bool tr_peerMgrPeerIsSeed(tr_torrent const* tor, tr_address const* addr)
634 {
635     bool isSeed = false;
636     tr_swarm const* s = tor->swarm;
637     struct peer_atom const* atom = getExistingAtom(s, addr);
638 
639     if (atom != NULL)
640     {
641         isSeed = atomIsSeed(atom);
642     }
643 
644     return isSeed;
645 }
646 
tr_peerMgrSetUtpSupported(tr_torrent * tor,tr_address const * addr)647 void tr_peerMgrSetUtpSupported(tr_torrent* tor, tr_address const* addr)
648 {
649     struct peer_atom* atom = getExistingAtom(tor->swarm, addr);
650 
651     if (atom != NULL)
652     {
653         atom->flags |= ADDED_F_UTP_FLAGS;
654     }
655 }
656 
tr_peerMgrSetUtpFailed(tr_torrent * tor,tr_address const * addr,bool failed)657 void tr_peerMgrSetUtpFailed(tr_torrent* tor, tr_address const* addr, bool failed)
658 {
659     struct peer_atom* atom = getExistingAtom(tor->swarm, addr);
660 
661     if (atom != NULL)
662     {
663         atom->utp_failed = failed;
664     }
665 }
666 
667 /**
668 ***  REQUESTS
669 ***
670 *** There are two data structures associated with managing block requests:
671 ***
672 *** 1. tr_swarm::requests, an array of "struct block_request" which keeps
673 ***    track of which blocks have been requested, and when, and by which peers.
674 ***    This is list is used for (a) cancelling requests that have been pending
675 ***    for too long and (b) avoiding duplicate requests before endgame.
676 ***
677 *** 2. tr_swarm::pieces, an array of "struct weighted_piece" which lists the
678 ***    pieces that we want to request. It's used to decide which blocks to
679 ***    return next when tr_peerMgrGetBlockRequests() is called.
680 **/
681 
682 /**
683 *** struct block_request
684 **/
685 
compareReqByBlock(void const * va,void const * vb)686 static int compareReqByBlock(void const* va, void const* vb)
687 {
688     struct block_request const* a = va;
689     struct block_request const* b = vb;
690 
691     /* primary key: block */
692     if (a->block < b->block)
693     {
694         return -1;
695     }
696 
697     if (a->block > b->block)
698     {
699         return 1;
700     }
701 
702     /* secondary key: peer */
703     if (a->peer < b->peer)
704     {
705         return -1;
706     }
707 
708     if (a->peer > b->peer)
709     {
710         return 1;
711     }
712 
713     return 0;
714 }
715 
requestListAdd(tr_swarm * s,tr_block_index_t block,tr_peer * peer)716 static void requestListAdd(tr_swarm* s, tr_block_index_t block, tr_peer* peer)
717 {
718     struct block_request key;
719 
720     /* ensure enough room is available... */
721     if (s->requestCount + 1 >= s->requestAlloc)
722     {
723         int const CHUNK_SIZE = 128;
724         s->requestAlloc += CHUNK_SIZE;
725         s->requests = tr_renew(struct block_request, s->requests, s->requestAlloc);
726     }
727 
728     /* populate the record we're inserting */
729     key.block = block;
730     key.peer = peer;
731     key.sentAt = tr_time();
732 
733     /* insert the request to our array... */
734     {
735         bool exact;
736         int const pos = tr_lowerBound(&key, s->requests, s->requestCount, sizeof(struct block_request), compareReqByBlock,
737             &exact);
738         TR_ASSERT(!exact);
739         memmove(s->requests + pos + 1, s->requests + pos, sizeof(struct block_request) * (s->requestCount++ - pos));
740         s->requests[pos] = key;
741     }
742 
743     if (peer != NULL)
744     {
745         ++peer->pendingReqsToPeer;
746         TR_ASSERT(peer->pendingReqsToPeer >= 0);
747     }
748 
749     // fprintf(stderr, "added request of block %lu from peer %s... there are now %d block\n", (unsigned long)block,
750     //     tr_atomAddrStr(peer->atom), s->requestCount);
751 }
752 
requestListLookup(tr_swarm * s,tr_block_index_t block,tr_peer const * peer)753 static struct block_request* requestListLookup(tr_swarm* s, tr_block_index_t block, tr_peer const* peer)
754 {
755     struct block_request key;
756     key.block = block;
757     key.peer = (tr_peer*)peer;
758 
759     return bsearch(&key, s->requests, s->requestCount, sizeof(struct block_request), compareReqByBlock);
760 }
761 
762 /**
763  * Find the peers are we currently requesting the block
764  * with index @a block from and append them to @a peerArr.
765  */
getBlockRequestPeers(tr_swarm * s,tr_block_index_t block,tr_ptrArray * peerArr)766 static void getBlockRequestPeers(tr_swarm* s, tr_block_index_t block, tr_ptrArray* peerArr)
767 {
768     bool exact;
769     int pos;
770     struct block_request key;
771 
772     key.block = block;
773     key.peer = NULL;
774     pos = tr_lowerBound(&key, s->requests, s->requestCount, sizeof(struct block_request), compareReqByBlock, &exact);
775 
776     TR_ASSERT(!exact); /* shouldn't have a request with .peer == NULL */
777 
778     for (int i = pos; i < s->requestCount; ++i)
779     {
780         if (s->requests[i].block != block)
781         {
782             break;
783         }
784 
785         tr_ptrArrayAppend(peerArr, s->requests[i].peer);
786     }
787 }
788 
decrementPendingReqCount(struct block_request const * b)789 static void decrementPendingReqCount(struct block_request const* b)
790 {
791     if (b->peer != NULL)
792     {
793         if (b->peer->pendingReqsToPeer > 0)
794         {
795             --b->peer->pendingReqsToPeer;
796         }
797     }
798 }
799 
requestListRemove(tr_swarm * s,tr_block_index_t block,tr_peer const * peer)800 static void requestListRemove(tr_swarm* s, tr_block_index_t block, tr_peer const* peer)
801 {
802     struct block_request const* b = requestListLookup(s, block, peer);
803 
804     if (b != NULL)
805     {
806         int const pos = b - s->requests;
807         TR_ASSERT(pos < s->requestCount);
808 
809         decrementPendingReqCount(b);
810 
811         tr_removeElementFromArray(s->requests, pos, sizeof(struct block_request), s->requestCount);
812         --s->requestCount;
813 
814         // fprintf(stderr, "removing request of block %lu from peer %s... there are now %d block requests left\n", (unsigned long)block,
815         //     tr_atomAddrStr(peer->atom), t->requestCount);
816     }
817 }
818 
countActiveWebseeds(tr_swarm * s)819 static int countActiveWebseeds(tr_swarm* s)
820 {
821     int activeCount = 0;
822 
823     if (s->tor->isRunning && !tr_torrentIsSeed(s->tor))
824     {
825         uint64_t const now = tr_time_msec();
826 
827         for (int i = 0, n = tr_ptrArraySize(&s->webseeds); i < n; ++i)
828         {
829             if (tr_peerIsTransferringPieces(tr_ptrArrayNth(&s->webseeds, i), now, TR_DOWN, NULL))
830             {
831                 ++activeCount;
832             }
833         }
834     }
835 
836     return activeCount;
837 }
838 
testForEndgame(tr_swarm const * s)839 static bool testForEndgame(tr_swarm const* s)
840 {
841     /* we consider ourselves to be in endgame if the number of bytes
842        we've got requested is >= the number of bytes left to download */
843     return (uint64_t)s->requestCount * s->tor->blockSize >= tr_torrentGetLeftUntilDone(s->tor);
844 }
845 
updateEndgame(tr_swarm * s)846 static void updateEndgame(tr_swarm* s)
847 {
848     TR_ASSERT(s->requestCount >= 0);
849 
850     if (!testForEndgame(s))
851     {
852         /* not in endgame */
853         s->endgame = 0;
854     }
855     else if (s->endgame == 0) /* only recalculate when endgame first begins */
856     {
857         int numDownloading = 0;
858 
859         /* add the active bittorrent peers... */
860         for (int i = 0, n = tr_ptrArraySize(&s->peers); i < n; ++i)
861         {
862             tr_peer const* p = tr_ptrArrayNth(&s->peers, i);
863 
864             if (p->pendingReqsToPeer > 0)
865             {
866                 ++numDownloading;
867             }
868         }
869 
870         /* add the active webseeds... */
871         numDownloading += countActiveWebseeds(s);
872 
873         /* average number of pending requests per downloading peer */
874         s->endgame = s->requestCount / MAX(numDownloading, 1);
875     }
876 }
877 
878 /****
879 *****
880 *****  Piece List Manipulation / Accessors
881 *****
882 ****/
883 
invalidatePieceSorting(tr_swarm * s)884 static inline void invalidatePieceSorting(tr_swarm* s)
885 {
886     s->pieceSortState = PIECES_UNSORTED;
887 }
888 
889 static tr_torrent const* weightTorrent;
890 
891 static uint16_t const* weightReplication;
892 
setComparePieceByWeightTorrent(tr_swarm * s)893 static void setComparePieceByWeightTorrent(tr_swarm* s)
894 {
895     if (!replicationExists(s))
896     {
897         replicationNew(s);
898     }
899 
900     weightTorrent = s->tor;
901     weightReplication = s->pieceReplication;
902 }
903 
904 /* we try to create a "weight" s.t. high-priority pieces come before others,
905  * and that partially-complete pieces come before empty ones. */
comparePieceByWeight(void const * va,void const * vb)906 static int comparePieceByWeight(void const* va, void const* vb)
907 {
908     struct weighted_piece const* a = va;
909     struct weighted_piece const* b = vb;
910     int ia;
911     int ib;
912     int missing;
913     int pending;
914     tr_torrent const* tor = weightTorrent;
915     uint16_t const* rep = weightReplication;
916 
917     /* primary key: weight */
918     missing = tr_torrentMissingBlocksInPiece(tor, a->index);
919     pending = a->requestCount;
920     ia = missing > pending ? missing - pending : tor->blockCountInPiece + pending;
921     missing = tr_torrentMissingBlocksInPiece(tor, b->index);
922     pending = b->requestCount;
923     ib = missing > pending ? missing - pending : tor->blockCountInPiece + pending;
924 
925     if (ia < ib)
926     {
927         return -1;
928     }
929 
930     if (ia > ib)
931     {
932         return 1;
933     }
934 
935     /* secondary key: higher priorities go first */
936     ia = tor->info.pieces[a->index].priority;
937     ib = tor->info.pieces[b->index].priority;
938 
939     if (ia > ib)
940     {
941         return -1;
942     }
943 
944     if (ia < ib)
945     {
946         return 1;
947     }
948 
949     /* tertiary key: rarest first. */
950     ia = rep[a->index];
951     ib = rep[b->index];
952 
953     if (ia < ib)
954     {
955         return -1;
956     }
957 
958     if (ia > ib)
959     {
960         return 1;
961     }
962 
963     /* quaternary key: random */
964     if (a->salt < b->salt)
965     {
966         return -1;
967     }
968 
969     if (a->salt > b->salt)
970     {
971         return 1;
972     }
973 
974     /* okay, they're equal */
975     return 0;
976 }
977 
comparePieceByIndex(void const * va,void const * vb)978 static int comparePieceByIndex(void const* va, void const* vb)
979 {
980     struct weighted_piece const* a = va;
981     struct weighted_piece const* b = vb;
982 
983     if (a->index < b->index)
984     {
985         return -1;
986     }
987 
988     if (a->index > b->index)
989     {
990         return 1;
991     }
992 
993     return 0;
994 }
995 
pieceListSort(tr_swarm * s,enum piece_sort_state state)996 static void pieceListSort(tr_swarm* s, enum piece_sort_state state)
997 {
998     TR_ASSERT(state == PIECES_SORTED_BY_INDEX || state == PIECES_SORTED_BY_WEIGHT);
999 
1000     if (state == PIECES_SORTED_BY_WEIGHT)
1001     {
1002         setComparePieceByWeightTorrent(s);
1003         qsort(s->pieces, s->pieceCount, sizeof(struct weighted_piece), comparePieceByWeight);
1004     }
1005     else
1006     {
1007         qsort(s->pieces, s->pieceCount, sizeof(struct weighted_piece), comparePieceByIndex);
1008     }
1009 
1010     s->pieceSortState = state;
1011 }
1012 
1013 /**
1014  * These functions are useful for testing, but too expensive for nightly builds.
1015  * let's leave it disabled but add an easy hook to compile it back in
1016  */
1017 #if 1
1018 
1019 #define assertWeightedPiecesAreSorted(t)
1020 #define assertReplicationCountIsExact(t)
1021 
1022 #else
1023 
assertWeightedPiecesAreSorted(Torrent * t)1024 static void assertWeightedPiecesAreSorted(Torrent* t)
1025 {
1026     if (t->endgame == 0)
1027     {
1028         setComparePieceByWeightTorrent(t);
1029 
1030         for (int i = 0; i < t->pieceCount - 1; ++i)
1031         {
1032             TR_ASSERT(comparePieceByWeight(&t->pieces[i], &t->pieces[i + 1]) <= 0);
1033         }
1034     }
1035 }
1036 
assertReplicationCountIsExact(Torrent * t)1037 static void assertReplicationCountIsExact(Torrent* t)
1038 {
1039     /* This assert might fail due to errors of implementations in other
1040      * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1041      * from a client. If a such a behavior is noticed,
1042      * a bug report should be filled to the faulty client. */
1043 
1044     uint16_t const* rep = t->pieceReplication;
1045     size_t const piece_count = t->pieceReplicationSize;
1046     tr_peer const** peers = (tr_peer const**)tr_ptrArrayBase(&t->peers);
1047     int const peer_count = tr_ptrArraySize(&t->peers);
1048 
1049     TR_ASSERT(piece_count == t->tor->info.pieceCount);
1050 
1051     for (size_t piece_i = 0; piece_i < piece_count; ++piece_i)
1052     {
1053         uint16_t r = 0;
1054 
1055         for (int peer_i = 0; peer_i < peer_count; ++peer_i)
1056         {
1057             if (tr_bitsetHas(&peers[peer_i]->have, piece_i))
1058             {
1059                 ++r;
1060             }
1061         }
1062 
1063         TR_ASSERT(rep[piece_i] == r);
1064     }
1065 }
1066 
1067 #endif
1068 
pieceListLookup(tr_swarm * s,tr_piece_index_t index)1069 static struct weighted_piece* pieceListLookup(tr_swarm* s, tr_piece_index_t index)
1070 {
1071     for (int i = 0; i < s->pieceCount; ++i)
1072     {
1073         if (s->pieces[i].index == index)
1074         {
1075             return &s->pieces[i];
1076         }
1077     }
1078 
1079     return NULL;
1080 }
1081 
pieceListRebuild(tr_swarm * s)1082 static void pieceListRebuild(tr_swarm* s)
1083 {
1084     if (!tr_torrentIsSeed(s->tor))
1085     {
1086         tr_piece_index_t* pool;
1087         tr_piece_index_t poolCount = 0;
1088         tr_torrent const* tor = s->tor;
1089         tr_info const* inf = tr_torrentInfo(tor);
1090         struct weighted_piece* pieces;
1091         int pieceCount;
1092 
1093         /* build the new list */
1094         pool = tr_new(tr_piece_index_t, inf->pieceCount);
1095 
1096         for (tr_piece_index_t i = 0; i < inf->pieceCount; ++i)
1097         {
1098             if (!inf->pieces[i].dnd)
1099             {
1100                 if (!tr_torrentPieceIsComplete(tor, i))
1101                 {
1102                     pool[poolCount++] = i;
1103                 }
1104             }
1105         }
1106 
1107         pieceCount = poolCount;
1108         pieces = tr_new0(struct weighted_piece, pieceCount);
1109 
1110         for (tr_piece_index_t i = 0; i < poolCount; ++i)
1111         {
1112             struct weighted_piece* piece = pieces + i;
1113             piece->index = pool[i];
1114             piece->requestCount = 0;
1115             piece->salt = tr_rand_int_weak(4096);
1116         }
1117 
1118         /* if we already had a list of pieces, merge it into
1119          * the new list so we don't lose its requestCounts */
1120         if (s->pieces != NULL)
1121         {
1122             struct weighted_piece* o = s->pieces;
1123             struct weighted_piece* oend = o + s->pieceCount;
1124             struct weighted_piece* n = pieces;
1125             struct weighted_piece* nend = n + pieceCount;
1126 
1127             pieceListSort(s, PIECES_SORTED_BY_INDEX);
1128 
1129             while (o != oend && n != nend)
1130             {
1131                 if (o->index < n->index)
1132                 {
1133                     ++o;
1134                 }
1135                 else if (o->index > n->index)
1136                 {
1137                     ++n;
1138                 }
1139                 else
1140                 {
1141                     *n++ = *o++;
1142                 }
1143             }
1144 
1145             tr_free(s->pieces);
1146         }
1147 
1148         s->pieces = pieces;
1149         s->pieceCount = pieceCount;
1150 
1151         pieceListSort(s, PIECES_SORTED_BY_WEIGHT);
1152 
1153         /* cleanup */
1154         tr_free(pool);
1155     }
1156 }
1157 
pieceListRemovePiece(tr_swarm * s,tr_piece_index_t piece)1158 static void pieceListRemovePiece(tr_swarm* s, tr_piece_index_t piece)
1159 {
1160     struct weighted_piece* p;
1161 
1162     if ((p = pieceListLookup(s, piece)) != NULL)
1163     {
1164         int const pos = p - s->pieces;
1165 
1166         tr_removeElementFromArray(s->pieces, pos, sizeof(struct weighted_piece), s->pieceCount);
1167         --s->pieceCount;
1168 
1169         if (s->pieceCount == 0)
1170         {
1171             tr_free(s->pieces);
1172             s->pieces = NULL;
1173         }
1174     }
1175 }
1176 
pieceListResortPiece(tr_swarm * s,struct weighted_piece * p)1177 static void pieceListResortPiece(tr_swarm* s, struct weighted_piece* p)
1178 {
1179     int pos;
1180     bool isSorted = true;
1181 
1182     if (p == NULL)
1183     {
1184         return;
1185     }
1186 
1187     /* is the torrent already sorted? */
1188     pos = p - s->pieces;
1189     setComparePieceByWeightTorrent(s);
1190 
1191     if (isSorted && pos > 0 && comparePieceByWeight(p - 1, p) > 0)
1192     {
1193         isSorted = false;
1194     }
1195 
1196     if (isSorted && pos < s->pieceCount - 1 && comparePieceByWeight(p, p + 1) > 0)
1197     {
1198         isSorted = false;
1199     }
1200 
1201     if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1202     {
1203         pieceListSort(s, PIECES_SORTED_BY_WEIGHT);
1204         isSorted = true;
1205     }
1206 
1207     /* if it's not sorted, move it around */
1208     if (!isSorted)
1209     {
1210         bool exact;
1211         struct weighted_piece const tmp = *p;
1212 
1213         tr_removeElementFromArray(s->pieces, pos, sizeof(struct weighted_piece), s->pieceCount);
1214         --s->pieceCount;
1215 
1216         pos = tr_lowerBound(&tmp, s->pieces, s->pieceCount, sizeof(struct weighted_piece), comparePieceByWeight, &exact);
1217 
1218         memmove(&s->pieces[pos + 1], &s->pieces[pos], sizeof(struct weighted_piece) * (s->pieceCount - pos));
1219         ++s->pieceCount;
1220 
1221         s->pieces[pos] = tmp;
1222     }
1223 
1224     assertWeightedPiecesAreSorted(s);
1225 }
1226 
pieceListRemoveRequest(tr_swarm * s,tr_block_index_t block)1227 static void pieceListRemoveRequest(tr_swarm* s, tr_block_index_t block)
1228 {
1229     struct weighted_piece* p;
1230     tr_piece_index_t const index = tr_torBlockPiece(s->tor, block);
1231 
1232     if ((p = pieceListLookup(s, index)) != NULL && p->requestCount > 0)
1233     {
1234         --p->requestCount;
1235         pieceListResortPiece(s, p);
1236     }
1237 }
1238 
1239 /****
1240 *****
1241 *****  Replication count (for rarest first policy)
1242 *****
1243 ****/
1244 
1245 /**
1246  * Increase the replication count of this piece and sort it if the
1247  * piece list is already sorted
1248  */
tr_incrReplicationOfPiece(tr_swarm * s,size_t const index)1249 static void tr_incrReplicationOfPiece(tr_swarm* s, size_t const index)
1250 {
1251     TR_ASSERT(replicationExists(s));
1252     TR_ASSERT(s->pieceReplicationSize == s->tor->info.pieceCount);
1253 
1254     /* One more replication of this piece is present in the swarm */
1255     ++s->pieceReplication[index];
1256 
1257     /* we only resort the piece if the list is already sorted */
1258     if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1259     {
1260         pieceListResortPiece(s, pieceListLookup(s, index));
1261     }
1262 }
1263 
1264 /**
1265  * Increases the replication count of pieces present in the bitfield
1266  */
tr_incrReplicationFromBitfield(tr_swarm * s,tr_bitfield const * b)1267 static void tr_incrReplicationFromBitfield(tr_swarm* s, tr_bitfield const* b)
1268 {
1269     TR_ASSERT(replicationExists(s));
1270 
1271     uint16_t* rep = s->pieceReplication;
1272 
1273     for (size_t i = 0, n = s->tor->info.pieceCount; i < n; ++i)
1274     {
1275         if (tr_bitfieldHas(b, i))
1276         {
1277             ++rep[i];
1278         }
1279     }
1280 
1281     if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1282     {
1283         invalidatePieceSorting(s);
1284     }
1285 }
1286 
1287 /**
1288  * Increase the replication count of every piece
1289  */
tr_incrReplication(tr_swarm * s)1290 static void tr_incrReplication(tr_swarm* s)
1291 {
1292     TR_ASSERT(replicationExists(s));
1293     TR_ASSERT(s->pieceReplicationSize == s->tor->info.pieceCount);
1294 
1295     for (size_t i = 0; i < s->pieceReplicationSize; ++i)
1296     {
1297         ++s->pieceReplication[i];
1298     }
1299 }
1300 
1301 /**
1302  * Decrease the replication count of pieces present in the bitset.
1303  */
tr_decrReplicationFromBitfield(tr_swarm * s,tr_bitfield const * b)1304 static void tr_decrReplicationFromBitfield(tr_swarm* s, tr_bitfield const* b)
1305 {
1306     TR_ASSERT(replicationExists(s));
1307     TR_ASSERT(s->pieceReplicationSize == s->tor->info.pieceCount);
1308 
1309     if (tr_bitfieldHasAll(b))
1310     {
1311         for (size_t i = 0; i < s->pieceReplicationSize; ++i)
1312         {
1313             --s->pieceReplication[i];
1314         }
1315     }
1316     else if (!tr_bitfieldHasNone(b))
1317     {
1318         for (size_t i = 0; i < s->pieceReplicationSize; ++i)
1319         {
1320             if (tr_bitfieldHas(b, i))
1321             {
1322                 --s->pieceReplication[i];
1323             }
1324         }
1325 
1326         if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1327         {
1328             invalidatePieceSorting(s);
1329         }
1330     }
1331 }
1332 
1333 /**
1334 ***
1335 **/
1336 
tr_peerMgrRebuildRequests(tr_torrent * tor)1337 void tr_peerMgrRebuildRequests(tr_torrent* tor)
1338 {
1339     TR_ASSERT(tr_isTorrent(tor));
1340 
1341     pieceListRebuild(tor->swarm);
1342 }
1343 
tr_peerMgrGetNextRequests(tr_torrent * tor,tr_peer * peer,int numwant,tr_block_index_t * setme,int * numgot,bool get_intervals)1344 void tr_peerMgrGetNextRequests(tr_torrent* tor, tr_peer* peer, int numwant, tr_block_index_t* setme, int* numgot,
1345     bool get_intervals)
1346 {
1347     /* sanity clause */
1348     TR_ASSERT(tr_isTorrent(tor));
1349     TR_ASSERT(numwant > 0);
1350 
1351     tr_swarm* s;
1352     tr_bitfield const* const have = &peer->have;
1353 
1354     /* walk through the pieces and find blocks that should be requested */
1355     s = tor->swarm;
1356 
1357     /* prep the pieces list */
1358     if (s->pieces == NULL)
1359     {
1360         pieceListRebuild(s);
1361     }
1362 
1363     if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1364     {
1365         pieceListSort(s, PIECES_SORTED_BY_WEIGHT);
1366     }
1367 
1368     assertReplicationCountIsExact(s);
1369     assertWeightedPiecesAreSorted(s);
1370 
1371     updateEndgame(s);
1372 
1373     struct weighted_piece* pieces = s->pieces;
1374     int got = 0;
1375     int checkedPieceCount = 0;
1376 
1377     for (int i = 0; i < s->pieceCount && got < numwant; ++i, ++checkedPieceCount)
1378     {
1379         struct weighted_piece* p = pieces + i;
1380 
1381         /* if the peer has this piece that we want... */
1382         if (tr_bitfieldHas(have, p->index))
1383         {
1384             tr_block_index_t first;
1385             tr_block_index_t last;
1386             tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1387 
1388             tr_torGetPieceBlockRange(tor, p->index, &first, &last);
1389 
1390             for (tr_block_index_t b = first; b <= last && (got < numwant || (get_intervals && setme[2 * got - 1] == b - 1));
1391                 ++b)
1392             {
1393                 int peerCount;
1394                 tr_peer** peers;
1395 
1396                 /* don't request blocks we've already got */
1397                 if (tr_torrentBlockIsComplete(tor, b))
1398                 {
1399                     continue;
1400                 }
1401 
1402                 /* always add peer if this block has no peers yet */
1403                 tr_ptrArrayClear(&peerArr);
1404                 getBlockRequestPeers(s, b, &peerArr);
1405                 peers = (tr_peer**)tr_ptrArrayPeek(&peerArr, &peerCount);
1406 
1407                 if (peerCount != 0)
1408                 {
1409                     /* don't make a second block request until the endgame */
1410                     if (s->endgame == 0)
1411                     {
1412                         continue;
1413                     }
1414 
1415                     /* don't have more than two peers requesting this block */
1416                     if (peerCount > 1)
1417                     {
1418                         continue;
1419                     }
1420 
1421                     /* don't send the same request to the same peer twice */
1422                     if (peer == peers[0])
1423                     {
1424                         continue;
1425                     }
1426 
1427                     /* in the endgame allow an additional peer to download a
1428                        block but only if the peer seems to be handling requests
1429                        relatively fast */
1430                     if (peer->pendingReqsToPeer + numwant - got < s->endgame)
1431                     {
1432                         continue;
1433                     }
1434                 }
1435 
1436                 /* update the caller's table */
1437                 if (!get_intervals)
1438                 {
1439                     setme[got++] = b;
1440                 }
1441                 /* if intervals are requested two array entries are necessarry:
1442                    one for the interval's starting block and one for its end block */
1443                 else if (got != 0 && setme[2 * got - 1] == b - 1 && b != first)
1444                 {
1445                     /* expand the last interval */
1446                     ++setme[2 * got - 1];
1447                 }
1448                 else
1449                 {
1450                     /* begin a new interval */
1451                     setme[2 * got] = b;
1452                     setme[2 * got + 1] = b;
1453                     ++got;
1454                 }
1455 
1456                 /* update our own tables */
1457                 requestListAdd(s, b, peer);
1458                 ++p->requestCount;
1459             }
1460 
1461             tr_ptrArrayDestruct(&peerArr, NULL);
1462         }
1463     }
1464 
1465     /* In most cases we've just changed the weights of a small number of pieces.
1466      * So rather than qsort()ing the entire array, it's faster to apply an
1467      * adaptive insertion sort algorithm. */
1468     if (got > 0)
1469     {
1470         /* not enough requests || last piece modified */
1471         if (checkedPieceCount == s->pieceCount)
1472         {
1473             --checkedPieceCount;
1474         }
1475 
1476         setComparePieceByWeightTorrent(s);
1477 
1478         for (int i = checkedPieceCount - 1; i >= 0; --i)
1479         {
1480             bool exact;
1481 
1482             /* relative position! */
1483             int const newpos = tr_lowerBound(&s->pieces[i], &s->pieces[i + 1], s->pieceCount - (i + 1),
1484                 sizeof(struct weighted_piece), comparePieceByWeight, &exact);
1485 
1486             if (newpos > 0)
1487             {
1488                 struct weighted_piece const piece = s->pieces[i];
1489                 memmove(&s->pieces[i], &s->pieces[i + 1], sizeof(struct weighted_piece) * newpos);
1490                 s->pieces[i + newpos] = piece;
1491             }
1492         }
1493     }
1494 
1495     assertWeightedPiecesAreSorted(t);
1496     *numgot = got;
1497 }
1498 
tr_peerMgrDidPeerRequest(tr_torrent const * tor,tr_peer const * peer,tr_block_index_t block)1499 bool tr_peerMgrDidPeerRequest(tr_torrent const* tor, tr_peer const* peer, tr_block_index_t block)
1500 {
1501     return requestListLookup((tr_swarm*)tor->swarm, block, peer) != NULL;
1502 }
1503 
1504 /* cancel requests that are too old */
refillUpkeep(evutil_socket_t foo UNUSED,short bar UNUSED,void * vmgr)1505 static void refillUpkeep(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmgr)
1506 {
1507     time_t now;
1508     time_t too_old;
1509     tr_torrent* tor;
1510     int cancel_buflen = 0;
1511     struct block_request* cancel = NULL;
1512     tr_peerMgr* mgr = vmgr;
1513     managerLock(mgr);
1514 
1515     now = tr_time();
1516     too_old = now - REQUEST_TTL_SECS;
1517 
1518     /* alloc the temporary "cancel" buffer */
1519     tor = NULL;
1520 
1521     while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
1522     {
1523         cancel_buflen = MAX(cancel_buflen, tor->swarm->requestCount);
1524     }
1525 
1526     if (cancel_buflen > 0)
1527     {
1528         cancel = tr_new(struct block_request, cancel_buflen);
1529     }
1530 
1531     /* prune requests that are too old */
1532     tor = NULL;
1533 
1534     while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
1535     {
1536         tr_swarm* s = tor->swarm;
1537         int const n = s->requestCount;
1538 
1539         if (n > 0)
1540         {
1541             int keepCount = 0;
1542             int cancelCount = 0;
1543 
1544             for (int i = 0; i < n; ++i)
1545             {
1546                 struct block_request const* const request = &s->requests[i];
1547                 tr_peerMsgs* msgs = PEER_MSGS(request->peer);
1548 
1549                 if (msgs != NULL && request->sentAt <= too_old && !tr_peerMsgsIsReadingBlock(msgs, request->block))
1550                 {
1551                     TR_ASSERT(cancel != NULL);
1552                     TR_ASSERT(cancelCount < cancel_buflen);
1553 
1554                     cancel[cancelCount++] = *request;
1555                 }
1556                 else
1557                 {
1558                     if (i != keepCount)
1559                     {
1560                         s->requests[keepCount] = *request;
1561                     }
1562 
1563                     keepCount++;
1564                 }
1565             }
1566 
1567             /* prune out the ones we aren't keeping */
1568             s->requestCount = keepCount;
1569 
1570             /* send cancel messages for all the "cancel" ones */
1571             for (int i = 0; i < cancelCount; ++i)
1572             {
1573                 struct block_request const* const request = &cancel[i];
1574                 tr_peerMsgs* msgs = PEER_MSGS(request->peer);
1575 
1576                 if (msgs != NULL)
1577                 {
1578                     tr_historyAdd(&request->peer->cancelsSentToPeer, now, 1);
1579                     tr_peerMsgsCancel(msgs, request->block);
1580                     decrementPendingReqCount(request);
1581                 }
1582             }
1583 
1584             /* decrement the pending request counts for the timed-out blocks */
1585             for (int i = 0; i < cancelCount; ++i)
1586             {
1587                 struct block_request const* const request = &cancel[i];
1588 
1589                 pieceListRemoveRequest(s, request->block);
1590             }
1591         }
1592     }
1593 
1594     tr_free(cancel);
1595     tr_timerAddMsec(mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1596     managerUnlock(mgr);
1597 }
1598 
addStrike(tr_swarm * s,tr_peer * peer)1599 static void addStrike(tr_swarm* s, tr_peer* peer)
1600 {
1601     tordbg(s, "increasing peer %s strike count to %d", tr_atomAddrStr(peer->atom), peer->strikes + 1);
1602 
1603     if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1604     {
1605         struct peer_atom* atom = peer->atom;
1606         atom->flags2 |= MYFLAG_BANNED;
1607         peer->doPurge = true;
1608         tordbg(s, "banning peer %s", tr_atomAddrStr(atom));
1609     }
1610 }
1611 
peerSuggestedPiece(tr_swarm * s UNUSED,tr_peer * peer UNUSED,tr_piece_index_t pieceIndex UNUSED,int isFastAllowed UNUSED)1612 static void peerSuggestedPiece(tr_swarm* s UNUSED, tr_peer* peer UNUSED, tr_piece_index_t pieceIndex UNUSED,
1613     int isFastAllowed UNUSED)
1614 {
1615 #if 0
1616 
1617     TR_ASSERT(t != NULL);
1618     TR_ASSERT(peer != NULL);
1619     TR_ASSERT(peer->msgs != NULL);
1620 
1621     /* is this a valid piece? */
1622     if (pieceIndex >= t->tor->info.pieceCount)
1623     {
1624         return;
1625     }
1626 
1627     /* don't ask for it if we've already got it */
1628     if (tr_torrentPieceIsComplete(t->tor, pieceIndex))
1629     {
1630         return;
1631     }
1632 
1633     /* don't ask for it if they don't have it */
1634     if (!tr_bitfieldHas(peer->have, pieceIndex))
1635     {
1636         return;
1637     }
1638 
1639     /* don't ask for it if we're choked and it's not fast */
1640     if (!isFastAllowed && peer->clientIsChoked)
1641     {
1642         return;
1643     }
1644 
1645     /* request the blocks that we don't have in this piece */
1646     {
1647         tr_block_index_t first;
1648         tr_block_index_t last;
1649         tr_torrent const* tor = t->tor;
1650 
1651         tr_torGetPieceBlockRange(t->tor, pieceIndex, &first, &last);
1652 
1653         for (tr_block_index_t b = first; b <= last; ++b)
1654         {
1655             if (tr_torrentBlockIsComplete(tor, b))
1656             {
1657                 uint32_t const offset = getBlockOffsetInPiece(tor, b);
1658                 uint32_t const length = tr_torBlockCountBytes(tor, b);
1659                 tr_peerMsgsAddRequest(peer->msgs, pieceIndex, offset, length);
1660                 incrementPieceRequests(t, pieceIndex);
1661             }
1662         }
1663     }
1664 
1665 #endif
1666 }
1667 
removeRequestFromTables(tr_swarm * s,tr_block_index_t block,tr_peer const * peer)1668 static void removeRequestFromTables(tr_swarm* s, tr_block_index_t block, tr_peer const* peer)
1669 {
1670     requestListRemove(s, block, peer);
1671     pieceListRemoveRequest(s, block);
1672 }
1673 
1674 /* peer choked us, or maybe it disconnected.
1675    either way we need to remove all its requests */
peerDeclinedAllRequests(tr_swarm * s,tr_peer const * peer)1676 static void peerDeclinedAllRequests(tr_swarm* s, tr_peer const* peer)
1677 {
1678     int n = 0;
1679     tr_block_index_t* blocks = tr_new(tr_block_index_t, s->requestCount);
1680 
1681     for (int i = 0; i < s->requestCount; ++i)
1682     {
1683         if (peer == s->requests[i].peer)
1684         {
1685             blocks[n++] = s->requests[i].block;
1686         }
1687     }
1688 
1689     for (int i = 0; i < n; ++i)
1690     {
1691         removeRequestFromTables(s, blocks[i], peer);
1692     }
1693 
1694     tr_free(blocks);
1695 }
1696 
cancelAllRequestsForBlock(tr_swarm * s,tr_block_index_t block,tr_peer * no_notify)1697 static void cancelAllRequestsForBlock(tr_swarm* s, tr_block_index_t block, tr_peer* no_notify)
1698 {
1699     int peerCount;
1700     tr_peer** peers;
1701     tr_ptrArray peerArr;
1702 
1703     peerArr = TR_PTR_ARRAY_INIT;
1704     getBlockRequestPeers(s, block, &peerArr);
1705     peers = (tr_peer**)tr_ptrArrayPeek(&peerArr, &peerCount);
1706 
1707     for (int i = 0; i < peerCount; ++i)
1708     {
1709         tr_peer* p = peers[i];
1710 
1711         if (p != no_notify && tr_isPeerMsgs(p))
1712         {
1713             tr_historyAdd(&p->cancelsSentToPeer, tr_time(), 1);
1714             tr_peerMsgsCancel(PEER_MSGS(p), block);
1715         }
1716 
1717         removeRequestFromTables(s, block, p);
1718     }
1719 
1720     tr_ptrArrayDestruct(&peerArr, NULL);
1721 }
1722 
tr_peerMgrPieceCompleted(tr_torrent * tor,tr_piece_index_t p)1723 void tr_peerMgrPieceCompleted(tr_torrent* tor, tr_piece_index_t p)
1724 {
1725     bool pieceCameFromPeers = false;
1726     tr_swarm* const s = tor->swarm;
1727 
1728     /* walk through our peers */
1729     for (int i = 0, n = tr_ptrArraySize(&s->peers); i < n; ++i)
1730     {
1731         tr_peer* peer = tr_ptrArrayNth(&s->peers, i);
1732 
1733         /* notify the peer that we now have this piece */
1734         tr_peerMsgsHave(PEER_MSGS(peer), p);
1735 
1736         if (!pieceCameFromPeers)
1737         {
1738             pieceCameFromPeers = tr_bitfieldHas(&peer->blame, p);
1739         }
1740     }
1741 
1742     if (pieceCameFromPeers) /* webseed downloads don't belong in announce totals */
1743     {
1744         tr_announcerAddBytes(tor, TR_ANN_DOWN, tr_torPieceCountBytes(tor, p));
1745     }
1746 
1747     /* bookkeeping */
1748     pieceListRemovePiece(s, p);
1749     s->needsCompletenessCheck = true;
1750 }
1751 
peerCallbackFunc(tr_peer * peer,tr_peer_event const * e,void * vs)1752 static void peerCallbackFunc(tr_peer* peer, tr_peer_event const* e, void* vs)
1753 {
1754     TR_ASSERT(peer != NULL);
1755 
1756     tr_swarm* s = vs;
1757 
1758     swarmLock(s);
1759 
1760     switch (e->eventType)
1761     {
1762     case TR_PEER_PEER_GOT_PIECE_DATA:
1763         {
1764             time_t const now = tr_time();
1765             tr_torrent* tor = s->tor;
1766 
1767             tor->uploadedCur += e->length;
1768             tr_announcerAddBytes(tor, TR_ANN_UP, e->length);
1769             tr_torrentSetDateActive(tor, now);
1770             tr_torrentSetDirty(tor);
1771             tr_statsAddUploaded(tor->session, e->length);
1772 
1773             if (peer->atom != NULL)
1774             {
1775                 peer->atom->piece_data_time = now;
1776             }
1777 
1778             break;
1779         }
1780 
1781     case TR_PEER_CLIENT_GOT_PIECE_DATA:
1782         {
1783             time_t const now = tr_time();
1784             tr_torrent* tor = s->tor;
1785 
1786             tor->downloadedCur += e->length;
1787             tr_torrentSetDateActive(tor, now);
1788             tr_torrentSetDirty(tor);
1789 
1790             tr_statsAddDownloaded(tor->session, e->length);
1791 
1792             if (peer->atom != NULL)
1793             {
1794                 peer->atom->piece_data_time = now;
1795             }
1796 
1797             break;
1798         }
1799 
1800     case TR_PEER_CLIENT_GOT_HAVE:
1801         if (replicationExists(s))
1802         {
1803             tr_incrReplicationOfPiece(s, e->pieceIndex);
1804             assertReplicationCountIsExact(s);
1805         }
1806 
1807         break;
1808 
1809     case TR_PEER_CLIENT_GOT_HAVE_ALL:
1810         if (replicationExists(s))
1811         {
1812             tr_incrReplication(s);
1813             assertReplicationCountIsExact(s);
1814         }
1815 
1816         break;
1817 
1818     case TR_PEER_CLIENT_GOT_HAVE_NONE:
1819         /* noop */
1820         break;
1821 
1822     case TR_PEER_CLIENT_GOT_BITFIELD:
1823         TR_ASSERT(e->bitfield != NULL);
1824 
1825         if (replicationExists(s))
1826         {
1827             tr_incrReplicationFromBitfield(s, e->bitfield);
1828             assertReplicationCountIsExact(s);
1829         }
1830 
1831         break;
1832 
1833     case TR_PEER_CLIENT_GOT_REJ:
1834         {
1835             tr_block_index_t b = _tr_block(s->tor, e->pieceIndex, e->offset);
1836 
1837             if (b < s->tor->blockCount)
1838             {
1839                 removeRequestFromTables(s, b, peer);
1840             }
1841             else
1842             {
1843                 tordbg(s, "Peer %s sent an out-of-range reject message", tr_atomAddrStr(peer->atom));
1844             }
1845 
1846             break;
1847         }
1848 
1849     case TR_PEER_CLIENT_GOT_CHOKE:
1850         peerDeclinedAllRequests(s, peer);
1851         break;
1852 
1853     case TR_PEER_CLIENT_GOT_PORT:
1854         if (peer->atom != NULL)
1855         {
1856             peer->atom->port = e->port;
1857         }
1858 
1859         break;
1860 
1861     case TR_PEER_CLIENT_GOT_SUGGEST:
1862         peerSuggestedPiece(s, peer, e->pieceIndex, false);
1863         break;
1864 
1865     case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1866         peerSuggestedPiece(s, peer, e->pieceIndex, true);
1867         break;
1868 
1869     case TR_PEER_CLIENT_GOT_BLOCK:
1870         {
1871             tr_torrent* tor = s->tor;
1872             tr_piece_index_t const p = e->pieceIndex;
1873             tr_block_index_t const block = _tr_block(tor, p, e->offset);
1874             cancelAllRequestsForBlock(s, block, peer);
1875             tr_historyAdd(&peer->blocksSentToClient, tr_time(), 1);
1876             pieceListResortPiece(s, pieceListLookup(s, p));
1877             tr_torrentGotBlock(tor, block);
1878             break;
1879         }
1880 
1881     case TR_PEER_ERROR:
1882         if (e->err == ERANGE || e->err == EMSGSIZE || e->err == ENOTCONN)
1883         {
1884             /* some protocol error from the peer */
1885             peer->doPurge = true;
1886             tordbg(s, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_atomAddrStr(
1887                 peer->atom));
1888         }
1889         else
1890         {
1891             tordbg(s, "unhandled error: %s", tr_strerror(e->err));
1892         }
1893 
1894         break;
1895 
1896     default:
1897         TR_ASSERT_MSG(false, "unhandled peer event type %d", (int)e->eventType);
1898     }
1899 
1900     swarmUnlock(s);
1901 }
1902 
getDefaultShelfLife(uint8_t from)1903 static int getDefaultShelfLife(uint8_t from)
1904 {
1905     /* in general, peers obtained from firsthand contact
1906      * are better than those from secondhand, etc etc */
1907     switch (from)
1908     {
1909     case TR_PEER_FROM_INCOMING:
1910         return 60 * 60 * 6;
1911 
1912     case TR_PEER_FROM_LTEP:
1913         return 60 * 60 * 6;
1914 
1915     case TR_PEER_FROM_TRACKER:
1916         return 60 * 60 * 3;
1917 
1918     case TR_PEER_FROM_DHT:
1919         return 60 * 60 * 3;
1920 
1921     case TR_PEER_FROM_PEX:
1922         return 60 * 60 * 2;
1923 
1924     case TR_PEER_FROM_RESUME:
1925         return 60 * 60;
1926 
1927     case TR_PEER_FROM_LPD:
1928         return 10 * 60;
1929 
1930     default:
1931         return 60 * 60;
1932     }
1933 }
1934 
ensureAtomExists(tr_swarm * s,tr_address const * addr,tr_port const port,uint8_t const flags,int8_t const seedProbability,uint8_t const from)1935 static void ensureAtomExists(tr_swarm* s, tr_address const* addr, tr_port const port, uint8_t const flags,
1936     int8_t const seedProbability, uint8_t const from)
1937 {
1938     TR_ASSERT(tr_address_is_valid(addr));
1939     TR_ASSERT(from < TR_PEER_FROM__MAX);
1940 
1941     struct peer_atom* a = getExistingAtom(s, addr);
1942 
1943     if (a == NULL)
1944     {
1945         int const jitter = tr_rand_int_weak(60 * 10);
1946         a = tr_new0(struct peer_atom, 1);
1947         a->addr = *addr;
1948         a->port = port;
1949         a->flags = flags;
1950         a->fromFirst = from;
1951         a->fromBest = from;
1952         a->shelf_date = tr_time() + getDefaultShelfLife(from) + jitter;
1953         a->blocklisted = -1;
1954         atomSetSeedProbability(a, seedProbability);
1955         tr_ptrArrayInsertSorted(&s->pool, a, compareAtomsByAddress);
1956 
1957         tordbg(s, "got a new atom: %s", tr_atomAddrStr(a));
1958     }
1959     else
1960     {
1961         if (from < a->fromBest)
1962         {
1963             a->fromBest = from;
1964         }
1965 
1966         if (a->seedProbability == -1)
1967         {
1968             atomSetSeedProbability(a, seedProbability);
1969         }
1970 
1971         a->flags |= flags;
1972     }
1973 }
1974 
getMaxPeerCount(tr_torrent const * tor)1975 static int getMaxPeerCount(tr_torrent const* tor)
1976 {
1977     return tor->maxConnectedPeers;
1978 }
1979 
getPeerCount(tr_swarm const * s)1980 static int getPeerCount(tr_swarm const* s)
1981 {
1982     return tr_ptrArraySize(&s->peers); /* + tr_ptrArraySize(&t->outgoingHandshakes); */
1983 }
1984 
createBitTorrentPeer(tr_torrent * tor,struct tr_peerIo * io,struct peer_atom * atom,tr_quark client)1985 static void createBitTorrentPeer(tr_torrent* tor, struct tr_peerIo* io, struct peer_atom* atom, tr_quark client)
1986 {
1987     TR_ASSERT(atom != NULL);
1988     TR_ASSERT(tr_isTorrent(tor));
1989     TR_ASSERT(tor->swarm != NULL);
1990 
1991     tr_swarm* swarm = tor->swarm;
1992 
1993     tr_peer* peer = (tr_peer*)tr_peerMsgsNew(tor, io, peerCallbackFunc, swarm);
1994     peer->atom = atom;
1995     peer->client = client;
1996     atom->peer = peer;
1997 
1998     tr_ptrArrayInsertSorted(&swarm->peers, peer, peerCompare);
1999     ++swarm->stats.peerCount;
2000     ++swarm->stats.peerFromCount[atom->fromFirst];
2001 
2002     TR_ASSERT(swarm->stats.peerCount == tr_ptrArraySize(&swarm->peers));
2003     TR_ASSERT(swarm->stats.peerFromCount[atom->fromFirst] <= swarm->stats.peerCount);
2004 
2005     tr_peerMsgs* msgs = PEER_MSGS(peer);
2006     tr_peerMsgsUpdateActive(msgs, TR_UP);
2007     tr_peerMsgsUpdateActive(msgs, TR_DOWN);
2008 }
2009 
2010 /* FIXME: this is kind of a mess. */
myHandshakeDoneCB(tr_handshake * handshake,tr_peerIo * io,bool readAnythingFromPeer,bool isConnected,uint8_t const * peer_id,void * vmanager)2011 static bool myHandshakeDoneCB(tr_handshake* handshake, tr_peerIo* io, bool readAnythingFromPeer, bool isConnected,
2012     uint8_t const* peer_id, void* vmanager)
2013 {
2014     TR_ASSERT(io != NULL);
2015 
2016     bool ok = isConnected;
2017     bool success = false;
2018     tr_port port;
2019     tr_address const* addr;
2020     tr_peerMgr* manager = vmanager;
2021     tr_swarm* s = tr_peerIoHasTorrentHash(io) ? getExistingSwarm(manager, tr_peerIoGetTorrentHash(io)) : NULL;
2022 
2023     if (tr_peerIoIsIncoming(io))
2024     {
2025         tr_ptrArrayRemoveSortedPointer(&manager->incomingHandshakes, handshake, handshakeCompare);
2026     }
2027     else if (s != NULL)
2028     {
2029         tr_ptrArrayRemoveSortedPointer(&s->outgoingHandshakes, handshake, handshakeCompare);
2030     }
2031 
2032     if (s != NULL)
2033     {
2034         swarmLock(s);
2035     }
2036 
2037     addr = tr_peerIoGetAddress(io, &port);
2038 
2039     if (!ok || s == NULL || !s->isRunning)
2040     {
2041         if (s != NULL)
2042         {
2043             struct peer_atom* atom = getExistingAtom(s, addr);
2044 
2045             if (atom != NULL)
2046             {
2047                 ++atom->numFails;
2048 
2049                 if (!readAnythingFromPeer)
2050                 {
2051                     tordbg(s, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr(atom), (int)atom->numFails);
2052                     atom->flags2 |= MYFLAG_UNREACHABLE;
2053                 }
2054             }
2055         }
2056     }
2057     else /* looking good */
2058     {
2059         struct peer_atom* atom;
2060 
2061         ensureAtomExists(s, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
2062         atom = getExistingAtom(s, addr);
2063 
2064         TR_ASSERT(atom != NULL);
2065 
2066         atom->time = tr_time();
2067         atom->piece_data_time = 0;
2068         atom->lastConnectionAt = tr_time();
2069 
2070         if (!tr_peerIoIsIncoming(io))
2071         {
2072             atom->flags |= ADDED_F_CONNECTABLE;
2073             atom->flags2 &= ~MYFLAG_UNREACHABLE;
2074         }
2075 
2076         /* In principle, this flag specifies whether the peer groks uTP,
2077            not whether it's currently connected over uTP. */
2078         if (io->socket.type == TR_PEER_SOCKET_TYPE_UTP)
2079         {
2080             atom->flags |= ADDED_F_UTP_FLAGS;
2081         }
2082 
2083         if ((atom->flags2 & MYFLAG_BANNED) != 0)
2084         {
2085             tordbg(s, "banned peer %s tried to reconnect", tr_atomAddrStr(atom));
2086         }
2087         else if (tr_peerIoIsIncoming(io) && getPeerCount(s) >= getMaxPeerCount(s->tor))
2088         {
2089         }
2090         else
2091         {
2092             tr_peer* peer = atom->peer;
2093 
2094             if (peer != NULL)
2095             {
2096                 /* we already have this peer */
2097             }
2098             else
2099             {
2100                 tr_quark client;
2101                 tr_peerIo* io;
2102                 char buf[128];
2103 
2104                 if (peer_id != NULL)
2105                 {
2106                     client = tr_quark_new(tr_clientForId(buf, sizeof(buf), peer_id), TR_BAD_SIZE);
2107                 }
2108                 else
2109                 {
2110                     client = TR_KEY_NONE;
2111                 }
2112 
2113                 io = tr_handshakeStealIO(handshake); /* this steals its refcount too, which is balanced by our unref in peerDelete() */
2114                 tr_peerIoSetParent(io, &s->tor->bandwidth);
2115                 createBitTorrentPeer(s->tor, io, atom, client);
2116 
2117                 success = true;
2118             }
2119         }
2120     }
2121 
2122     if (s != NULL)
2123     {
2124         swarmUnlock(s);
2125     }
2126 
2127     return success;
2128 }
2129 
close_peer_socket(struct tr_peer_socket const socket,tr_session * session)2130 static void close_peer_socket(struct tr_peer_socket const socket, tr_session* session)
2131 {
2132     switch (socket.type)
2133     {
2134     case TR_PEER_SOCKET_TYPE_NONE:
2135         break;
2136 
2137     case TR_PEER_SOCKET_TYPE_TCP:
2138         tr_netClose(session, socket.handle.tcp);
2139         break;
2140 
2141 #ifdef WITH_UTP
2142 
2143     case TR_PEER_SOCKET_TYPE_UTP:
2144         UTP_Close(socket.handle.utp);
2145         break;
2146 
2147 #endif
2148 
2149     default:
2150         TR_ASSERT_MSG(false, "unsupported peer socket type %d", socket.type);
2151     }
2152 }
2153 
tr_peerMgrAddIncoming(tr_peerMgr * manager,tr_address * addr,tr_port port,struct tr_peer_socket const socket)2154 void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address* addr, tr_port port, struct tr_peer_socket const socket)
2155 {
2156     TR_ASSERT(tr_isSession(manager->session));
2157 
2158     managerLock(manager);
2159 
2160     tr_session* session = manager->session;
2161 
2162     if (tr_sessionIsAddressBlocked(session, addr))
2163     {
2164         tr_logAddDebug("Banned IP address \"%s\" tried to connect to us", tr_address_to_string(addr));
2165         close_peer_socket(socket, session);
2166     }
2167     else if (getExistingHandshake(&manager->incomingHandshakes, addr) != NULL)
2168     {
2169         close_peer_socket(socket, session);
2170     }
2171     else /* we don't have a connection to them yet... */
2172     {
2173         tr_peerIo* io;
2174         tr_handshake* handshake;
2175 
2176         io = tr_peerIoNewIncoming(session, &session->bandwidth, addr, port, socket);
2177 
2178         handshake = tr_handshakeNew(io, session->encryptionMode, myHandshakeDoneCB, manager);
2179 
2180         tr_peerIoUnref(io); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2181 
2182         tr_ptrArrayInsertSorted(&manager->incomingHandshakes, handshake, handshakeCompare);
2183     }
2184 
2185     managerUnlock(manager);
2186 }
2187 
tr_peerMgrAddPex(tr_torrent * tor,uint8_t from,tr_pex const * pex,int8_t seedProbability)2188 void tr_peerMgrAddPex(tr_torrent* tor, uint8_t from, tr_pex const* pex, int8_t seedProbability)
2189 {
2190     if (tr_isPex(pex)) /* safeguard against corrupt data */
2191     {
2192         tr_swarm* s = tor->swarm;
2193         managerLock(s->manager);
2194 
2195         if (!tr_sessionIsAddressBlocked(s->manager->session, &pex->addr))
2196         {
2197             if (tr_address_is_valid_for_peers(&pex->addr, pex->port))
2198             {
2199                 ensureAtomExists(s, &pex->addr, pex->port, pex->flags, seedProbability, from);
2200             }
2201         }
2202 
2203         managerUnlock(s->manager);
2204     }
2205 }
2206 
tr_peerMgrCompactToPex(void const * compact,size_t compactLen,uint8_t const * added_f,size_t added_f_len,size_t * pexCount)2207 tr_pex* tr_peerMgrCompactToPex(void const* compact, size_t compactLen, uint8_t const* added_f, size_t added_f_len,
2208     size_t* pexCount)
2209 {
2210     size_t n = compactLen / 6;
2211     uint8_t const* walk = compact;
2212     tr_pex* pex = tr_new0(tr_pex, n);
2213 
2214     for (size_t i = 0; i < n; ++i)
2215     {
2216         pex[i].addr.type = TR_AF_INET;
2217         memcpy(&pex[i].addr.addr, walk, 4);
2218         walk += 4;
2219         memcpy(&pex[i].port, walk, 2);
2220         walk += 2;
2221 
2222         if (added_f != NULL && n == added_f_len)
2223         {
2224             pex[i].flags = added_f[i];
2225         }
2226     }
2227 
2228     *pexCount = n;
2229     return pex;
2230 }
2231 
tr_peerMgrCompact6ToPex(void const * compact,size_t compactLen,uint8_t const * added_f,size_t added_f_len,size_t * pexCount)2232 tr_pex* tr_peerMgrCompact6ToPex(void const* compact, size_t compactLen, uint8_t const* added_f, size_t added_f_len,
2233     size_t* pexCount)
2234 {
2235     size_t n = compactLen / 18;
2236     uint8_t const* walk = compact;
2237     tr_pex* pex = tr_new0(tr_pex, n);
2238 
2239     for (size_t i = 0; i < n; ++i)
2240     {
2241         pex[i].addr.type = TR_AF_INET6;
2242         memcpy(&pex[i].addr.addr.addr6.s6_addr, walk, 16);
2243         walk += 16;
2244         memcpy(&pex[i].port, walk, 2);
2245         walk += 2;
2246 
2247         if (added_f != NULL && n == added_f_len)
2248         {
2249             pex[i].flags = added_f[i];
2250         }
2251     }
2252 
2253     *pexCount = n;
2254     return pex;
2255 }
2256 
2257 /**
2258 ***
2259 **/
2260 
tr_peerMgrGotBadPiece(tr_torrent * tor,tr_piece_index_t pieceIndex)2261 void tr_peerMgrGotBadPiece(tr_torrent* tor, tr_piece_index_t pieceIndex)
2262 {
2263     tr_swarm* s = tor->swarm;
2264     uint32_t const byteCount = tr_torPieceCountBytes(tor, pieceIndex);
2265 
2266     for (int i = 0, n = tr_ptrArraySize(&s->peers); i != n; ++i)
2267     {
2268         tr_peer* peer = tr_ptrArrayNth(&s->peers, i);
2269 
2270         if (tr_bitfieldHas(&peer->blame, pieceIndex))
2271         {
2272             tordbg(s, "peer %s contributed to corrupt piece (%d); now has %d strikes", tr_atomAddrStr(peer->atom), pieceIndex,
2273                 (int)peer->strikes + 1);
2274             addStrike(s, peer);
2275         }
2276     }
2277 
2278     tr_announcerAddBytes(tor, TR_ANN_CORRUPT, byteCount);
2279 }
2280 
tr_pexCompare(void const * va,void const * vb)2281 int tr_pexCompare(void const* va, void const* vb)
2282 {
2283     tr_pex const* a = va;
2284     tr_pex const* b = vb;
2285 
2286     TR_ASSERT(tr_isPex(a));
2287     TR_ASSERT(tr_isPex(b));
2288 
2289     int i;
2290 
2291     if ((i = tr_address_compare(&a->addr, &b->addr)) != 0)
2292     {
2293         return i;
2294     }
2295 
2296     if (a->port != b->port)
2297     {
2298         return a->port < b->port ? -1 : 1;
2299     }
2300 
2301     return 0;
2302 }
2303 
2304 /* better goes first */
compareAtomsByUsefulness(void const * va,void const * vb)2305 static int compareAtomsByUsefulness(void const* va, void const* vb)
2306 {
2307     struct peer_atom const* a = *(struct peer_atom const* const*)va;
2308     struct peer_atom const* b = *(struct peer_atom const* const*)vb;
2309 
2310     TR_ASSERT(tr_isAtom(a));
2311     TR_ASSERT(tr_isAtom(b));
2312 
2313     if (a->piece_data_time != b->piece_data_time)
2314     {
2315         return a->piece_data_time > b->piece_data_time ? -1 : 1;
2316     }
2317 
2318     if (a->fromBest != b->fromBest)
2319     {
2320         return a->fromBest < b->fromBest ? -1 : 1;
2321     }
2322 
2323     if (a->numFails != b->numFails)
2324     {
2325         return a->numFails < b->numFails ? -1 : 1;
2326     }
2327 
2328     return 0;
2329 }
2330 
isAtomInteresting(tr_torrent const * tor,struct peer_atom * atom)2331 static bool isAtomInteresting(tr_torrent const* tor, struct peer_atom* atom)
2332 {
2333     if (tr_torrentIsSeed(tor) && atomIsSeed(atom))
2334     {
2335         return false;
2336     }
2337 
2338     if (peerIsInUse(tor->swarm, atom))
2339     {
2340         return true;
2341     }
2342 
2343     if (isAtomBlocklisted(tor->session, atom))
2344     {
2345         return false;
2346     }
2347 
2348     if ((atom->flags2 & MYFLAG_BANNED) != 0)
2349     {
2350         return false;
2351     }
2352 
2353     return true;
2354 }
2355 
tr_peerMgrGetPeers(tr_torrent * tor,tr_pex ** setme_pex,uint8_t af,uint8_t list_mode,int maxCount)2356 int tr_peerMgrGetPeers(tr_torrent* tor, tr_pex** setme_pex, uint8_t af, uint8_t list_mode, int maxCount)
2357 {
2358     TR_ASSERT(tr_isTorrent(tor));
2359     TR_ASSERT(setme_pex != NULL);
2360     TR_ASSERT(af == TR_AF_INET || af == TR_AF_INET6);
2361     TR_ASSERT(list_mode == TR_PEERS_CONNECTED || list_mode == TR_PEERS_INTERESTING);
2362 
2363     int n;
2364     int count = 0;
2365     int atomCount = 0;
2366     tr_swarm const* s = tor->swarm;
2367     struct peer_atom** atoms = NULL;
2368     tr_pex* pex;
2369     tr_pex* walk;
2370 
2371     managerLock(s->manager);
2372 
2373     /**
2374     ***  build a list of atoms
2375     **/
2376 
2377     if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2378     {
2379         tr_peer const** peers = (tr_peer const**)tr_ptrArrayBase(&s->peers);
2380         atomCount = tr_ptrArraySize(&s->peers);
2381         atoms = tr_new(struct peer_atom*, atomCount);
2382 
2383         for (int i = 0; i < atomCount; ++i)
2384         {
2385             atoms[i] = peers[i]->atom;
2386         }
2387     }
2388     else /* TR_PEERS_INTERESTING */
2389     {
2390         struct peer_atom** atomBase = (struct peer_atom**)tr_ptrArrayBase(&s->pool);
2391         n = tr_ptrArraySize(&s->pool);
2392         atoms = tr_new(struct peer_atom*, n);
2393 
2394         for (int i = 0; i < n; ++i)
2395         {
2396             if (isAtomInteresting(tor, atomBase[i]))
2397             {
2398                 atoms[atomCount++] = atomBase[i];
2399             }
2400         }
2401     }
2402 
2403     qsort(atoms, atomCount, sizeof(struct peer_atom*), compareAtomsByUsefulness);
2404 
2405     /**
2406     ***  add the first N of them into our return list
2407     **/
2408 
2409     n = MIN(atomCount, maxCount);
2410     pex = walk = tr_new0(tr_pex, n);
2411 
2412     for (int i = 0; i < atomCount && count < n; ++i)
2413     {
2414         struct peer_atom const* atom = atoms[i];
2415 
2416         if (atom->addr.type == af)
2417         {
2418             TR_ASSERT(tr_address_is_valid(&atom->addr));
2419 
2420             walk->addr = atom->addr;
2421             walk->port = atom->port;
2422             walk->flags = atom->flags;
2423             ++count;
2424             ++walk;
2425         }
2426     }
2427 
2428     qsort(pex, count, sizeof(tr_pex), tr_pexCompare);
2429 
2430     TR_ASSERT(walk - pex == count);
2431     *setme_pex = pex;
2432 
2433     /* cleanup */
2434     tr_free(atoms);
2435     managerUnlock(s->manager);
2436     return count;
2437 }
2438 
2439 static void atomPulse(evutil_socket_t, short, void*);
2440 static void bandwidthPulse(evutil_socket_t, short, void*);
2441 static void rechokePulse(evutil_socket_t, short, void*);
2442 static void reconnectPulse(evutil_socket_t, short, void*);
2443 
createTimer(tr_session * session,int msec,event_callback_fn callback,void * cbdata)2444 static struct event* createTimer(tr_session* session, int msec, event_callback_fn callback, void* cbdata)
2445 {
2446     struct event* timer = evtimer_new(session->event_base, callback, cbdata);
2447     tr_timerAddMsec(timer, msec);
2448     return timer;
2449 }
2450 
ensureMgrTimersExist(struct tr_peerMgr * m)2451 static void ensureMgrTimersExist(struct tr_peerMgr* m)
2452 {
2453     if (m->atomTimer == NULL)
2454     {
2455         m->atomTimer = createTimer(m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2456     }
2457 
2458     if (m->bandwidthTimer == NULL)
2459     {
2460         m->bandwidthTimer = createTimer(m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2461     }
2462 
2463     if (m->rechokeTimer == NULL)
2464     {
2465         m->rechokeTimer = createTimer(m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2466     }
2467 
2468     if (m->refillUpkeepTimer == NULL)
2469     {
2470         m->refillUpkeepTimer = createTimer(m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2471     }
2472 }
2473 
tr_peerMgrStartTorrent(tr_torrent * tor)2474 void tr_peerMgrStartTorrent(tr_torrent* tor)
2475 {
2476     TR_ASSERT(tr_isTorrent(tor));
2477     TR_ASSERT(tr_torrentIsLocked(tor));
2478 
2479     tr_swarm* s = tor->swarm;
2480 
2481     ensureMgrTimersExist(s->manager);
2482 
2483     s->isRunning = true;
2484     s->maxPeers = tor->maxConnectedPeers;
2485     s->pieceSortState = PIECES_UNSORTED;
2486 
2487     rechokePulse(0, 0, s->manager);
2488 }
2489 
2490 static void removeAllPeers(tr_swarm*);
2491 
stopSwarm(tr_swarm * swarm)2492 static void stopSwarm(tr_swarm* swarm)
2493 {
2494     swarm->isRunning = false;
2495 
2496     replicationFree(swarm);
2497     invalidatePieceSorting(swarm);
2498 
2499     removeAllPeers(swarm);
2500 
2501     /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2502      * which removes the handshake from t->outgoingHandshakes... */
2503     while (!tr_ptrArrayEmpty(&swarm->outgoingHandshakes))
2504     {
2505         tr_handshakeAbort(tr_ptrArrayNth(&swarm->outgoingHandshakes, 0));
2506     }
2507 }
2508 
tr_peerMgrStopTorrent(tr_torrent * tor)2509 void tr_peerMgrStopTorrent(tr_torrent* tor)
2510 {
2511     TR_ASSERT(tr_isTorrent(tor));
2512     TR_ASSERT(tr_torrentIsLocked(tor));
2513 
2514     stopSwarm(tor->swarm);
2515 }
2516 
tr_peerMgrAddTorrent(tr_peerMgr * manager,tr_torrent * tor)2517 void tr_peerMgrAddTorrent(tr_peerMgr* manager, tr_torrent* tor)
2518 {
2519     TR_ASSERT(tr_isTorrent(tor));
2520     TR_ASSERT(tr_torrentIsLocked(tor));
2521     TR_ASSERT(tor->swarm == NULL);
2522 
2523     tor->swarm = swarmNew(manager, tor);
2524 }
2525 
tr_peerMgrRemoveTorrent(tr_torrent * tor)2526 void tr_peerMgrRemoveTorrent(tr_torrent* tor)
2527 {
2528     TR_ASSERT(tr_isTorrent(tor));
2529     TR_ASSERT(tr_torrentIsLocked(tor));
2530 
2531     stopSwarm(tor->swarm);
2532     swarmFree(tor->swarm);
2533 }
2534 
tr_peerUpdateProgress(tr_torrent * tor,tr_peer * peer)2535 void tr_peerUpdateProgress(tr_torrent* tor, tr_peer* peer)
2536 {
2537     tr_bitfield const* have = &peer->have;
2538 
2539     if (tr_bitfieldHasAll(have))
2540     {
2541         peer->progress = 1.0;
2542     }
2543     else if (tr_bitfieldHasNone(have))
2544     {
2545         peer->progress = 0.0;
2546     }
2547     else
2548     {
2549         float const true_count = tr_bitfieldCountTrueBits(have);
2550 
2551         if (tr_torrentHasMetadata(tor))
2552         {
2553             peer->progress = true_count / tor->info.pieceCount;
2554         }
2555         else /* without pieceCount, this result is only a best guess... */
2556         {
2557             peer->progress = true_count / (have->bit_count + 1);
2558         }
2559     }
2560 
2561     /* clamp the progress range */
2562     if (peer->progress < 0.0)
2563     {
2564         peer->progress = 0.0;
2565     }
2566 
2567     if (peer->progress > 1.0)
2568     {
2569         peer->progress = 1.0;
2570     }
2571 
2572     if (peer->atom != NULL && peer->progress >= 1.0)
2573     {
2574         atomSetSeed(tor->swarm, peer->atom);
2575     }
2576 }
2577 
tr_peerMgrOnTorrentGotMetainfo(tr_torrent * tor)2578 void tr_peerMgrOnTorrentGotMetainfo(tr_torrent* tor)
2579 {
2580     int peerCount;
2581     tr_peer** peers;
2582 
2583     /* the webseed list may have changed... */
2584     rebuildWebseedArray(tor->swarm, tor);
2585 
2586     /* some peer_msgs' progress fields may not be accurate if we
2587        didn't have the metadata before now... so refresh them all... */
2588     peerCount = tr_ptrArraySize(&tor->swarm->peers);
2589     peers = (tr_peer**)tr_ptrArrayBase(&tor->swarm->peers);
2590 
2591     for (int i = 0; i < peerCount; ++i)
2592     {
2593         tr_peerUpdateProgress(tor, peers[i]);
2594     }
2595 
2596     /* update the bittorrent peers' willingnes... */
2597     for (int i = 0; i < peerCount; ++i)
2598     {
2599         tr_peerMsgsUpdateActive(tr_peerMsgsCast(peers[i]), TR_UP);
2600         tr_peerMsgsUpdateActive(tr_peerMsgsCast(peers[i]), TR_DOWN);
2601     }
2602 }
2603 
tr_peerMgrTorrentAvailability(tr_torrent const * tor,int8_t * tab,unsigned int tabCount)2604 void tr_peerMgrTorrentAvailability(tr_torrent const* tor, int8_t* tab, unsigned int tabCount)
2605 {
2606     TR_ASSERT(tr_isTorrent(tor));
2607     TR_ASSERT(tab != NULL);
2608     TR_ASSERT(tabCount > 0);
2609 
2610     memset(tab, 0, tabCount);
2611 
2612     if (tr_torrentHasMetadata(tor))
2613     {
2614         int const peerCount = tr_ptrArraySize(&tor->swarm->peers);
2615         tr_peer const** peers = (tr_peer const**)tr_ptrArrayBase(&tor->swarm->peers);
2616         float const interval = tor->info.pieceCount / (float)tabCount;
2617         bool const isSeed = tr_torrentGetCompleteness(tor) == TR_SEED;
2618 
2619         for (tr_piece_index_t i = 0; i < tabCount; ++i)
2620         {
2621             int const piece = i * interval;
2622 
2623             if (isSeed || tr_torrentPieceIsComplete(tor, piece))
2624             {
2625                 tab[i] = -1;
2626             }
2627             else if (peerCount != 0)
2628             {
2629                 for (int j = 0; j < peerCount; ++j)
2630                 {
2631                     if (tr_bitfieldHas(&peers[j]->have, piece))
2632                     {
2633                         ++tab[i];
2634                     }
2635                 }
2636             }
2637         }
2638     }
2639 }
2640 
tr_swarmGetStats(tr_swarm const * swarm,tr_swarm_stats * setme)2641 void tr_swarmGetStats(tr_swarm const* swarm, tr_swarm_stats* setme)
2642 {
2643     TR_ASSERT(swarm != NULL);
2644     TR_ASSERT(setme != NULL);
2645 
2646     *setme = swarm->stats;
2647 }
2648 
tr_swarmIncrementActivePeers(tr_swarm * swarm,tr_direction direction,bool is_active)2649 void tr_swarmIncrementActivePeers(tr_swarm* swarm, tr_direction direction, bool is_active)
2650 {
2651     int n = swarm->stats.activePeerCount[direction];
2652 
2653     if (is_active)
2654     {
2655         ++n;
2656     }
2657     else
2658     {
2659         --n;
2660     }
2661 
2662     TR_ASSERT(n >= 0);
2663     TR_ASSERT(n <= swarm->stats.peerCount);
2664 
2665     swarm->stats.activePeerCount[direction] = n;
2666 }
2667 
tr_peerIsSeed(tr_peer const * peer)2668 bool tr_peerIsSeed(tr_peer const* peer)
2669 {
2670     if (peer->progress >= 1.0)
2671     {
2672         return true;
2673     }
2674 
2675     if (peer->atom != NULL && atomIsSeed(peer->atom))
2676     {
2677         return true;
2678     }
2679 
2680     return false;
2681 }
2682 
2683 /* count how many bytes we want that connected peers have */
tr_peerMgrGetDesiredAvailable(tr_torrent const * tor)2684 uint64_t tr_peerMgrGetDesiredAvailable(tr_torrent const* tor)
2685 {
2686     TR_ASSERT(tr_isTorrent(tor));
2687 
2688     /* common shortcuts... */
2689 
2690     if (!tor->isRunning || tor->isStopping || tr_torrentIsSeed(tor) || !tr_torrentHasMetadata(tor))
2691     {
2692         return 0;
2693     }
2694 
2695     tr_swarm const* s = tor->swarm;
2696 
2697     if (s == NULL || !s->isRunning)
2698     {
2699         return 0;
2700     }
2701 
2702     size_t const n = tr_ptrArraySize(&s->peers);
2703 
2704     if (n == 0)
2705     {
2706         return 0;
2707     }
2708     else
2709     {
2710         tr_peer const** peers = (tr_peer const**)tr_ptrArrayBase(&s->peers);
2711 
2712         for (size_t i = 0; i < n; ++i)
2713         {
2714             if (peers[i]->atom != NULL && atomIsSeed(peers[i]->atom))
2715             {
2716                 return tr_torrentGetLeftUntilDone(tor);
2717             }
2718         }
2719     }
2720 
2721     if (s->pieceReplication == NULL || s->pieceReplicationSize == 0)
2722     {
2723         return 0;
2724     }
2725 
2726     /* do it the hard way */
2727 
2728     uint64_t desiredAvailable = 0;
2729 
2730     for (size_t i = 0, n = MIN(tor->info.pieceCount, s->pieceReplicationSize); i < n; ++i)
2731     {
2732         if (!tor->info.pieces[i].dnd && s->pieceReplication[i] > 0)
2733         {
2734             desiredAvailable += tr_torrentMissingBytesInPiece(tor, i);
2735         }
2736     }
2737 
2738     TR_ASSERT(desiredAvailable <= tor->info.totalSize);
2739     return desiredAvailable;
2740 }
2741 
tr_peerMgrWebSpeeds_KBps(tr_torrent const * tor)2742 double* tr_peerMgrWebSpeeds_KBps(tr_torrent const* tor)
2743 {
2744     TR_ASSERT(tr_isTorrent(tor));
2745 
2746     uint64_t const now = tr_time_msec();
2747 
2748     tr_swarm* s = tor->swarm;
2749     TR_ASSERT(s->manager != NULL);
2750 
2751     unsigned int n = tr_ptrArraySize(&s->webseeds);
2752     TR_ASSERT(n == tor->info.webseedCount);
2753 
2754     double* ret = tr_new0(double, n);
2755 
2756     for (unsigned int i = 0; i < n; ++i)
2757     {
2758         unsigned int Bps = 0;
2759 
2760         if (tr_peerIsTransferringPieces(tr_ptrArrayNth(&s->webseeds, i), now, TR_DOWN, &Bps))
2761         {
2762             ret[i] = Bps / (double)tr_speed_K;
2763         }
2764         else
2765         {
2766             ret[i] = -1.0;
2767         }
2768     }
2769 
2770     return ret;
2771 }
2772 
tr_peerMgrPeerStats(tr_torrent const * tor,int * setmeCount)2773 struct tr_peer_stat* tr_peerMgrPeerStats(tr_torrent const* tor, int* setmeCount)
2774 {
2775     TR_ASSERT(tr_isTorrent(tor));
2776     TR_ASSERT(tor->swarm->manager != NULL);
2777 
2778     time_t const now = tr_time();
2779     uint64_t const now_msec = tr_time_msec();
2780 
2781     tr_swarm const* s = tor->swarm;
2782     tr_peer** peers = (tr_peer**)tr_ptrArrayBase(&s->peers);
2783     int size = tr_ptrArraySize(&s->peers);
2784     tr_peer_stat* ret = tr_new0(tr_peer_stat, size);
2785 
2786     for (int i = 0; i < size; ++i)
2787     {
2788         char* pch;
2789         tr_peer* peer = peers[i];
2790         tr_peerMsgs* msgs = PEER_MSGS(peer);
2791         struct peer_atom const* atom = peer->atom;
2792         tr_peer_stat* stat = ret + i;
2793 
2794         tr_address_to_string_with_buf(&atom->addr, stat->addr, sizeof(stat->addr));
2795         tr_strlcpy(stat->client, tr_quark_get_string(peer->client, NULL), sizeof(stat->client));
2796         stat->port = ntohs(peer->atom->port);
2797         stat->from = atom->fromFirst;
2798         stat->progress = peer->progress;
2799         stat->isUTP = tr_peerMsgsIsUtpConnection(msgs);
2800         stat->isEncrypted = tr_peerMsgsIsEncrypted(msgs);
2801         stat->rateToPeer_KBps = toSpeedKBps(tr_peerGetPieceSpeed_Bps(peer, now_msec, TR_CLIENT_TO_PEER));
2802         stat->rateToClient_KBps = toSpeedKBps(tr_peerGetPieceSpeed_Bps(peer, now_msec, TR_PEER_TO_CLIENT));
2803         stat->peerIsChoked = tr_peerMsgsIsPeerChoked(msgs);
2804         stat->peerIsInterested = tr_peerMsgsIsPeerInterested(msgs);
2805         stat->clientIsChoked = tr_peerMsgsIsClientChoked(msgs);
2806         stat->clientIsInterested = tr_peerMsgsIsClientInterested(msgs);
2807         stat->isIncoming = tr_peerMsgsIsIncomingConnection(msgs);
2808         stat->isDownloadingFrom = tr_peerMsgsIsActive(msgs, TR_PEER_TO_CLIENT);
2809         stat->isUploadingTo = tr_peerMsgsIsActive(msgs, TR_CLIENT_TO_PEER);
2810         stat->isSeed = tr_peerIsSeed(peer);
2811 
2812         stat->blocksToPeer = tr_historyGet(&peer->blocksSentToPeer, now, CANCEL_HISTORY_SEC);
2813         stat->blocksToClient = tr_historyGet(&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2814         stat->cancelsToPeer = tr_historyGet(&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2815         stat->cancelsToClient = tr_historyGet(&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2816 
2817         stat->pendingReqsToPeer = peer->pendingReqsToPeer;
2818         stat->pendingReqsToClient = peer->pendingReqsToClient;
2819 
2820         pch = stat->flagStr;
2821 
2822         if (stat->isUTP)
2823         {
2824             *pch++ = 'T';
2825         }
2826 
2827         if (s->optimistic == msgs)
2828         {
2829             *pch++ = 'O';
2830         }
2831 
2832         if (stat->isDownloadingFrom)
2833         {
2834             *pch++ = 'D';
2835         }
2836         else if (stat->clientIsInterested)
2837         {
2838             *pch++ = 'd';
2839         }
2840 
2841         if (stat->isUploadingTo)
2842         {
2843             *pch++ = 'U';
2844         }
2845         else if (stat->peerIsInterested)
2846         {
2847             *pch++ = 'u';
2848         }
2849 
2850         if (!stat->clientIsChoked && !stat->clientIsInterested)
2851         {
2852             *pch++ = 'K';
2853         }
2854 
2855         if (!stat->peerIsChoked && !stat->peerIsInterested)
2856         {
2857             *pch++ = '?';
2858         }
2859 
2860         if (stat->isEncrypted)
2861         {
2862             *pch++ = 'E';
2863         }
2864 
2865         if (stat->from == TR_PEER_FROM_DHT)
2866         {
2867             *pch++ = 'H';
2868         }
2869         else if (stat->from == TR_PEER_FROM_PEX)
2870         {
2871             *pch++ = 'X';
2872         }
2873 
2874         if (stat->isIncoming)
2875         {
2876             *pch++ = 'I';
2877         }
2878 
2879         *pch = '\0';
2880     }
2881 
2882     *setmeCount = size;
2883     return ret;
2884 }
2885 
2886 /***
2887 ****
2888 ****
2889 ***/
2890 
tr_peerMgrClearInterest(tr_torrent * tor)2891 void tr_peerMgrClearInterest(tr_torrent* tor)
2892 {
2893     TR_ASSERT(tr_isTorrent(tor));
2894     TR_ASSERT(tr_torrentIsLocked(tor));
2895 
2896     tr_swarm* s = tor->swarm;
2897     int const peerCount = tr_ptrArraySize(&s->peers);
2898 
2899     for (int i = 0; i < peerCount; ++i)
2900     {
2901         tr_peerMsgsSetInterested(tr_ptrArrayNth(&s->peers, i), false);
2902     }
2903 }
2904 
2905 /* does this peer have any pieces that we want? */
isPeerInteresting(tr_torrent * const tor,bool const * const piece_is_interesting,tr_peer const * const peer)2906 static bool isPeerInteresting(tr_torrent* const tor, bool const* const piece_is_interesting, tr_peer const* const peer)
2907 {
2908     /* these cases should have already been handled by the calling code... */
2909     TR_ASSERT(!tr_torrentIsSeed(tor));
2910     TR_ASSERT(tr_torrentIsPieceTransferAllowed(tor, TR_PEER_TO_CLIENT));
2911 
2912     if (tr_peerIsSeed(peer))
2913     {
2914         return true;
2915     }
2916 
2917     for (tr_piece_index_t i = 0; i < tor->info.pieceCount; ++i)
2918     {
2919         if (piece_is_interesting[i] && tr_bitfieldHas(&peer->have, i))
2920         {
2921             return true;
2922         }
2923     }
2924 
2925     return false;
2926 }
2927 
2928 typedef enum
2929 {
2930     RECHOKE_STATE_GOOD,
2931     RECHOKE_STATE_UNTESTED,
2932     RECHOKE_STATE_BAD
2933 }
2934 tr_rechoke_state;
2935 
2936 struct tr_rechoke_info
2937 {
2938     tr_peer* peer;
2939     int salt;
2940     int rechoke_state;
2941 };
2942 
compare_rechoke_info(void const * va,void const * vb)2943 static int compare_rechoke_info(void const* va, void const* vb)
2944 {
2945     struct tr_rechoke_info const* a = va;
2946     struct tr_rechoke_info const* b = vb;
2947 
2948     if (a->rechoke_state != b->rechoke_state)
2949     {
2950         return a->rechoke_state - b->rechoke_state;
2951     }
2952 
2953     return a->salt - b->salt;
2954 }
2955 
2956 /* determines who we send "interested" messages to */
rechokeDownloads(tr_swarm * s)2957 static void rechokeDownloads(tr_swarm* s)
2958 {
2959     int maxPeers = 0;
2960     int rechoke_count = 0;
2961     struct tr_rechoke_info* rechoke = NULL;
2962     int const MIN_INTERESTING_PEERS = 5;
2963     int const peerCount = tr_ptrArraySize(&s->peers);
2964     time_t const now = tr_time();
2965 
2966     /* some cases where this function isn't necessary */
2967     if (tr_torrentIsSeed(s->tor))
2968     {
2969         return;
2970     }
2971 
2972     if (!tr_torrentIsPieceTransferAllowed(s->tor, TR_PEER_TO_CLIENT))
2973     {
2974         return;
2975     }
2976 
2977     /* decide HOW MANY peers to be interested in */
2978     {
2979         int blocks = 0;
2980         int cancels = 0;
2981         time_t timeSinceCancel;
2982 
2983         /* Count up how many blocks & cancels each peer has.
2984          *
2985          * There are two situations where we send out cancels --
2986          *
2987          * 1. We've got unresponsive peers, which is handled by deciding
2988          *    -which- peers to be interested in.
2989          *
2990          * 2. We've hit our bandwidth cap, which is handled by deciding
2991          *    -how many- peers to be interested in.
2992          *
2993          * We're working on 2. here, so we need to ignore unresponsive
2994          * peers in our calculations lest they confuse Transmission into
2995          * thinking it's hit its bandwidth cap.
2996          */
2997         for (int i = 0; i < peerCount; ++i)
2998         {
2999             tr_peer const* peer = tr_ptrArrayNth(&s->peers, i);
3000             int const b = tr_historyGet(&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
3001             int const c = tr_historyGet(&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
3002 
3003             if (b == 0) /* ignore unresponsive peers, as described above */
3004             {
3005                 continue;
3006             }
3007 
3008             blocks += b;
3009             cancels += c;
3010         }
3011 
3012         if (cancels > 0)
3013         {
3014             /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
3015              * higher values indicate more congestion. */
3016             double const cancelRate = cancels / (double)(cancels + blocks);
3017             double const mult = 1 - MIN(cancelRate, 0.5);
3018             maxPeers = s->interestedCount * mult;
3019             tordbg(s, "cancel rate is %.3f -- reducing the number of peers we're interested in by %.0f percent", cancelRate,
3020                 mult * 100);
3021             s->lastCancel = now;
3022         }
3023 
3024         timeSinceCancel = now - s->lastCancel;
3025 
3026         if (timeSinceCancel != 0)
3027         {
3028             int const maxIncrease = 15;
3029             time_t const maxHistory = 2 * CANCEL_HISTORY_SEC;
3030             double const mult = MIN(timeSinceCancel, maxHistory) / (double)maxHistory;
3031             int const inc = maxIncrease * mult;
3032             maxPeers = s->maxPeers + inc;
3033             tordbg(s, "time since last cancel is %jd -- increasing the number of peers we're interested in by %d",
3034                 (intmax_t)timeSinceCancel, inc);
3035         }
3036     }
3037 
3038     /* don't let the previous section's number tweaking go too far... */
3039     if (maxPeers < MIN_INTERESTING_PEERS)
3040     {
3041         maxPeers = MIN_INTERESTING_PEERS;
3042     }
3043 
3044     if (maxPeers > s->tor->maxConnectedPeers)
3045     {
3046         maxPeers = s->tor->maxConnectedPeers;
3047     }
3048 
3049     s->maxPeers = maxPeers;
3050 
3051     if (peerCount > 0)
3052     {
3053         bool* piece_is_interesting;
3054         tr_torrent const* const tor = s->tor;
3055         int const n = tor->info.pieceCount;
3056 
3057         /* build a bitfield of interesting pieces... */
3058         piece_is_interesting = tr_new(bool, n);
3059 
3060         for (int i = 0; i < n; ++i)
3061         {
3062             piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_torrentPieceIsComplete(tor, i);
3063         }
3064 
3065         /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
3066         for (int i = 0; i < peerCount; ++i)
3067         {
3068             tr_peer* peer = tr_ptrArrayNth(&s->peers, i);
3069 
3070             if (!isPeerInteresting(s->tor, piece_is_interesting, peer))
3071             {
3072                 tr_peerMsgsSetInterested(PEER_MSGS(peer), false);
3073             }
3074             else
3075             {
3076                 tr_rechoke_state rechoke_state;
3077                 int const blocks = tr_historyGet(&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
3078                 int const cancels = tr_historyGet(&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
3079 
3080                 if (blocks == 0 && cancels == 0)
3081                 {
3082                     rechoke_state = RECHOKE_STATE_UNTESTED;
3083                 }
3084                 else if (cancels == 0)
3085                 {
3086                     rechoke_state = RECHOKE_STATE_GOOD;
3087                 }
3088                 else if (blocks == 0)
3089                 {
3090                     rechoke_state = RECHOKE_STATE_BAD;
3091                 }
3092                 else if (cancels * 10 < blocks)
3093                 {
3094                     rechoke_state = RECHOKE_STATE_GOOD;
3095                 }
3096                 else
3097                 {
3098                     rechoke_state = RECHOKE_STATE_BAD;
3099                 }
3100 
3101                 if (rechoke == NULL)
3102                 {
3103                     rechoke = tr_new(struct tr_rechoke_info, peerCount);
3104                 }
3105 
3106                 rechoke[rechoke_count].peer = peer;
3107                 rechoke[rechoke_count].rechoke_state = rechoke_state;
3108                 rechoke[rechoke_count].salt = tr_rand_int_weak(INT_MAX);
3109                 rechoke_count++;
3110             }
3111         }
3112 
3113         tr_free(piece_is_interesting);
3114     }
3115 
3116     /* now that we know which & how many peers to be interested in... update the peer interest */
3117     qsort(rechoke, rechoke_count, sizeof(struct tr_rechoke_info), compare_rechoke_info);
3118     s->interestedCount = MIN(maxPeers, rechoke_count);
3119 
3120     for (int i = 0; i < rechoke_count; ++i)
3121     {
3122         tr_peerMsgsSetInterested(PEER_MSGS(rechoke[i].peer), i < s->interestedCount);
3123     }
3124 
3125     /* cleanup */
3126     tr_free(rechoke);
3127 }
3128 
3129 /**
3130 ***
3131 **/
3132 
3133 struct ChokeData
3134 {
3135     bool isInterested;
3136     bool wasChoked;
3137     bool isChoked;
3138     int rate;
3139     int salt;
3140     tr_peerMsgs* msgs;
3141 };
3142 
compareChoke(void const * va,void const * vb)3143 static int compareChoke(void const* va, void const* vb)
3144 {
3145     struct ChokeData const* a = va;
3146     struct ChokeData const* b = vb;
3147 
3148     if (a->rate != b->rate) /* prefer higher overall speeds */
3149     {
3150         return a->rate > b->rate ? -1 : 1;
3151     }
3152 
3153     if (a->wasChoked != b->wasChoked) /* prefer unchoked */
3154     {
3155         return a->wasChoked ? 1 : -1;
3156     }
3157 
3158     if (a->salt != b->salt) /* random order */
3159     {
3160         return a->salt - b->salt;
3161     }
3162 
3163     return 0;
3164 }
3165 
3166 /* is this a new connection? */
isNew(tr_peerMsgs const * msgs)3167 static bool isNew(tr_peerMsgs const* msgs)
3168 {
3169     return msgs != NULL && tr_peerMsgsGetConnectionAge(msgs) < 45;
3170 }
3171 
3172 /* get a rate for deciding which peers to choke and unchoke. */
getRate(tr_torrent const * tor,struct peer_atom * atom,uint64_t now)3173 static int getRate(tr_torrent const* tor, struct peer_atom* atom, uint64_t now)
3174 {
3175     unsigned int Bps;
3176 
3177     if (tr_torrentIsSeed(tor))
3178     {
3179         Bps = tr_peerGetPieceSpeed_Bps(atom->peer, now, TR_CLIENT_TO_PEER);
3180     }
3181     /* downloading a private torrent... take upload speed into account
3182      * because there may only be a small window of opportunity to share */
3183     else if (tr_torrentIsPrivate(tor))
3184     {
3185         Bps = tr_peerGetPieceSpeed_Bps(atom->peer, now, TR_PEER_TO_CLIENT) + tr_peerGetPieceSpeed_Bps(atom->peer, now,
3186             TR_CLIENT_TO_PEER);
3187     }
3188     /* downloading a public torrent */
3189     else
3190     {
3191         Bps = tr_peerGetPieceSpeed_Bps(atom->peer, now, TR_PEER_TO_CLIENT);
3192     }
3193 
3194     /* convert it to bytes per second */
3195     return Bps;
3196 }
3197 
isBandwidthMaxedOut(tr_bandwidth const * b,uint64_t const now_msec,tr_direction dir)3198 static inline bool isBandwidthMaxedOut(tr_bandwidth const* b, uint64_t const now_msec, tr_direction dir)
3199 {
3200     if (!tr_bandwidthIsLimited(b, dir))
3201     {
3202         return false;
3203     }
3204     else
3205     {
3206         unsigned int const got = tr_bandwidthGetPieceSpeed_Bps(b, now_msec, dir);
3207         unsigned int const want = tr_bandwidthGetDesiredSpeed_Bps(b, dir);
3208         return got >= want;
3209     }
3210 }
3211 
rechokeUploads(tr_swarm * s,uint64_t const now)3212 static void rechokeUploads(tr_swarm* s, uint64_t const now)
3213 {
3214     TR_ASSERT(swarmIsLocked(s));
3215 
3216     int const peerCount = tr_ptrArraySize(&s->peers);
3217     tr_peer** peers = (tr_peer**)tr_ptrArrayBase(&s->peers);
3218     struct ChokeData* choke = tr_new0(struct ChokeData, peerCount);
3219     tr_session const* session = s->manager->session;
3220     bool const chokeAll = !tr_torrentIsPieceTransferAllowed(s->tor, TR_CLIENT_TO_PEER);
3221     bool const isMaxedOut = isBandwidthMaxedOut(&s->tor->bandwidth, now, TR_UP);
3222 
3223     /* an optimistic unchoke peer's "optimistic"
3224      * state lasts for N calls to rechokeUploads(). */
3225     if (s->optimisticUnchokeTimeScaler > 0)
3226     {
3227         s->optimisticUnchokeTimeScaler--;
3228     }
3229     else
3230     {
3231         s->optimistic = NULL;
3232     }
3233 
3234     int size = 0;
3235 
3236     /* sort the peers by preference and rate */
3237     for (int i = 0; i < peerCount; ++i)
3238     {
3239         tr_peer* peer = peers[i];
3240         tr_peerMsgs* msgs = PEER_MSGS(peer);
3241 
3242         struct peer_atom* atom = peer->atom;
3243 
3244         if (tr_peerIsSeed(peer))
3245         {
3246             /* choke seeds and partial seeds */
3247             tr_peerMsgsSetChoke(PEER_MSGS(peer), true);
3248         }
3249         else if (chokeAll)
3250         {
3251             /* choke everyone if we're not uploading */
3252             tr_peerMsgsSetChoke(PEER_MSGS(peer), true);
3253         }
3254         else if (msgs != s->optimistic)
3255         {
3256             struct ChokeData* n = &choke[size++];
3257             n->msgs = msgs;
3258             n->isInterested = tr_peerMsgsIsPeerInterested(msgs);
3259             n->wasChoked = tr_peerMsgsIsPeerChoked(msgs);
3260             n->rate = getRate(s->tor, atom, now);
3261             n->salt = tr_rand_int_weak(INT_MAX);
3262             n->isChoked = true;
3263         }
3264     }
3265 
3266     qsort(choke, size, sizeof(struct ChokeData), compareChoke);
3267 
3268     /**
3269      * Reciprocation and number of uploads capping is managed by unchoking
3270      * the N peers which have the best upload rate and are interested.
3271      * This maximizes the client's download rate. These N peers are
3272      * referred to as downloaders, because they are interested in downloading
3273      * from the client.
3274      *
3275      * Peers which have a better upload rate (as compared to the downloaders)
3276      * but aren't interested get unchoked. If they become interested, the
3277      * downloader with the worst upload rate gets choked. If a client has
3278      * a complete file, it uses its upload rate rather than its download
3279      * rate to decide which peers to unchoke.
3280      *
3281      * If our bandwidth is maxed out, don't unchoke any more peers.
3282      */
3283     int unchokedInterested = 0;
3284     int checkedChokeCount = 0;
3285 
3286     for (int i = 0; i < size && unchokedInterested < session->uploadSlotsPerTorrent; ++i, ++checkedChokeCount)
3287     {
3288         choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3289 
3290         if (choke[i].isInterested)
3291         {
3292             ++unchokedInterested;
3293         }
3294     }
3295 
3296     /* optimistic unchoke */
3297     if (s->optimistic == NULL && !isMaxedOut && checkedChokeCount < size)
3298     {
3299         int n;
3300         struct ChokeData* c;
3301         tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3302 
3303         for (int i = checkedChokeCount; i < size; ++i)
3304         {
3305             if (choke[i].isInterested)
3306             {
3307                 tr_peerMsgs const* msgs = choke[i].msgs;
3308                 int const x = isNew(msgs) ? 3 : 1;
3309 
3310                 for (int y = 0; y < x; ++y)
3311                 {
3312                     tr_ptrArrayAppend(&randPool, &choke[i]);
3313                 }
3314             }
3315         }
3316 
3317         if ((n = tr_ptrArraySize(&randPool)) != 0)
3318         {
3319             c = tr_ptrArrayNth(&randPool, tr_rand_int_weak(n));
3320             c->isChoked = false;
3321             s->optimistic = c->msgs;
3322             s->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3323         }
3324 
3325         tr_ptrArrayDestruct(&randPool, NULL);
3326     }
3327 
3328     for (int i = 0; i < size; ++i)
3329     {
3330         tr_peerMsgsSetChoke(choke[i].msgs, choke[i].isChoked);
3331     }
3332 
3333     /* cleanup */
3334     tr_free(choke);
3335 }
3336 
rechokePulse(evutil_socket_t foo UNUSED,short bar UNUSED,void * vmgr)3337 static void rechokePulse(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmgr)
3338 {
3339     tr_torrent* tor = NULL;
3340     tr_peerMgr* mgr = vmgr;
3341     uint64_t const now = tr_time_msec();
3342 
3343     managerLock(mgr);
3344 
3345     while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
3346     {
3347         if (tor->isRunning)
3348         {
3349             tr_swarm* s = tor->swarm;
3350 
3351             if (s->stats.peerCount > 0)
3352             {
3353                 rechokeUploads(s, now);
3354                 rechokeDownloads(s);
3355             }
3356         }
3357     }
3358 
3359     tr_timerAddMsec(mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3360     managerUnlock(mgr);
3361 }
3362 
3363 /***
3364 ****
3365 ****  Life and Death
3366 ****
3367 ***/
3368 
shouldPeerBeClosed(tr_swarm const * s,tr_peer const * peer,int peerCount,time_t const now)3369 static bool shouldPeerBeClosed(tr_swarm const* s, tr_peer const* peer, int peerCount, time_t const now)
3370 {
3371     tr_torrent const* tor = s->tor;
3372     struct peer_atom const* atom = peer->atom;
3373 
3374     /* if it's marked for purging, close it */
3375     if (peer->doPurge)
3376     {
3377         tordbg(s, "purging peer %s because its doPurge flag is set", tr_atomAddrStr(atom));
3378         return true;
3379     }
3380 
3381     /* disconnect if we're both seeds and enough time has passed for PEX */
3382     if (tr_torrentIsSeed(tor) && tr_peerIsSeed(peer))
3383     {
3384         return !tr_torrentAllowsPex(tor) || now - atom->time >= 30;
3385     }
3386 
3387     /* disconnect if it's been too long since piece data has been transferred.
3388      * this is on a sliding scale based on number of available peers... */
3389     {
3390         int const relaxStrictnessIfFewerThanN = (int)(getMaxPeerCount(tor) * 0.9 + 0.5);
3391         /* if we have >= relaxIfFewerThan, strictness is 100%.
3392          * if we have zero connections, strictness is 0% */
3393         float const strictness = peerCount >= relaxStrictnessIfFewerThanN ? 1.0 :
3394             peerCount / (float)relaxStrictnessIfFewerThanN;
3395         int const lo = MIN_UPLOAD_IDLE_SECS;
3396         int const hi = MAX_UPLOAD_IDLE_SECS;
3397         int const limit = hi - (hi - lo) * strictness;
3398         int const idleTime = now - MAX(atom->time, atom->piece_data_time);
3399 
3400         /*
3401         fprintf(stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... "
3402             "idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time),
3403             (int)(now - atom->piece_data_time), idleTime, idleTime > limit);
3404         */
3405 
3406         if (idleTime > limit)
3407         {
3408             tordbg(s, "purging peer %s because it's been %d secs since we shared anything", tr_atomAddrStr(atom), idleTime);
3409             return true;
3410         }
3411     }
3412 
3413     return false;
3414 }
3415 
getPeersToClose(tr_swarm * s,time_t const now_sec,int * setmeSize)3416 static tr_peer** getPeersToClose(tr_swarm* s, time_t const now_sec, int* setmeSize)
3417 {
3418     TR_ASSERT(swarmIsLocked(s));
3419 
3420     int peerCount;
3421     int outsize = 0;
3422     struct tr_peer** ret = NULL;
3423     tr_peer** peers = (tr_peer**)tr_ptrArrayPeek(&s->peers, &peerCount);
3424 
3425     for (int i = 0; i < peerCount; ++i)
3426     {
3427         if (shouldPeerBeClosed(s, peers[i], peerCount, now_sec))
3428         {
3429             if (ret == NULL)
3430             {
3431                 ret = tr_new(tr_peer*, peerCount);
3432             }
3433 
3434             ret[outsize++] = peers[i];
3435         }
3436     }
3437 
3438     *setmeSize = outsize;
3439     return ret;
3440 }
3441 
getReconnectIntervalSecs(struct peer_atom const * atom,time_t const now)3442 static int getReconnectIntervalSecs(struct peer_atom const* atom, time_t const now)
3443 {
3444     int sec;
3445     bool const unreachable = (atom->flags2 & MYFLAG_UNREACHABLE) != 0;
3446 
3447     /* if we were recently connected to this peer and transferring piece
3448      * data, try to reconnect to them sooner rather that later -- we don't
3449      * want network troubles to get in the way of a good peer. */
3450     if (!unreachable && now - atom->piece_data_time <= MINIMUM_RECONNECT_INTERVAL_SECS * 2)
3451     {
3452         sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3453     }
3454     /* otherwise, the interval depends on how many times we've tried
3455      * and failed to connect to the peer */
3456     else
3457     {
3458         int step = atom->numFails;
3459 
3460         /* penalize peers that were unreachable the last time we tried */
3461         if (unreachable)
3462         {
3463             step += 2;
3464         }
3465 
3466         switch (step)
3467         {
3468         case 0:
3469             sec = 0;
3470             break;
3471 
3472         case 1:
3473             sec = 10;
3474             break;
3475 
3476         case 2:
3477             sec = 60 * 2;
3478             break;
3479 
3480         case 3:
3481             sec = 60 * 15;
3482             break;
3483 
3484         case 4:
3485             sec = 60 * 30;
3486             break;
3487 
3488         case 5:
3489             sec = 60 * 60;
3490             break;
3491 
3492         default:
3493             sec = 60 * 120;
3494             break;
3495         }
3496     }
3497 
3498     dbgmsg("reconnect interval for %s is %d seconds", tr_atomAddrStr(atom), sec);
3499     return sec;
3500 }
3501 
removePeer(tr_swarm * s,tr_peer * peer)3502 static void removePeer(tr_swarm* s, tr_peer* peer)
3503 {
3504     TR_ASSERT(swarmIsLocked(s));
3505 
3506     struct peer_atom* atom = peer->atom;
3507     TR_ASSERT(atom != NULL);
3508 
3509     atom->time = tr_time();
3510 
3511     tr_ptrArrayRemoveSortedPointer(&s->peers, peer, peerCompare);
3512     --s->stats.peerCount;
3513     --s->stats.peerFromCount[atom->fromFirst];
3514 
3515     if (replicationExists(s))
3516     {
3517         tr_decrReplicationFromBitfield(s, &peer->have);
3518     }
3519 
3520     TR_ASSERT(s->stats.peerCount == tr_ptrArraySize(&s->peers));
3521     TR_ASSERT(s->stats.peerFromCount[atom->fromFirst] >= 0);
3522 
3523     tr_peerFree(peer);
3524 }
3525 
closePeer(tr_swarm * s,tr_peer * peer)3526 static void closePeer(tr_swarm* s, tr_peer* peer)
3527 {
3528     TR_ASSERT(s != NULL);
3529     TR_ASSERT(peer != NULL);
3530 
3531     struct peer_atom* atom = peer->atom;
3532 
3533     /* if we transferred piece data, then they might be good peers,
3534        so reset their `numFails' weight to zero. otherwise we connected
3535        to them fruitlessly, so mark it as another fail */
3536     if (atom->piece_data_time != 0)
3537     {
3538         tordbg(s, "resetting atom %s numFails to 0", tr_atomAddrStr(atom));
3539         atom->numFails = 0;
3540     }
3541     else
3542     {
3543         ++atom->numFails;
3544         tordbg(s, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails);
3545     }
3546 
3547     tordbg(s, "removing bad peer %s", tr_atomAddrStr(peer->atom));
3548     removePeer(s, peer);
3549 }
3550 
removeAllPeers(tr_swarm * s)3551 static void removeAllPeers(tr_swarm* s)
3552 {
3553     while (!tr_ptrArrayEmpty(&s->peers))
3554     {
3555         removePeer(s, tr_ptrArrayNth(&s->peers, 0));
3556     }
3557 
3558     TR_ASSERT(s->stats.peerCount == 0);
3559 }
3560 
closeBadPeers(tr_swarm * s,time_t const now_sec)3561 static void closeBadPeers(tr_swarm* s, time_t const now_sec)
3562 {
3563     if (!tr_ptrArrayEmpty(&s->peers))
3564     {
3565         int peerCount;
3566         struct tr_peer** peers;
3567 
3568         peers = getPeersToClose(s, now_sec, &peerCount);
3569 
3570         for (int i = 0; i < peerCount; ++i)
3571         {
3572             closePeer(s, peers[i]);
3573         }
3574 
3575         tr_free(peers);
3576     }
3577 }
3578 
3579 struct peer_liveliness
3580 {
3581     tr_peer* peer;
3582     void* clientData;
3583     time_t pieceDataTime;
3584     time_t time;
3585     unsigned int speed;
3586     bool doPurge;
3587 };
3588 
comparePeerLiveliness(void const * va,void const * vb)3589 static int comparePeerLiveliness(void const* va, void const* vb)
3590 {
3591     struct peer_liveliness const* a = va;
3592     struct peer_liveliness const* b = vb;
3593 
3594     if (a->doPurge != b->doPurge)
3595     {
3596         return a->doPurge ? 1 : -1;
3597     }
3598 
3599     if (a->speed != b->speed) /* faster goes first */
3600     {
3601         return a->speed > b->speed ? -1 : 1;
3602     }
3603 
3604     /* the one to give us data more recently goes first */
3605     if (a->pieceDataTime != b->pieceDataTime)
3606     {
3607         return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3608     }
3609 
3610     /* the one we connected to most recently goes first */
3611     if (a->time != b->time)
3612     {
3613         return a->time > b->time ? -1 : 1;
3614     }
3615 
3616     return 0;
3617 }
3618 
sortPeersByLivelinessImpl(tr_peer ** peers,void ** clientData,int n,uint64_t now,tr_voidptr_compare_func compare)3619 static void sortPeersByLivelinessImpl(tr_peer** peers, void** clientData, int n, uint64_t now, tr_voidptr_compare_func compare)
3620 {
3621     struct peer_liveliness* lives;
3622     struct peer_liveliness* l;
3623 
3624     /* build a sortable array of peer + extra info */
3625     lives = tr_new0(struct peer_liveliness, n);
3626     l = lives;
3627 
3628     for (int i = 0; i < n; ++i, ++l)
3629     {
3630         tr_peer* p = peers[i];
3631         l->peer = p;
3632         l->doPurge = p->doPurge;
3633         l->pieceDataTime = p->atom->piece_data_time;
3634         l->time = p->atom->time;
3635         l->speed = tr_peerGetPieceSpeed_Bps(p, now, TR_UP) + tr_peerGetPieceSpeed_Bps(p, now, TR_DOWN);
3636 
3637         if (clientData != NULL)
3638         {
3639             l->clientData = clientData[i];
3640         }
3641     }
3642 
3643     /* sort 'em */
3644     TR_ASSERT(n == l - lives);
3645     qsort(lives, n, sizeof(struct peer_liveliness), compare);
3646 
3647     l = lives;
3648 
3649     /* build the peer array */
3650     for (int i = 0; i < n; ++i, ++l)
3651     {
3652         peers[i] = l->peer;
3653 
3654         if (clientData != NULL)
3655         {
3656             clientData[i] = l->clientData;
3657         }
3658     }
3659 
3660     TR_ASSERT(n == l - lives);
3661 
3662     /* cleanup */
3663     tr_free(lives);
3664 }
3665 
sortPeersByLiveliness(tr_peer ** peers,void ** clientData,int n,uint64_t now)3666 static void sortPeersByLiveliness(tr_peer** peers, void** clientData, int n, uint64_t now)
3667 {
3668     sortPeersByLivelinessImpl(peers, clientData, n, now, comparePeerLiveliness);
3669 }
3670 
enforceTorrentPeerLimit(tr_swarm * s,uint64_t now)3671 static void enforceTorrentPeerLimit(tr_swarm* s, uint64_t now)
3672 {
3673     int n = tr_ptrArraySize(&s->peers);
3674     int const max = tr_torrentGetPeerLimit(s->tor);
3675 
3676     if (n > max)
3677     {
3678         void* base = tr_ptrArrayBase(&s->peers);
3679         tr_peer** peers = tr_memdup(base, n * sizeof(tr_peer*));
3680         sortPeersByLiveliness(peers, NULL, n, now);
3681 
3682         while (n > max)
3683         {
3684             closePeer(s, peers[--n]);
3685         }
3686 
3687         tr_free(peers);
3688     }
3689 }
3690 
enforceSessionPeerLimit(tr_session * session,uint64_t now)3691 static void enforceSessionPeerLimit(tr_session* session, uint64_t now)
3692 {
3693     int n = 0;
3694     tr_torrent* tor = NULL;
3695     int const max = tr_sessionGetPeerLimit(session);
3696 
3697     /* count the total number of peers */
3698     while ((tor = tr_torrentNext(session, tor)) != NULL)
3699     {
3700         n += tr_ptrArraySize(&tor->swarm->peers);
3701     }
3702 
3703     /* if there are too many, prune out the worst */
3704     if (n > max)
3705     {
3706         tr_peer** peers = tr_new(tr_peer*, n);
3707         tr_swarm** swarms = tr_new(tr_swarm*, n);
3708 
3709         /* populate the peer array */
3710         n = 0;
3711         tor = NULL;
3712 
3713         while ((tor = tr_torrentNext(session, tor)) != NULL)
3714         {
3715             tr_swarm* s = tor->swarm;
3716 
3717             for (int i = 0, tn = tr_ptrArraySize(&s->peers); i < tn; ++i, ++n)
3718             {
3719                 peers[n] = tr_ptrArrayNth(&s->peers, i);
3720                 swarms[n] = s;
3721             }
3722         }
3723 
3724         /* sort 'em */
3725         sortPeersByLiveliness(peers, (void**)swarms, n, now);
3726 
3727         /* cull out the crappiest */
3728         while (n-- > max)
3729         {
3730             closePeer(swarms[n], peers[n]);
3731         }
3732 
3733         /* cleanup */
3734         tr_free(swarms);
3735         tr_free(peers);
3736     }
3737 }
3738 
3739 static void makeNewPeerConnections(tr_peerMgr* mgr, int const max);
3740 
reconnectPulse(evutil_socket_t foo UNUSED,short bar UNUSED,void * vmgr)3741 static void reconnectPulse(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmgr)
3742 {
3743     tr_torrent* tor;
3744     tr_peerMgr* mgr = vmgr;
3745     time_t const now_sec = tr_time();
3746     uint64_t const now_msec = tr_time_msec();
3747 
3748     /**
3749     ***  enforce the per-session and per-torrent peer limits
3750     **/
3751 
3752     /* if we're over the per-torrent peer limits, cull some peers */
3753     tor = NULL;
3754 
3755     while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
3756     {
3757         if (tor->isRunning)
3758         {
3759             enforceTorrentPeerLimit(tor->swarm, now_msec);
3760         }
3761     }
3762 
3763     /* if we're over the per-session peer limits, cull some peers */
3764     enforceSessionPeerLimit(mgr->session, now_msec);
3765 
3766     /* remove crappy peers */
3767     tor = NULL;
3768 
3769     while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
3770     {
3771         if (!tor->swarm->isRunning)
3772         {
3773             removeAllPeers(tor->swarm);
3774         }
3775         else
3776         {
3777             closeBadPeers(tor->swarm, now_sec);
3778         }
3779     }
3780 
3781     /* try to make new peer connections */
3782     makeNewPeerConnections(mgr, MAX_CONNECTIONS_PER_PULSE);
3783 }
3784 
3785 /****
3786 *****
3787 *****  BANDWIDTH ALLOCATION
3788 *****
3789 ****/
3790 
pumpAllPeers(tr_peerMgr * mgr)3791 static void pumpAllPeers(tr_peerMgr* mgr)
3792 {
3793     tr_torrent* tor = NULL;
3794 
3795     while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
3796     {
3797         tr_swarm* s = tor->swarm;
3798 
3799         for (int j = 0, n = tr_ptrArraySize(&s->peers); j < n; ++j)
3800         {
3801             tr_peerMsgsPulse(tr_ptrArrayNth(&s->peers, j));
3802         }
3803     }
3804 }
3805 
queuePulseForeach(void * vtor)3806 static void queuePulseForeach(void* vtor)
3807 {
3808     tr_torrent* tor = vtor;
3809 
3810     tr_torrentStartNow(tor);
3811 
3812     if (tor->queue_started_callback != NULL)
3813     {
3814         (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3815     }
3816 }
3817 
queuePulse(tr_session * session,tr_direction dir)3818 static void queuePulse(tr_session* session, tr_direction dir)
3819 {
3820     TR_ASSERT(tr_isSession(session));
3821     TR_ASSERT(tr_isDirection(dir));
3822 
3823     if (tr_sessionGetQueueEnabled(session, dir))
3824     {
3825         tr_ptrArray torrents = TR_PTR_ARRAY_INIT;
3826 
3827         tr_sessionGetNextQueuedTorrents(session, dir, tr_sessionCountQueueFreeSlots(session, dir), &torrents);
3828 
3829         tr_ptrArrayForeach(&torrents, queuePulseForeach);
3830 
3831         tr_ptrArrayDestruct(&torrents, NULL);
3832     }
3833 }
3834 
bandwidthPulse(evutil_socket_t foo UNUSED,short bar UNUSED,void * vmgr)3835 static void bandwidthPulse(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmgr)
3836 {
3837     tr_torrent* tor;
3838     tr_peerMgr* mgr = vmgr;
3839     tr_session* session = mgr->session;
3840     managerLock(mgr);
3841 
3842     /* FIXME: this next line probably isn't necessary... */
3843     pumpAllPeers(mgr);
3844 
3845     /* allocate bandwidth to the peers */
3846     tr_bandwidthAllocate(&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3847     tr_bandwidthAllocate(&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3848 
3849     /* torrent upkeep */
3850     tor = NULL;
3851 
3852     while ((tor = tr_torrentNext(session, tor)) != NULL)
3853     {
3854         /* possibly stop torrents that have seeded enough */
3855         tr_torrentCheckSeedLimit(tor);
3856 
3857         /* run the completeness check for any torrents that need it */
3858         if (tor->swarm->needsCompletenessCheck)
3859         {
3860             tor->swarm->needsCompletenessCheck = false;
3861             tr_torrentRecheckCompleteness(tor);
3862         }
3863 
3864         /* stop torrents that are ready to stop, but couldn't be stopped
3865            earlier during the peer-io callback call chain */
3866         if (tor->isStopping)
3867         {
3868             tr_torrentStop(tor);
3869         }
3870 
3871         /* update the torrent's stats */
3872         tor->swarm->stats.activeWebseedCount = countActiveWebseeds(tor->swarm);
3873     }
3874 
3875     /* pump the queues */
3876     queuePulse(session, TR_UP);
3877     queuePulse(session, TR_DOWN);
3878 
3879     reconnectPulse(0, 0, mgr);
3880 
3881     tr_timerAddMsec(mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3882     managerUnlock(mgr);
3883 }
3884 
3885 /***
3886 ****
3887 ***/
3888 
compareAtomPtrsByAddress(void const * va,void const * vb)3889 static int compareAtomPtrsByAddress(void const* va, void const* vb)
3890 {
3891     struct peer_atom const* a = *(struct peer_atom const* const*)va;
3892     struct peer_atom const* b = *(struct peer_atom const* const*)vb;
3893 
3894     TR_ASSERT(tr_isAtom(a));
3895     TR_ASSERT(tr_isAtom(b));
3896 
3897     return tr_address_compare(&a->addr, &b->addr);
3898 }
3899 
3900 /* best come first, worst go last */
compareAtomPtrsByShelfDate(void const * va,void const * vb)3901 static int compareAtomPtrsByShelfDate(void const* va, void const* vb)
3902 {
3903     struct peer_atom const* a = *(struct peer_atom const* const*)va;
3904     struct peer_atom const* b = *(struct peer_atom const* const*)vb;
3905 
3906     TR_ASSERT(tr_isAtom(a));
3907     TR_ASSERT(tr_isAtom(b));
3908 
3909     int const data_time_cutoff_secs = 60 * 60;
3910     time_t const tr_now = tr_time();
3911 
3912     /* primary key: the last piece data time *if* it was within the last hour */
3913     time_t atime = a->piece_data_time;
3914 
3915     if (atime + data_time_cutoff_secs < tr_now)
3916     {
3917         atime = 0;
3918     }
3919 
3920     time_t btime = b->piece_data_time;
3921 
3922     if (btime + data_time_cutoff_secs < tr_now)
3923     {
3924         btime = 0;
3925     }
3926 
3927     if (atime != btime)
3928     {
3929         return atime > btime ? -1 : 1;
3930     }
3931 
3932     /* secondary key: shelf date. */
3933     if (a->shelf_date != b->shelf_date)
3934     {
3935         return a->shelf_date > b->shelf_date ? -1 : 1;
3936     }
3937 
3938     return 0;
3939 }
3940 
getMaxAtomCount(tr_torrent const * tor)3941 static int getMaxAtomCount(tr_torrent const* tor)
3942 {
3943     return MIN(50, tor->maxConnectedPeers * 3);
3944 }
3945 
atomPulse(evutil_socket_t foo UNUSED,short bar UNUSED,void * vmgr)3946 static void atomPulse(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmgr)
3947 {
3948     tr_torrent* tor = NULL;
3949     tr_peerMgr* mgr = vmgr;
3950     managerLock(mgr);
3951 
3952     while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
3953     {
3954         int atomCount;
3955         tr_swarm* s = tor->swarm;
3956         int const maxAtomCount = getMaxAtomCount(tor);
3957         struct peer_atom** atoms = (struct peer_atom**)tr_ptrArrayPeek(&s->pool, &atomCount);
3958 
3959         if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3960         {
3961             int keepCount = 0;
3962             int testCount = 0;
3963             struct peer_atom** keep = tr_new(struct peer_atom*, atomCount);
3964             struct peer_atom** test = tr_new(struct peer_atom*, atomCount);
3965 
3966             /* keep the ones that are in use */
3967             for (int i = 0; i < atomCount; ++i)
3968             {
3969                 struct peer_atom* atom = atoms[i];
3970 
3971                 if (peerIsInUse(s, atom))
3972                 {
3973                     keep[keepCount++] = atom;
3974                 }
3975                 else
3976                 {
3977                     test[testCount++] = atom;
3978                 }
3979             }
3980 
3981             /* if there's room, keep the best of what's left */
3982             int i = 0;
3983 
3984             if (keepCount < maxAtomCount)
3985             {
3986                 qsort(test, testCount, sizeof(struct peer_atom*), compareAtomPtrsByShelfDate);
3987 
3988                 while (i < testCount && keepCount < maxAtomCount)
3989                 {
3990                     keep[keepCount++] = test[i++];
3991                 }
3992             }
3993 
3994             /* free the culled atoms */
3995             while (i < testCount)
3996             {
3997                 tr_free(test[i++]);
3998             }
3999 
4000             /* rebuild Torrent.pool with what's left */
4001             tr_ptrArrayDestruct(&s->pool, NULL);
4002             s->pool = TR_PTR_ARRAY_INIT;
4003             qsort(keep, keepCount, sizeof(struct peer_atom*), compareAtomPtrsByAddress);
4004 
4005             for (int i = 0; i < keepCount; ++i)
4006             {
4007                 tr_ptrArrayAppend(&s->pool, keep[i]);
4008             }
4009 
4010             tordbg(s, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
4011 
4012             /* cleanup */
4013             tr_free(test);
4014             tr_free(keep);
4015         }
4016     }
4017 
4018     tr_timerAddMsec(mgr->atomTimer, ATOM_PERIOD_MSEC);
4019     managerUnlock(mgr);
4020 }
4021 
4022 /***
4023 ****
4024 ****
4025 ****
4026 ***/
4027 
4028 /* is this atom someone that we'd want to initiate a connection to? */
isPeerCandidate(tr_torrent const * tor,struct peer_atom * atom,time_t const now)4029 static bool isPeerCandidate(tr_torrent const* tor, struct peer_atom* atom, time_t const now)
4030 {
4031     /* not if we're both seeds */
4032     if (tr_torrentIsSeed(tor) && atomIsSeed(atom))
4033     {
4034         return false;
4035     }
4036 
4037     /* not if we've already got a connection to them... */
4038     if (peerIsInUse(tor->swarm, atom))
4039     {
4040         return false;
4041     }
4042 
4043     /* not if we just tried them already */
4044     if (now - atom->time < getReconnectIntervalSecs(atom, now))
4045     {
4046         return false;
4047     }
4048 
4049     /* not if they're blocklisted */
4050     if (isAtomBlocklisted(tor->session, atom))
4051     {
4052         return false;
4053     }
4054 
4055     /* not if they're banned... */
4056     if ((atom->flags2 & MYFLAG_BANNED) != 0)
4057     {
4058         return false;
4059     }
4060 
4061     return true;
4062 }
4063 
4064 struct peer_candidate
4065 {
4066     uint64_t score;
4067     tr_torrent* tor;
4068     struct peer_atom* atom;
4069 };
4070 
torrentWasRecentlyStarted(tr_torrent const * tor)4071 static bool torrentWasRecentlyStarted(tr_torrent const* tor)
4072 {
4073     return difftime(tr_time(), tor->startDate) < 120;
4074 }
4075 
addValToKey(uint64_t value,int width,uint64_t addme)4076 static inline uint64_t addValToKey(uint64_t value, int width, uint64_t addme)
4077 {
4078     value = value << (uint64_t)width;
4079     value |= addme;
4080     return value;
4081 }
4082 
4083 /* smaller value is better */
getPeerCandidateScore(tr_torrent const * tor,struct peer_atom const * atom,uint8_t salt)4084 static uint64_t getPeerCandidateScore(tr_torrent const* tor, struct peer_atom const* atom, uint8_t salt)
4085 {
4086     uint64_t i;
4087     uint64_t score = 0;
4088     bool const failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
4089 
4090     /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
4091     i = failed ? 1 : 0;
4092     score = addValToKey(score, 1, i);
4093 
4094     /* prefer the one we attempted least recently (to cycle through all peers) */
4095     i = atom->lastConnectionAttemptAt;
4096     score = addValToKey(score, 32, i);
4097 
4098     /* prefer peers belonging to a torrent of a higher priority */
4099     switch (tr_torrentGetPriority(tor))
4100     {
4101     case TR_PRI_HIGH:
4102         i = 0;
4103         break;
4104 
4105     case TR_PRI_NORMAL:
4106         i = 1;
4107         break;
4108 
4109     case TR_PRI_LOW:
4110         i = 2;
4111         break;
4112     }
4113 
4114     score = addValToKey(score, 4, i);
4115 
4116     /* prefer recently-started torrents */
4117     i = torrentWasRecentlyStarted(tor) ? 0 : 1;
4118     score = addValToKey(score, 1, i);
4119 
4120     /* prefer torrents we're downloading with */
4121     i = tr_torrentIsSeed(tor) ? 1 : 0;
4122     score = addValToKey(score, 1, i);
4123 
4124     /* prefer peers that are known to be connectible */
4125     i = (atom->flags & ADDED_F_CONNECTABLE) != 0 ? 0 : 1;
4126     score = addValToKey(score, 1, i);
4127 
4128     /* prefer peers that we might have a chance of uploading to...
4129     so lower seed probability is better */
4130     if (atom->seedProbability == 100)
4131     {
4132         i = 101;
4133     }
4134     else if (atom->seedProbability == -1)
4135     {
4136         i = 100;
4137     }
4138     else
4139     {
4140         i = atom->seedProbability;
4141     }
4142 
4143     score = addValToKey(score, 8, i);
4144 
4145     /* Prefer peers that we got from more trusted sources.
4146      * lower `fromBest' values indicate more trusted sources */
4147     score = addValToKey(score, 4, atom->fromBest);
4148 
4149     /* salt */
4150     score = addValToKey(score, 8, salt);
4151 
4152     return score;
4153 }
4154 
comparePeerCandidates(void const * va,void const * vb)4155 static int comparePeerCandidates(void const* va, void const* vb)
4156 {
4157     int ret;
4158     struct peer_candidate const* a = va;
4159     struct peer_candidate const* b = vb;
4160 
4161     if (a->score < b->score)
4162     {
4163         ret = -1;
4164     }
4165     else if (a->score > b->score)
4166     {
4167         ret = 1;
4168     }
4169     else
4170     {
4171         ret = 0;
4172     }
4173 
4174     return ret;
4175 }
4176 
4177 /* Partial sorting -- selecting the k best candidates
4178    Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
selectPeerCandidates(struct peer_candidate * candidates,int candidate_count,int select_count)4179 static void selectPeerCandidates(struct peer_candidate* candidates, int candidate_count, int select_count)
4180 {
4181     tr_quickfindFirstK(candidates, candidate_count, sizeof(struct peer_candidate), comparePeerCandidates, select_count);
4182 }
4183 
4184 #ifdef TR_ENABLE_ASSERTS
4185 
checkBestScoresComeFirst(struct peer_candidate const * candidates,int n,int k)4186 static bool checkBestScoresComeFirst(struct peer_candidate const* candidates, int n, int k)
4187 {
4188     uint64_t worstFirstScore = 0;
4189     int const x = MIN(n, k) - 1;
4190 
4191     for (int i = 0; i < x; i++)
4192     {
4193         if (worstFirstScore < candidates[i].score)
4194         {
4195             worstFirstScore = candidates[i].score;
4196         }
4197     }
4198 
4199     for (int i = 0; i < x; i++)
4200     {
4201         TR_ASSERT(candidates[i].score <= worstFirstScore);
4202     }
4203 
4204     for (int i = x + 1; i < n; i++)
4205     {
4206         TR_ASSERT(candidates[i].score >= worstFirstScore);
4207     }
4208 
4209     return true;
4210 }
4211 
4212 #endif /* TR_ENABLE_ASSERTS */
4213 
4214 /** @return an array of all the atoms we might want to connect to */
getPeerCandidates(tr_session * session,int * candidateCount,int max)4215 static struct peer_candidate* getPeerCandidates(tr_session* session, int* candidateCount, int max)
4216 {
4217     int atomCount;
4218     int peerCount;
4219     tr_torrent* tor;
4220     struct peer_candidate* candidates;
4221     struct peer_candidate* walk;
4222     time_t const now = tr_time();
4223     uint64_t const now_msec = tr_time_msec();
4224     /* leave 5% of connection slots for incoming connections -- ticket #2609 */
4225     int const maxCandidates = tr_sessionGetPeerLimit(session) * 0.95;
4226 
4227     /* count how many peers and atoms we've got */
4228     tor = NULL;
4229     atomCount = 0;
4230     peerCount = 0;
4231 
4232     while ((tor = tr_torrentNext(session, tor)) != NULL)
4233     {
4234         atomCount += tr_ptrArraySize(&tor->swarm->pool);
4235         peerCount += tr_ptrArraySize(&tor->swarm->peers);
4236     }
4237 
4238     /* don't start any new handshakes if we're full up */
4239     if (maxCandidates <= peerCount)
4240     {
4241         *candidateCount = 0;
4242         return NULL;
4243     }
4244 
4245     /* allocate an array of candidates */
4246     walk = candidates = tr_new(struct peer_candidate, atomCount);
4247 
4248     /* populate the candidate array */
4249     tor = NULL;
4250 
4251     while ((tor = tr_torrentNext(session, tor)) != NULL)
4252     {
4253         int nAtoms;
4254         struct peer_atom** atoms;
4255 
4256         if (!tor->swarm->isRunning)
4257         {
4258             continue;
4259         }
4260 
4261         /* if we've already got enough peers in this torrent... */
4262         if (tr_torrentGetPeerLimit(tor) <= tr_ptrArraySize(&tor->swarm->peers))
4263         {
4264             continue;
4265         }
4266 
4267         /* if we've already got enough speed in this torrent... */
4268         if (tr_torrentIsSeed(tor) && isBandwidthMaxedOut(&tor->bandwidth, now_msec, TR_UP))
4269         {
4270             continue;
4271         }
4272 
4273         atoms = (struct peer_atom**)tr_ptrArrayPeek(&tor->swarm->pool, &nAtoms);
4274 
4275         for (int i = 0; i < nAtoms; ++i)
4276         {
4277             struct peer_atom* atom = atoms[i];
4278 
4279             if (isPeerCandidate(tor, atom, now))
4280             {
4281                 uint8_t const salt = tr_rand_int_weak(1024);
4282                 walk->tor = tor;
4283                 walk->atom = atom;
4284                 walk->score = getPeerCandidateScore(tor, atom, salt);
4285                 ++walk;
4286             }
4287         }
4288     }
4289 
4290     *candidateCount = walk - candidates;
4291 
4292     if (walk != candidates)
4293     {
4294         selectPeerCandidates(candidates, walk - candidates, max);
4295     }
4296 
4297     TR_ASSERT(checkBestScoresComeFirst(candidates, *candidateCount, max));
4298     return candidates;
4299 }
4300 
initiateConnection(tr_peerMgr * mgr,tr_swarm * s,struct peer_atom * atom)4301 static void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, struct peer_atom* atom)
4302 {
4303     tr_peerIo* io;
4304     time_t const now = tr_time();
4305     bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
4306 
4307     if (atom->fromFirst == TR_PEER_FROM_PEX)
4308     {
4309         /* PEX has explicit signalling for uTP support.  If an atom
4310            originally came from PEX and doesn't have the uTP flag, skip the
4311            uTP connection attempt.  Are we being optimistic here? */
4312         utp = utp && (atom->flags & ADDED_F_UTP_FLAGS) != 0;
4313     }
4314 
4315     tordbg(s, "Starting an OUTGOING%s connection with %s", utp ? " µTP" : "", tr_atomAddrStr(atom));
4316 
4317     io = tr_peerIoNewOutgoing(mgr->session, &mgr->session->bandwidth, &atom->addr, atom->port, s->tor->info.hash,
4318         s->tor->completeness == TR_SEED, utp);
4319 
4320     if (io == NULL)
4321     {
4322         tordbg(s, "peerIo not created; marking peer %s as unreachable", tr_atomAddrStr(atom));
4323         atom->flags2 |= MYFLAG_UNREACHABLE;
4324         atom->numFails++;
4325     }
4326     else
4327     {
4328         tr_handshake* handshake = tr_handshakeNew(io, mgr->session->encryptionMode, myHandshakeDoneCB, mgr);
4329 
4330         TR_ASSERT(tr_peerIoGetTorrentHash(io));
4331 
4332         tr_peerIoUnref(io); /* balanced by the initial ref in tr_peerIoNewOutgoing() */
4333 
4334         tr_ptrArrayInsertSorted(&s->outgoingHandshakes, handshake, handshakeCompare);
4335     }
4336 
4337     atom->lastConnectionAttemptAt = now;
4338     atom->time = now;
4339 }
4340 
initiateCandidateConnection(tr_peerMgr * mgr,struct peer_candidate * c)4341 static void initiateCandidateConnection(tr_peerMgr* mgr, struct peer_candidate* c)
4342 {
4343 #if 0
4344 
4345     fprintf(stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n", tr_atomAddrStr(c->atom),
4346         tr_torrentName(c->tor), (int)c->atom->seedProbability, tr_torrentIsPrivate(c->tor) ? "private" : "public",
4347         tr_torrentIsSeed(c->tor) ? "seed" : "downloader");
4348 
4349 #endif
4350 
4351     initiateConnection(mgr, c->tor->swarm, c->atom);
4352 }
4353 
makeNewPeerConnections(struct tr_peerMgr * mgr,int const max)4354 static void makeNewPeerConnections(struct tr_peerMgr* mgr, int const max)
4355 {
4356     int n;
4357     struct peer_candidate* candidates;
4358 
4359     candidates = getPeerCandidates(mgr->session, &n, max);
4360 
4361     for (int i = 0; i < n && i < max; ++i)
4362     {
4363         initiateCandidateConnection(mgr, &candidates[i]);
4364     }
4365 
4366     tr_free(candidates);
4367 }
4368