1 /*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 */
8
9 #include <errno.h> /* error codes ERANGE, ... */
10 #include <limits.h> /* INT_MAX */
11 #include <string.h> /* memcpy, memcmp, strstr */
12 #include <stdlib.h> /* qsort */
13
14 #include <event2/event.h>
15
16 #include <libutp/utp.h>
17
18 #include "transmission.h"
19 #include "announcer.h"
20 #include "bandwidth.h"
21 #include "blocklist.h"
22 #include "cache.h"
23 #include "clients.h"
24 #include "completion.h"
25 #include "crypto-utils.h"
26 #include "handshake.h"
27 #include "log.h"
28 #include "net.h"
29 #include "peer-io.h"
30 #include "peer-mgr.h"
31 #include "peer-msgs.h"
32 #include "ptrarray.h"
33 #include "session.h"
34 #include "stats.h" /* tr_statsAddUploaded, tr_statsAddDownloaded */
35 #include "torrent.h"
36 #include "tr-assert.h"
37 #include "tr-utp.h"
38 #include "utils.h"
39 #include "webseed.h"
40
41 enum
42 {
43 /* how frequently to cull old atoms */
44 ATOM_PERIOD_MSEC = (60 * 1000),
45 /* how frequently to change which peers are choked */
46 RECHOKE_PERIOD_MSEC = (10 * 1000),
47 /* an optimistically unchoked peer is immune from rechoking
48 for this many calls to rechokeUploads(). */
49 OPTIMISTIC_UNCHOKE_MULTIPLIER = 4,
50 /* how frequently to reallocate bandwidth */
51 BANDWIDTH_PERIOD_MSEC = 500,
52 /* how frequently to age out old piece request lists */
53 REFILL_UPKEEP_PERIOD_MSEC = (10 * 1000),
54 /* how frequently to decide which peers live and die */
55 RECONNECT_PERIOD_MSEC = 500,
56 /* when many peers are available, keep idle ones this long */
57 MIN_UPLOAD_IDLE_SECS = (60),
58 /* when few peers are available, keep idle ones this long */
59 MAX_UPLOAD_IDLE_SECS = (60 * 5),
60 /* max number of peers to ask for per second overall.
61 * this throttle is to avoid overloading the router */
62 MAX_CONNECTIONS_PER_SECOND = 12,
63 /* */
64 MAX_CONNECTIONS_PER_PULSE = (int)(MAX_CONNECTIONS_PER_SECOND * (RECONNECT_PERIOD_MSEC / 1000.0)),
65 /* number of bad pieces a peer is allowed to send before we ban them */
66 MAX_BAD_PIECES_PER_PEER = 5,
67 /* amount of time to keep a list of request pieces lying around
68 before it's considered too old and needs to be rebuilt */
69 PIECE_LIST_SHELF_LIFE_SECS = 60,
70 /* use for bitwise operations w/peer_atom.flags2 */
71 MYFLAG_BANNED = 1,
72 /* use for bitwise operations w/peer_atom.flags2 */
73 /* unreachable for now... but not banned.
74 * if they try to connect to us it's okay */
75 MYFLAG_UNREACHABLE = 2,
76 /* the minimum we'll wait before attempting to reconnect to a peer */
77 MINIMUM_RECONNECT_INTERVAL_SECS = 5,
78 /** how long we'll let requests we've made linger before we cancel them */
79 REQUEST_TTL_SECS = 90,
80 /* */
81 NO_BLOCKS_CANCEL_HISTORY = 120,
82 /* */
83 CANCEL_HISTORY_SEC = 60
84 };
85
86 tr_peer_event const TR_PEER_EVENT_INIT =
87 {
88 .eventType = TR_PEER_CLIENT_GOT_BLOCK,
89 .pieceIndex = 0,
90 .bitfield = NULL,
91 .offset = 0,
92 .length = 0,
93 .err = 0,
94 .port = 0
95 };
96
97 tr_swarm_stats const TR_SWARM_STATS_INIT =
98 {
99 .activePeerCount = { 0, 0 },
100 .activeWebseedCount = 0,
101 .peerCount = 0,
102 .peerFromCount = { 0, 0, 0, 0, 0, 0, 0 }
103 };
104
105 /**
106 ***
107 **/
108
109 /**
110 * Peer information that should be kept even before we've connected and
111 * after we've disconnected. These are kept in a pool of peer_atoms to decide
112 * which ones would make good candidates for connecting to, and to watch out
113 * for banned peers.
114 *
115 * @see tr_peer
116 * @see tr_peerMsgs
117 */
118 struct peer_atom
119 {
120 uint8_t fromFirst; /* where the peer was first found */
121 uint8_t fromBest; /* the "best" value of where the peer has been found */
122 uint8_t flags; /* these match the added_f flags */
123 uint8_t flags2; /* flags that aren't defined in added_f */
124 int8_t seedProbability; /* how likely is this to be a seed... [0..100] or -1 for unknown */
125 int8_t blocklisted; /* -1 for unknown, true for blocklisted, false for not blocklisted */
126
127 tr_port port;
128 bool utp_failed; /* We recently failed to connect over uTP */
129 uint16_t numFails;
130 time_t time; /* when the peer's connection status last changed */
131 time_t piece_data_time;
132
133 time_t lastConnectionAttemptAt;
134 time_t lastConnectionAt;
135
136 /* similar to a TTL field, but less rigid --
137 * if the swarm is small, the atom will be kept past this date. */
138 time_t shelf_date;
139 tr_peer* peer; /* will be NULL if not connected */
140 tr_address addr;
141 };
142
143 #ifndef TR_ENABLE_ASSERTS
144
145 #define tr_isAtom(a) (true)
146
147 #else
148
tr_isAtom(struct peer_atom const * atom)149 static bool tr_isAtom(struct peer_atom const* atom)
150 {
151 return atom != NULL && atom->fromFirst < TR_PEER_FROM__MAX && atom->fromBest < TR_PEER_FROM__MAX &&
152 tr_address_is_valid(&atom->addr);
153 }
154
155 #endif
156
tr_atomAddrStr(struct peer_atom const * atom)157 static char const* tr_atomAddrStr(struct peer_atom const* atom)
158 {
159 return atom != NULL ? tr_peerIoAddrStr(&atom->addr, atom->port) : "[no atom]";
160 }
161
162 struct block_request
163 {
164 tr_block_index_t block;
165 tr_peer* peer;
166 time_t sentAt;
167 };
168
169 struct weighted_piece
170 {
171 tr_piece_index_t index;
172 int16_t salt;
173 int16_t requestCount;
174 };
175
176 enum piece_sort_state
177 {
178 PIECES_UNSORTED,
179 PIECES_SORTED_BY_INDEX,
180 PIECES_SORTED_BY_WEIGHT
181 };
182
183 /** @brief Opaque, per-torrent data structure for peer connection information */
184 typedef struct tr_swarm
185 {
186 tr_swarm_stats stats;
187
188 tr_ptrArray outgoingHandshakes; /* tr_handshake */
189 tr_ptrArray pool; /* struct peer_atom */
190 tr_ptrArray peers; /* tr_peerMsgs */
191 tr_ptrArray webseeds; /* tr_webseed */
192
193 tr_torrent* tor;
194 struct tr_peerMgr* manager;
195
196 tr_peerMsgs* optimistic; /* the optimistic peer, or NULL if none */
197 int optimisticUnchokeTimeScaler;
198
199 bool isRunning;
200 bool needsCompletenessCheck;
201
202 struct block_request* requests;
203 int requestCount;
204 int requestAlloc;
205
206 struct weighted_piece* pieces;
207 int pieceCount;
208 enum piece_sort_state pieceSortState;
209
210 /* An array of pieceCount items stating how many peers have each piece.
211 This is used to help us for downloading pieces "rarest first."
212 This may be NULL if we don't have metainfo yet, or if we're not
213 downloading and don't care about rarity */
214 uint16_t* pieceReplication;
215 size_t pieceReplicationSize;
216
217 int interestedCount;
218 int maxPeers;
219 time_t lastCancel;
220
221 /* Before the endgame this should be 0. In endgame, is contains the average
222 * number of pending requests per peer. Only peers which have more pending
223 * requests are considered 'fast' are allowed to request a block that's
224 * already been requested from another (slower?) peer. */
225 int endgame;
226 }
227 tr_swarm;
228
229 struct tr_peerMgr
230 {
231 tr_session* session;
232 tr_ptrArray incomingHandshakes; /* tr_handshake */
233 struct event* bandwidthTimer;
234 struct event* rechokeTimer;
235 struct event* refillUpkeepTimer;
236 struct event* atomTimer;
237 };
238
239 #define tordbg(t, ...) tr_logAddDeepNamed(tr_torrentName((t)->tor), __VA_ARGS__)
240
241 #define dbgmsg(...) tr_logAddDeepNamed(NULL, __VA_ARGS__)
242
243 /**
244 *** tr_peer virtual functions
245 **/
246
tr_peerIsTransferringPieces(tr_peer const * peer,uint64_t now,tr_direction direction,unsigned int * Bps)247 static bool tr_peerIsTransferringPieces(tr_peer const* peer, uint64_t now, tr_direction direction, unsigned int* Bps)
248 {
249 TR_ASSERT(peer != NULL);
250 TR_ASSERT(peer->funcs != NULL);
251
252 return (*peer->funcs->is_transferring_pieces)(peer, now, direction, Bps);
253 }
254
tr_peerGetPieceSpeed_Bps(tr_peer const * peer,uint64_t now,tr_direction direction)255 unsigned int tr_peerGetPieceSpeed_Bps(tr_peer const* peer, uint64_t now, tr_direction direction)
256 {
257 unsigned int Bps = 0;
258 tr_peerIsTransferringPieces(peer, now, direction, &Bps);
259 return Bps;
260 }
261
tr_peerFree(tr_peer * peer)262 static void tr_peerFree(tr_peer* peer)
263 {
264 TR_ASSERT(peer != NULL);
265 TR_ASSERT(peer->funcs != NULL);
266
267 (*peer->funcs->destruct)(peer);
268
269 tr_free(peer);
270 }
271
tr_peerConstruct(tr_peer * peer,tr_torrent const * tor)272 void tr_peerConstruct(tr_peer* peer, tr_torrent const* tor)
273 {
274 TR_ASSERT(peer != NULL);
275 TR_ASSERT(tr_isTorrent(tor));
276
277 memset(peer, 0, sizeof(tr_peer));
278
279 peer->client = TR_KEY_NONE;
280 peer->swarm = tor->swarm;
281 tr_bitfieldConstruct(&peer->have, tor->info.pieceCount);
282 tr_bitfieldConstruct(&peer->blame, tor->blockCount);
283 }
284
285 static void peerDeclinedAllRequests(tr_swarm*, tr_peer const*);
286
tr_peerDestruct(tr_peer * peer)287 void tr_peerDestruct(tr_peer* peer)
288 {
289 TR_ASSERT(peer != NULL);
290
291 if (peer->swarm != NULL)
292 {
293 peerDeclinedAllRequests(peer->swarm, peer);
294 }
295
296 tr_bitfieldDestruct(&peer->have);
297 tr_bitfieldDestruct(&peer->blame);
298
299 if (peer->atom != NULL)
300 {
301 peer->atom->peer = NULL;
302 }
303 }
304
305 /**
306 ***
307 **/
308
managerLock(struct tr_peerMgr const * manager)309 static inline void managerLock(struct tr_peerMgr const* manager)
310 {
311 tr_sessionLock(manager->session);
312 }
313
managerUnlock(struct tr_peerMgr const * manager)314 static inline void managerUnlock(struct tr_peerMgr const* manager)
315 {
316 tr_sessionUnlock(manager->session);
317 }
318
swarmLock(tr_swarm * swarm)319 static inline void swarmLock(tr_swarm* swarm)
320 {
321 managerLock(swarm->manager);
322 }
323
swarmUnlock(tr_swarm * swarm)324 static inline void swarmUnlock(tr_swarm* swarm)
325 {
326 managerUnlock(swarm->manager);
327 }
328
329 #ifdef TR_ENABLE_ASSERTS
330
swarmIsLocked(tr_swarm const * swarm)331 static inline bool swarmIsLocked(tr_swarm const* swarm)
332 {
333 return tr_sessionIsLocked(swarm->manager->session);
334 }
335
336 #endif /* TR_ENABLE_ASSERTS */
337
338 /**
339 ***
340 **/
341
handshakeCompareToAddr(void const * va,void const * vb)342 static int handshakeCompareToAddr(void const* va, void const* vb)
343 {
344 tr_handshake const* a = va;
345
346 return tr_address_compare(tr_handshakeGetAddr(a, NULL), vb);
347 }
348
handshakeCompare(void const * a,void const * b)349 static int handshakeCompare(void const* a, void const* b)
350 {
351 return handshakeCompareToAddr(a, tr_handshakeGetAddr(b, NULL));
352 }
353
getExistingHandshake(tr_ptrArray * handshakes,tr_address const * addr)354 static inline tr_handshake* getExistingHandshake(tr_ptrArray* handshakes, tr_address const* addr)
355 {
356 if (tr_ptrArrayEmpty(handshakes))
357 {
358 return NULL;
359 }
360
361 return tr_ptrArrayFindSorted(handshakes, addr, handshakeCompareToAddr);
362 }
363
comparePeerAtomToAddress(void const * va,void const * vb)364 static int comparePeerAtomToAddress(void const* va, void const* vb)
365 {
366 struct peer_atom const* a = va;
367
368 return tr_address_compare(&a->addr, vb);
369 }
370
compareAtomsByAddress(void const * va,void const * vb)371 static int compareAtomsByAddress(void const* va, void const* vb)
372 {
373 struct peer_atom const* b = vb;
374
375 TR_ASSERT(tr_isAtom(b));
376
377 return comparePeerAtomToAddress(va, &b->addr);
378 }
379
380 /**
381 ***
382 **/
383
tr_peerAddress(tr_peer const * peer)384 tr_address const* tr_peerAddress(tr_peer const* peer)
385 {
386 return &peer->atom->addr;
387 }
388
getExistingSwarm(tr_peerMgr * manager,uint8_t const * hash)389 static tr_swarm* getExistingSwarm(tr_peerMgr* manager, uint8_t const* hash)
390 {
391 tr_torrent* tor = tr_torrentFindFromHash(manager->session, hash);
392
393 return tor == NULL ? NULL : tor->swarm;
394 }
395
peerCompare(void const * a,void const * b)396 static int peerCompare(void const* a, void const* b)
397 {
398 return tr_address_compare(tr_peerAddress(a), tr_peerAddress(b));
399 }
400
getExistingAtom(tr_swarm const * cswarm,tr_address const * addr)401 static struct peer_atom* getExistingAtom(tr_swarm const* cswarm, tr_address const* addr)
402 {
403 tr_swarm* swarm = (tr_swarm*)cswarm;
404 return tr_ptrArrayFindSorted(&swarm->pool, addr, comparePeerAtomToAddress);
405 }
406
peerIsInUse(tr_swarm const * cs,struct peer_atom const * atom)407 static bool peerIsInUse(tr_swarm const* cs, struct peer_atom const* atom)
408 {
409 tr_swarm* s = (tr_swarm*)cs;
410
411 TR_ASSERT(swarmIsLocked(s));
412
413 return atom->peer != NULL || getExistingHandshake(&s->outgoingHandshakes, &atom->addr) != NULL ||
414 getExistingHandshake(&s->manager->incomingHandshakes, &atom->addr) != NULL;
415 }
416
replicationExists(tr_swarm const * s)417 static inline bool replicationExists(tr_swarm const* s)
418 {
419 return s->pieceReplication != NULL;
420 }
421
replicationFree(tr_swarm * s)422 static void replicationFree(tr_swarm* s)
423 {
424 tr_free(s->pieceReplication);
425 s->pieceReplication = NULL;
426 s->pieceReplicationSize = 0;
427 }
428
replicationNew(tr_swarm * s)429 static void replicationNew(tr_swarm* s)
430 {
431 TR_ASSERT(!replicationExists(s));
432
433 tr_piece_index_t const piece_count = s->tor->info.pieceCount;
434 int const n = tr_ptrArraySize(&s->peers);
435
436 s->pieceReplicationSize = piece_count;
437 s->pieceReplication = tr_new0(uint16_t, piece_count);
438
439 for (tr_piece_index_t piece_i = 0; piece_i < piece_count; ++piece_i)
440 {
441 uint16_t r = 0;
442
443 for (int peer_i = 0; peer_i < n; ++peer_i)
444 {
445 tr_peer* peer = tr_ptrArrayNth(&s->peers, peer_i);
446
447 if (tr_bitfieldHas(&peer->have, piece_i))
448 {
449 ++r;
450 }
451 }
452
453 s->pieceReplication[piece_i] = r;
454 }
455 }
456
swarmFree(void * vs)457 static void swarmFree(void* vs)
458 {
459 tr_swarm* s = vs;
460
461 TR_ASSERT(s != NULL);
462 TR_ASSERT(!s->isRunning);
463 TR_ASSERT(swarmIsLocked(s));
464 TR_ASSERT(tr_ptrArrayEmpty(&s->outgoingHandshakes));
465 TR_ASSERT(tr_ptrArrayEmpty(&s->peers));
466
467 tr_ptrArrayDestruct(&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
468 tr_ptrArrayDestruct(&s->pool, (PtrArrayForeachFunc)tr_free);
469 tr_ptrArrayDestruct(&s->outgoingHandshakes, NULL);
470 tr_ptrArrayDestruct(&s->peers, NULL);
471 s->stats = TR_SWARM_STATS_INIT;
472
473 replicationFree(s);
474
475 tr_free(s->requests);
476 tr_free(s->pieces);
477 tr_free(s);
478 }
479
480 static void peerCallbackFunc(tr_peer*, tr_peer_event const*, void*);
481
rebuildWebseedArray(tr_swarm * s,tr_torrent * tor)482 static void rebuildWebseedArray(tr_swarm* s, tr_torrent* tor)
483 {
484 tr_info const* inf = &tor->info;
485
486 /* clear the array */
487 tr_ptrArrayDestruct(&s->webseeds, (PtrArrayForeachFunc)tr_peerFree);
488 s->webseeds = TR_PTR_ARRAY_INIT;
489 s->stats.activeWebseedCount = 0;
490
491 /* repopulate it */
492 for (unsigned int i = 0; i < inf->webseedCount; ++i)
493 {
494 tr_webseed* w = tr_webseedNew(tor, inf->webseeds[i], peerCallbackFunc, s);
495 tr_ptrArrayAppend(&s->webseeds, w);
496 }
497 }
498
swarmNew(tr_peerMgr * manager,tr_torrent * tor)499 static tr_swarm* swarmNew(tr_peerMgr* manager, tr_torrent* tor)
500 {
501 tr_swarm* s;
502
503 s = tr_new0(tr_swarm, 1);
504 s->manager = manager;
505 s->tor = tor;
506 s->pool = TR_PTR_ARRAY_INIT;
507 s->peers = TR_PTR_ARRAY_INIT;
508 s->webseeds = TR_PTR_ARRAY_INIT;
509 s->outgoingHandshakes = TR_PTR_ARRAY_INIT;
510
511 rebuildWebseedArray(s, tor);
512
513 return s;
514 }
515
516 static void ensureMgrTimersExist(struct tr_peerMgr* m);
517
tr_peerMgrNew(tr_session * session)518 tr_peerMgr* tr_peerMgrNew(tr_session* session)
519 {
520 tr_peerMgr* m = tr_new0(tr_peerMgr, 1);
521 m->session = session;
522 m->incomingHandshakes = TR_PTR_ARRAY_INIT;
523 ensureMgrTimersExist(m);
524 return m;
525 }
526
deleteTimer(struct event ** t)527 static void deleteTimer(struct event** t)
528 {
529 if (*t != NULL)
530 {
531 event_free(*t);
532 *t = NULL;
533 }
534 }
535
deleteTimers(struct tr_peerMgr * m)536 static void deleteTimers(struct tr_peerMgr* m)
537 {
538 deleteTimer(&m->atomTimer);
539 deleteTimer(&m->bandwidthTimer);
540 deleteTimer(&m->rechokeTimer);
541 deleteTimer(&m->refillUpkeepTimer);
542 }
543
tr_peerMgrFree(tr_peerMgr * manager)544 void tr_peerMgrFree(tr_peerMgr* manager)
545 {
546 managerLock(manager);
547
548 deleteTimers(manager);
549
550 /* free the handshakes. Abort invokes handshakeDoneCB(), which removes
551 * the item from manager->handshakes, so this is a little roundabout... */
552 while (!tr_ptrArrayEmpty(&manager->incomingHandshakes))
553 {
554 tr_handshakeAbort(tr_ptrArrayNth(&manager->incomingHandshakes, 0));
555 }
556
557 tr_ptrArrayDestruct(&manager->incomingHandshakes, NULL);
558
559 managerUnlock(manager);
560 tr_free(manager);
561 }
562
563 /***
564 ****
565 ***/
566
tr_peerMgrOnBlocklistChanged(tr_peerMgr * mgr)567 void tr_peerMgrOnBlocklistChanged(tr_peerMgr* mgr)
568 {
569 tr_torrent* tor = NULL;
570 tr_session* session = mgr->session;
571
572 /* we cache whether or not a peer is blocklisted...
573 since the blocklist has changed, erase that cached value */
574 while ((tor = tr_torrentNext(session, tor)) != NULL)
575 {
576 tr_swarm* s = tor->swarm;
577
578 for (int i = 0, n = tr_ptrArraySize(&s->pool); i < n; ++i)
579 {
580 struct peer_atom* atom = tr_ptrArrayNth(&s->pool, i);
581 atom->blocklisted = -1;
582 }
583 }
584 }
585
isAtomBlocklisted(tr_session * session,struct peer_atom * atom)586 static bool isAtomBlocklisted(tr_session* session, struct peer_atom* atom)
587 {
588 if (atom->blocklisted < 0)
589 {
590 atom->blocklisted = (int8_t)tr_sessionIsAddressBlocked(session, &atom->addr);
591 }
592
593 return atom->blocklisted != 0;
594 }
595
596 /***
597 ****
598 ***/
599
atomSetSeedProbability(struct peer_atom * atom,int seedProbability)600 static void atomSetSeedProbability(struct peer_atom* atom, int seedProbability)
601 {
602 TR_ASSERT(atom != NULL);
603 TR_ASSERT(seedProbability >= -1);
604 TR_ASSERT(seedProbability <= 100);
605
606 atom->seedProbability = seedProbability;
607
608 if (seedProbability == 100)
609 {
610 atom->flags |= ADDED_F_SEED_FLAG;
611 }
612 else if (seedProbability != -1)
613 {
614 atom->flags &= ~ADDED_F_SEED_FLAG;
615 }
616 }
617
atomIsSeed(struct peer_atom const * atom)618 static inline bool atomIsSeed(struct peer_atom const* atom)
619 {
620 return atom->seedProbability == 100;
621 }
622
atomSetSeed(tr_swarm const * s,struct peer_atom * atom)623 static void atomSetSeed(tr_swarm const* s, struct peer_atom* atom)
624 {
625 if (!atomIsSeed(atom))
626 {
627 tordbg(s, "marking peer %s as a seed", tr_atomAddrStr(atom));
628
629 atomSetSeedProbability(atom, 100);
630 }
631 }
632
tr_peerMgrPeerIsSeed(tr_torrent const * tor,tr_address const * addr)633 bool tr_peerMgrPeerIsSeed(tr_torrent const* tor, tr_address const* addr)
634 {
635 bool isSeed = false;
636 tr_swarm const* s = tor->swarm;
637 struct peer_atom const* atom = getExistingAtom(s, addr);
638
639 if (atom != NULL)
640 {
641 isSeed = atomIsSeed(atom);
642 }
643
644 return isSeed;
645 }
646
tr_peerMgrSetUtpSupported(tr_torrent * tor,tr_address const * addr)647 void tr_peerMgrSetUtpSupported(tr_torrent* tor, tr_address const* addr)
648 {
649 struct peer_atom* atom = getExistingAtom(tor->swarm, addr);
650
651 if (atom != NULL)
652 {
653 atom->flags |= ADDED_F_UTP_FLAGS;
654 }
655 }
656
tr_peerMgrSetUtpFailed(tr_torrent * tor,tr_address const * addr,bool failed)657 void tr_peerMgrSetUtpFailed(tr_torrent* tor, tr_address const* addr, bool failed)
658 {
659 struct peer_atom* atom = getExistingAtom(tor->swarm, addr);
660
661 if (atom != NULL)
662 {
663 atom->utp_failed = failed;
664 }
665 }
666
667 /**
668 *** REQUESTS
669 ***
670 *** There are two data structures associated with managing block requests:
671 ***
672 *** 1. tr_swarm::requests, an array of "struct block_request" which keeps
673 *** track of which blocks have been requested, and when, and by which peers.
674 *** This is list is used for (a) cancelling requests that have been pending
675 *** for too long and (b) avoiding duplicate requests before endgame.
676 ***
677 *** 2. tr_swarm::pieces, an array of "struct weighted_piece" which lists the
678 *** pieces that we want to request. It's used to decide which blocks to
679 *** return next when tr_peerMgrGetBlockRequests() is called.
680 **/
681
682 /**
683 *** struct block_request
684 **/
685
compareReqByBlock(void const * va,void const * vb)686 static int compareReqByBlock(void const* va, void const* vb)
687 {
688 struct block_request const* a = va;
689 struct block_request const* b = vb;
690
691 /* primary key: block */
692 if (a->block < b->block)
693 {
694 return -1;
695 }
696
697 if (a->block > b->block)
698 {
699 return 1;
700 }
701
702 /* secondary key: peer */
703 if (a->peer < b->peer)
704 {
705 return -1;
706 }
707
708 if (a->peer > b->peer)
709 {
710 return 1;
711 }
712
713 return 0;
714 }
715
requestListAdd(tr_swarm * s,tr_block_index_t block,tr_peer * peer)716 static void requestListAdd(tr_swarm* s, tr_block_index_t block, tr_peer* peer)
717 {
718 struct block_request key;
719
720 /* ensure enough room is available... */
721 if (s->requestCount + 1 >= s->requestAlloc)
722 {
723 int const CHUNK_SIZE = 128;
724 s->requestAlloc += CHUNK_SIZE;
725 s->requests = tr_renew(struct block_request, s->requests, s->requestAlloc);
726 }
727
728 /* populate the record we're inserting */
729 key.block = block;
730 key.peer = peer;
731 key.sentAt = tr_time();
732
733 /* insert the request to our array... */
734 {
735 bool exact;
736 int const pos = tr_lowerBound(&key, s->requests, s->requestCount, sizeof(struct block_request), compareReqByBlock,
737 &exact);
738 TR_ASSERT(!exact);
739 memmove(s->requests + pos + 1, s->requests + pos, sizeof(struct block_request) * (s->requestCount++ - pos));
740 s->requests[pos] = key;
741 }
742
743 if (peer != NULL)
744 {
745 ++peer->pendingReqsToPeer;
746 TR_ASSERT(peer->pendingReqsToPeer >= 0);
747 }
748
749 // fprintf(stderr, "added request of block %lu from peer %s... there are now %d block\n", (unsigned long)block,
750 // tr_atomAddrStr(peer->atom), s->requestCount);
751 }
752
requestListLookup(tr_swarm * s,tr_block_index_t block,tr_peer const * peer)753 static struct block_request* requestListLookup(tr_swarm* s, tr_block_index_t block, tr_peer const* peer)
754 {
755 struct block_request key;
756 key.block = block;
757 key.peer = (tr_peer*)peer;
758
759 return bsearch(&key, s->requests, s->requestCount, sizeof(struct block_request), compareReqByBlock);
760 }
761
762 /**
763 * Find the peers are we currently requesting the block
764 * with index @a block from and append them to @a peerArr.
765 */
getBlockRequestPeers(tr_swarm * s,tr_block_index_t block,tr_ptrArray * peerArr)766 static void getBlockRequestPeers(tr_swarm* s, tr_block_index_t block, tr_ptrArray* peerArr)
767 {
768 bool exact;
769 int pos;
770 struct block_request key;
771
772 key.block = block;
773 key.peer = NULL;
774 pos = tr_lowerBound(&key, s->requests, s->requestCount, sizeof(struct block_request), compareReqByBlock, &exact);
775
776 TR_ASSERT(!exact); /* shouldn't have a request with .peer == NULL */
777
778 for (int i = pos; i < s->requestCount; ++i)
779 {
780 if (s->requests[i].block != block)
781 {
782 break;
783 }
784
785 tr_ptrArrayAppend(peerArr, s->requests[i].peer);
786 }
787 }
788
decrementPendingReqCount(struct block_request const * b)789 static void decrementPendingReqCount(struct block_request const* b)
790 {
791 if (b->peer != NULL)
792 {
793 if (b->peer->pendingReqsToPeer > 0)
794 {
795 --b->peer->pendingReqsToPeer;
796 }
797 }
798 }
799
requestListRemove(tr_swarm * s,tr_block_index_t block,tr_peer const * peer)800 static void requestListRemove(tr_swarm* s, tr_block_index_t block, tr_peer const* peer)
801 {
802 struct block_request const* b = requestListLookup(s, block, peer);
803
804 if (b != NULL)
805 {
806 int const pos = b - s->requests;
807 TR_ASSERT(pos < s->requestCount);
808
809 decrementPendingReqCount(b);
810
811 tr_removeElementFromArray(s->requests, pos, sizeof(struct block_request), s->requestCount);
812 --s->requestCount;
813
814 // fprintf(stderr, "removing request of block %lu from peer %s... there are now %d block requests left\n", (unsigned long)block,
815 // tr_atomAddrStr(peer->atom), t->requestCount);
816 }
817 }
818
countActiveWebseeds(tr_swarm * s)819 static int countActiveWebseeds(tr_swarm* s)
820 {
821 int activeCount = 0;
822
823 if (s->tor->isRunning && !tr_torrentIsSeed(s->tor))
824 {
825 uint64_t const now = tr_time_msec();
826
827 for (int i = 0, n = tr_ptrArraySize(&s->webseeds); i < n; ++i)
828 {
829 if (tr_peerIsTransferringPieces(tr_ptrArrayNth(&s->webseeds, i), now, TR_DOWN, NULL))
830 {
831 ++activeCount;
832 }
833 }
834 }
835
836 return activeCount;
837 }
838
testForEndgame(tr_swarm const * s)839 static bool testForEndgame(tr_swarm const* s)
840 {
841 /* we consider ourselves to be in endgame if the number of bytes
842 we've got requested is >= the number of bytes left to download */
843 return (uint64_t)s->requestCount * s->tor->blockSize >= tr_torrentGetLeftUntilDone(s->tor);
844 }
845
updateEndgame(tr_swarm * s)846 static void updateEndgame(tr_swarm* s)
847 {
848 TR_ASSERT(s->requestCount >= 0);
849
850 if (!testForEndgame(s))
851 {
852 /* not in endgame */
853 s->endgame = 0;
854 }
855 else if (s->endgame == 0) /* only recalculate when endgame first begins */
856 {
857 int numDownloading = 0;
858
859 /* add the active bittorrent peers... */
860 for (int i = 0, n = tr_ptrArraySize(&s->peers); i < n; ++i)
861 {
862 tr_peer const* p = tr_ptrArrayNth(&s->peers, i);
863
864 if (p->pendingReqsToPeer > 0)
865 {
866 ++numDownloading;
867 }
868 }
869
870 /* add the active webseeds... */
871 numDownloading += countActiveWebseeds(s);
872
873 /* average number of pending requests per downloading peer */
874 s->endgame = s->requestCount / MAX(numDownloading, 1);
875 }
876 }
877
878 /****
879 *****
880 ***** Piece List Manipulation / Accessors
881 *****
882 ****/
883
invalidatePieceSorting(tr_swarm * s)884 static inline void invalidatePieceSorting(tr_swarm* s)
885 {
886 s->pieceSortState = PIECES_UNSORTED;
887 }
888
889 static tr_torrent const* weightTorrent;
890
891 static uint16_t const* weightReplication;
892
setComparePieceByWeightTorrent(tr_swarm * s)893 static void setComparePieceByWeightTorrent(tr_swarm* s)
894 {
895 if (!replicationExists(s))
896 {
897 replicationNew(s);
898 }
899
900 weightTorrent = s->tor;
901 weightReplication = s->pieceReplication;
902 }
903
904 /* we try to create a "weight" s.t. high-priority pieces come before others,
905 * and that partially-complete pieces come before empty ones. */
comparePieceByWeight(void const * va,void const * vb)906 static int comparePieceByWeight(void const* va, void const* vb)
907 {
908 struct weighted_piece const* a = va;
909 struct weighted_piece const* b = vb;
910 int ia;
911 int ib;
912 int missing;
913 int pending;
914 tr_torrent const* tor = weightTorrent;
915 uint16_t const* rep = weightReplication;
916
917 /* primary key: weight */
918 missing = tr_torrentMissingBlocksInPiece(tor, a->index);
919 pending = a->requestCount;
920 ia = missing > pending ? missing - pending : tor->blockCountInPiece + pending;
921 missing = tr_torrentMissingBlocksInPiece(tor, b->index);
922 pending = b->requestCount;
923 ib = missing > pending ? missing - pending : tor->blockCountInPiece + pending;
924
925 if (ia < ib)
926 {
927 return -1;
928 }
929
930 if (ia > ib)
931 {
932 return 1;
933 }
934
935 /* secondary key: higher priorities go first */
936 ia = tor->info.pieces[a->index].priority;
937 ib = tor->info.pieces[b->index].priority;
938
939 if (ia > ib)
940 {
941 return -1;
942 }
943
944 if (ia < ib)
945 {
946 return 1;
947 }
948
949 /* tertiary key: rarest first. */
950 ia = rep[a->index];
951 ib = rep[b->index];
952
953 if (ia < ib)
954 {
955 return -1;
956 }
957
958 if (ia > ib)
959 {
960 return 1;
961 }
962
963 /* quaternary key: random */
964 if (a->salt < b->salt)
965 {
966 return -1;
967 }
968
969 if (a->salt > b->salt)
970 {
971 return 1;
972 }
973
974 /* okay, they're equal */
975 return 0;
976 }
977
comparePieceByIndex(void const * va,void const * vb)978 static int comparePieceByIndex(void const* va, void const* vb)
979 {
980 struct weighted_piece const* a = va;
981 struct weighted_piece const* b = vb;
982
983 if (a->index < b->index)
984 {
985 return -1;
986 }
987
988 if (a->index > b->index)
989 {
990 return 1;
991 }
992
993 return 0;
994 }
995
pieceListSort(tr_swarm * s,enum piece_sort_state state)996 static void pieceListSort(tr_swarm* s, enum piece_sort_state state)
997 {
998 TR_ASSERT(state == PIECES_SORTED_BY_INDEX || state == PIECES_SORTED_BY_WEIGHT);
999
1000 if (state == PIECES_SORTED_BY_WEIGHT)
1001 {
1002 setComparePieceByWeightTorrent(s);
1003 qsort(s->pieces, s->pieceCount, sizeof(struct weighted_piece), comparePieceByWeight);
1004 }
1005 else
1006 {
1007 qsort(s->pieces, s->pieceCount, sizeof(struct weighted_piece), comparePieceByIndex);
1008 }
1009
1010 s->pieceSortState = state;
1011 }
1012
1013 /**
1014 * These functions are useful for testing, but too expensive for nightly builds.
1015 * let's leave it disabled but add an easy hook to compile it back in
1016 */
1017 #if 1
1018
1019 #define assertWeightedPiecesAreSorted(t)
1020 #define assertReplicationCountIsExact(t)
1021
1022 #else
1023
assertWeightedPiecesAreSorted(Torrent * t)1024 static void assertWeightedPiecesAreSorted(Torrent* t)
1025 {
1026 if (t->endgame == 0)
1027 {
1028 setComparePieceByWeightTorrent(t);
1029
1030 for (int i = 0; i < t->pieceCount - 1; ++i)
1031 {
1032 TR_ASSERT(comparePieceByWeight(&t->pieces[i], &t->pieces[i + 1]) <= 0);
1033 }
1034 }
1035 }
1036
assertReplicationCountIsExact(Torrent * t)1037 static void assertReplicationCountIsExact(Torrent* t)
1038 {
1039 /* This assert might fail due to errors of implementations in other
1040 * clients. It happens when receiving duplicate bitfields/HaveAll/HaveNone
1041 * from a client. If a such a behavior is noticed,
1042 * a bug report should be filled to the faulty client. */
1043
1044 uint16_t const* rep = t->pieceReplication;
1045 size_t const piece_count = t->pieceReplicationSize;
1046 tr_peer const** peers = (tr_peer const**)tr_ptrArrayBase(&t->peers);
1047 int const peer_count = tr_ptrArraySize(&t->peers);
1048
1049 TR_ASSERT(piece_count == t->tor->info.pieceCount);
1050
1051 for (size_t piece_i = 0; piece_i < piece_count; ++piece_i)
1052 {
1053 uint16_t r = 0;
1054
1055 for (int peer_i = 0; peer_i < peer_count; ++peer_i)
1056 {
1057 if (tr_bitsetHas(&peers[peer_i]->have, piece_i))
1058 {
1059 ++r;
1060 }
1061 }
1062
1063 TR_ASSERT(rep[piece_i] == r);
1064 }
1065 }
1066
1067 #endif
1068
pieceListLookup(tr_swarm * s,tr_piece_index_t index)1069 static struct weighted_piece* pieceListLookup(tr_swarm* s, tr_piece_index_t index)
1070 {
1071 for (int i = 0; i < s->pieceCount; ++i)
1072 {
1073 if (s->pieces[i].index == index)
1074 {
1075 return &s->pieces[i];
1076 }
1077 }
1078
1079 return NULL;
1080 }
1081
pieceListRebuild(tr_swarm * s)1082 static void pieceListRebuild(tr_swarm* s)
1083 {
1084 if (!tr_torrentIsSeed(s->tor))
1085 {
1086 tr_piece_index_t* pool;
1087 tr_piece_index_t poolCount = 0;
1088 tr_torrent const* tor = s->tor;
1089 tr_info const* inf = tr_torrentInfo(tor);
1090 struct weighted_piece* pieces;
1091 int pieceCount;
1092
1093 /* build the new list */
1094 pool = tr_new(tr_piece_index_t, inf->pieceCount);
1095
1096 for (tr_piece_index_t i = 0; i < inf->pieceCount; ++i)
1097 {
1098 if (!inf->pieces[i].dnd)
1099 {
1100 if (!tr_torrentPieceIsComplete(tor, i))
1101 {
1102 pool[poolCount++] = i;
1103 }
1104 }
1105 }
1106
1107 pieceCount = poolCount;
1108 pieces = tr_new0(struct weighted_piece, pieceCount);
1109
1110 for (tr_piece_index_t i = 0; i < poolCount; ++i)
1111 {
1112 struct weighted_piece* piece = pieces + i;
1113 piece->index = pool[i];
1114 piece->requestCount = 0;
1115 piece->salt = tr_rand_int_weak(4096);
1116 }
1117
1118 /* if we already had a list of pieces, merge it into
1119 * the new list so we don't lose its requestCounts */
1120 if (s->pieces != NULL)
1121 {
1122 struct weighted_piece* o = s->pieces;
1123 struct weighted_piece* oend = o + s->pieceCount;
1124 struct weighted_piece* n = pieces;
1125 struct weighted_piece* nend = n + pieceCount;
1126
1127 pieceListSort(s, PIECES_SORTED_BY_INDEX);
1128
1129 while (o != oend && n != nend)
1130 {
1131 if (o->index < n->index)
1132 {
1133 ++o;
1134 }
1135 else if (o->index > n->index)
1136 {
1137 ++n;
1138 }
1139 else
1140 {
1141 *n++ = *o++;
1142 }
1143 }
1144
1145 tr_free(s->pieces);
1146 }
1147
1148 s->pieces = pieces;
1149 s->pieceCount = pieceCount;
1150
1151 pieceListSort(s, PIECES_SORTED_BY_WEIGHT);
1152
1153 /* cleanup */
1154 tr_free(pool);
1155 }
1156 }
1157
pieceListRemovePiece(tr_swarm * s,tr_piece_index_t piece)1158 static void pieceListRemovePiece(tr_swarm* s, tr_piece_index_t piece)
1159 {
1160 struct weighted_piece* p;
1161
1162 if ((p = pieceListLookup(s, piece)) != NULL)
1163 {
1164 int const pos = p - s->pieces;
1165
1166 tr_removeElementFromArray(s->pieces, pos, sizeof(struct weighted_piece), s->pieceCount);
1167 --s->pieceCount;
1168
1169 if (s->pieceCount == 0)
1170 {
1171 tr_free(s->pieces);
1172 s->pieces = NULL;
1173 }
1174 }
1175 }
1176
pieceListResortPiece(tr_swarm * s,struct weighted_piece * p)1177 static void pieceListResortPiece(tr_swarm* s, struct weighted_piece* p)
1178 {
1179 int pos;
1180 bool isSorted = true;
1181
1182 if (p == NULL)
1183 {
1184 return;
1185 }
1186
1187 /* is the torrent already sorted? */
1188 pos = p - s->pieces;
1189 setComparePieceByWeightTorrent(s);
1190
1191 if (isSorted && pos > 0 && comparePieceByWeight(p - 1, p) > 0)
1192 {
1193 isSorted = false;
1194 }
1195
1196 if (isSorted && pos < s->pieceCount - 1 && comparePieceByWeight(p, p + 1) > 0)
1197 {
1198 isSorted = false;
1199 }
1200
1201 if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1202 {
1203 pieceListSort(s, PIECES_SORTED_BY_WEIGHT);
1204 isSorted = true;
1205 }
1206
1207 /* if it's not sorted, move it around */
1208 if (!isSorted)
1209 {
1210 bool exact;
1211 struct weighted_piece const tmp = *p;
1212
1213 tr_removeElementFromArray(s->pieces, pos, sizeof(struct weighted_piece), s->pieceCount);
1214 --s->pieceCount;
1215
1216 pos = tr_lowerBound(&tmp, s->pieces, s->pieceCount, sizeof(struct weighted_piece), comparePieceByWeight, &exact);
1217
1218 memmove(&s->pieces[pos + 1], &s->pieces[pos], sizeof(struct weighted_piece) * (s->pieceCount - pos));
1219 ++s->pieceCount;
1220
1221 s->pieces[pos] = tmp;
1222 }
1223
1224 assertWeightedPiecesAreSorted(s);
1225 }
1226
pieceListRemoveRequest(tr_swarm * s,tr_block_index_t block)1227 static void pieceListRemoveRequest(tr_swarm* s, tr_block_index_t block)
1228 {
1229 struct weighted_piece* p;
1230 tr_piece_index_t const index = tr_torBlockPiece(s->tor, block);
1231
1232 if ((p = pieceListLookup(s, index)) != NULL && p->requestCount > 0)
1233 {
1234 --p->requestCount;
1235 pieceListResortPiece(s, p);
1236 }
1237 }
1238
1239 /****
1240 *****
1241 ***** Replication count (for rarest first policy)
1242 *****
1243 ****/
1244
1245 /**
1246 * Increase the replication count of this piece and sort it if the
1247 * piece list is already sorted
1248 */
tr_incrReplicationOfPiece(tr_swarm * s,size_t const index)1249 static void tr_incrReplicationOfPiece(tr_swarm* s, size_t const index)
1250 {
1251 TR_ASSERT(replicationExists(s));
1252 TR_ASSERT(s->pieceReplicationSize == s->tor->info.pieceCount);
1253
1254 /* One more replication of this piece is present in the swarm */
1255 ++s->pieceReplication[index];
1256
1257 /* we only resort the piece if the list is already sorted */
1258 if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1259 {
1260 pieceListResortPiece(s, pieceListLookup(s, index));
1261 }
1262 }
1263
1264 /**
1265 * Increases the replication count of pieces present in the bitfield
1266 */
tr_incrReplicationFromBitfield(tr_swarm * s,tr_bitfield const * b)1267 static void tr_incrReplicationFromBitfield(tr_swarm* s, tr_bitfield const* b)
1268 {
1269 TR_ASSERT(replicationExists(s));
1270
1271 uint16_t* rep = s->pieceReplication;
1272
1273 for (size_t i = 0, n = s->tor->info.pieceCount; i < n; ++i)
1274 {
1275 if (tr_bitfieldHas(b, i))
1276 {
1277 ++rep[i];
1278 }
1279 }
1280
1281 if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1282 {
1283 invalidatePieceSorting(s);
1284 }
1285 }
1286
1287 /**
1288 * Increase the replication count of every piece
1289 */
tr_incrReplication(tr_swarm * s)1290 static void tr_incrReplication(tr_swarm* s)
1291 {
1292 TR_ASSERT(replicationExists(s));
1293 TR_ASSERT(s->pieceReplicationSize == s->tor->info.pieceCount);
1294
1295 for (size_t i = 0; i < s->pieceReplicationSize; ++i)
1296 {
1297 ++s->pieceReplication[i];
1298 }
1299 }
1300
1301 /**
1302 * Decrease the replication count of pieces present in the bitset.
1303 */
tr_decrReplicationFromBitfield(tr_swarm * s,tr_bitfield const * b)1304 static void tr_decrReplicationFromBitfield(tr_swarm* s, tr_bitfield const* b)
1305 {
1306 TR_ASSERT(replicationExists(s));
1307 TR_ASSERT(s->pieceReplicationSize == s->tor->info.pieceCount);
1308
1309 if (tr_bitfieldHasAll(b))
1310 {
1311 for (size_t i = 0; i < s->pieceReplicationSize; ++i)
1312 {
1313 --s->pieceReplication[i];
1314 }
1315 }
1316 else if (!tr_bitfieldHasNone(b))
1317 {
1318 for (size_t i = 0; i < s->pieceReplicationSize; ++i)
1319 {
1320 if (tr_bitfieldHas(b, i))
1321 {
1322 --s->pieceReplication[i];
1323 }
1324 }
1325
1326 if (s->pieceSortState == PIECES_SORTED_BY_WEIGHT)
1327 {
1328 invalidatePieceSorting(s);
1329 }
1330 }
1331 }
1332
1333 /**
1334 ***
1335 **/
1336
tr_peerMgrRebuildRequests(tr_torrent * tor)1337 void tr_peerMgrRebuildRequests(tr_torrent* tor)
1338 {
1339 TR_ASSERT(tr_isTorrent(tor));
1340
1341 pieceListRebuild(tor->swarm);
1342 }
1343
tr_peerMgrGetNextRequests(tr_torrent * tor,tr_peer * peer,int numwant,tr_block_index_t * setme,int * numgot,bool get_intervals)1344 void tr_peerMgrGetNextRequests(tr_torrent* tor, tr_peer* peer, int numwant, tr_block_index_t* setme, int* numgot,
1345 bool get_intervals)
1346 {
1347 /* sanity clause */
1348 TR_ASSERT(tr_isTorrent(tor));
1349 TR_ASSERT(numwant > 0);
1350
1351 tr_swarm* s;
1352 tr_bitfield const* const have = &peer->have;
1353
1354 /* walk through the pieces and find blocks that should be requested */
1355 s = tor->swarm;
1356
1357 /* prep the pieces list */
1358 if (s->pieces == NULL)
1359 {
1360 pieceListRebuild(s);
1361 }
1362
1363 if (s->pieceSortState != PIECES_SORTED_BY_WEIGHT)
1364 {
1365 pieceListSort(s, PIECES_SORTED_BY_WEIGHT);
1366 }
1367
1368 assertReplicationCountIsExact(s);
1369 assertWeightedPiecesAreSorted(s);
1370
1371 updateEndgame(s);
1372
1373 struct weighted_piece* pieces = s->pieces;
1374 int got = 0;
1375 int checkedPieceCount = 0;
1376
1377 for (int i = 0; i < s->pieceCount && got < numwant; ++i, ++checkedPieceCount)
1378 {
1379 struct weighted_piece* p = pieces + i;
1380
1381 /* if the peer has this piece that we want... */
1382 if (tr_bitfieldHas(have, p->index))
1383 {
1384 tr_block_index_t first;
1385 tr_block_index_t last;
1386 tr_ptrArray peerArr = TR_PTR_ARRAY_INIT;
1387
1388 tr_torGetPieceBlockRange(tor, p->index, &first, &last);
1389
1390 for (tr_block_index_t b = first; b <= last && (got < numwant || (get_intervals && setme[2 * got - 1] == b - 1));
1391 ++b)
1392 {
1393 int peerCount;
1394 tr_peer** peers;
1395
1396 /* don't request blocks we've already got */
1397 if (tr_torrentBlockIsComplete(tor, b))
1398 {
1399 continue;
1400 }
1401
1402 /* always add peer if this block has no peers yet */
1403 tr_ptrArrayClear(&peerArr);
1404 getBlockRequestPeers(s, b, &peerArr);
1405 peers = (tr_peer**)tr_ptrArrayPeek(&peerArr, &peerCount);
1406
1407 if (peerCount != 0)
1408 {
1409 /* don't make a second block request until the endgame */
1410 if (s->endgame == 0)
1411 {
1412 continue;
1413 }
1414
1415 /* don't have more than two peers requesting this block */
1416 if (peerCount > 1)
1417 {
1418 continue;
1419 }
1420
1421 /* don't send the same request to the same peer twice */
1422 if (peer == peers[0])
1423 {
1424 continue;
1425 }
1426
1427 /* in the endgame allow an additional peer to download a
1428 block but only if the peer seems to be handling requests
1429 relatively fast */
1430 if (peer->pendingReqsToPeer + numwant - got < s->endgame)
1431 {
1432 continue;
1433 }
1434 }
1435
1436 /* update the caller's table */
1437 if (!get_intervals)
1438 {
1439 setme[got++] = b;
1440 }
1441 /* if intervals are requested two array entries are necessarry:
1442 one for the interval's starting block and one for its end block */
1443 else if (got != 0 && setme[2 * got - 1] == b - 1 && b != first)
1444 {
1445 /* expand the last interval */
1446 ++setme[2 * got - 1];
1447 }
1448 else
1449 {
1450 /* begin a new interval */
1451 setme[2 * got] = b;
1452 setme[2 * got + 1] = b;
1453 ++got;
1454 }
1455
1456 /* update our own tables */
1457 requestListAdd(s, b, peer);
1458 ++p->requestCount;
1459 }
1460
1461 tr_ptrArrayDestruct(&peerArr, NULL);
1462 }
1463 }
1464
1465 /* In most cases we've just changed the weights of a small number of pieces.
1466 * So rather than qsort()ing the entire array, it's faster to apply an
1467 * adaptive insertion sort algorithm. */
1468 if (got > 0)
1469 {
1470 /* not enough requests || last piece modified */
1471 if (checkedPieceCount == s->pieceCount)
1472 {
1473 --checkedPieceCount;
1474 }
1475
1476 setComparePieceByWeightTorrent(s);
1477
1478 for (int i = checkedPieceCount - 1; i >= 0; --i)
1479 {
1480 bool exact;
1481
1482 /* relative position! */
1483 int const newpos = tr_lowerBound(&s->pieces[i], &s->pieces[i + 1], s->pieceCount - (i + 1),
1484 sizeof(struct weighted_piece), comparePieceByWeight, &exact);
1485
1486 if (newpos > 0)
1487 {
1488 struct weighted_piece const piece = s->pieces[i];
1489 memmove(&s->pieces[i], &s->pieces[i + 1], sizeof(struct weighted_piece) * newpos);
1490 s->pieces[i + newpos] = piece;
1491 }
1492 }
1493 }
1494
1495 assertWeightedPiecesAreSorted(t);
1496 *numgot = got;
1497 }
1498
tr_peerMgrDidPeerRequest(tr_torrent const * tor,tr_peer const * peer,tr_block_index_t block)1499 bool tr_peerMgrDidPeerRequest(tr_torrent const* tor, tr_peer const* peer, tr_block_index_t block)
1500 {
1501 return requestListLookup((tr_swarm*)tor->swarm, block, peer) != NULL;
1502 }
1503
1504 /* cancel requests that are too old */
refillUpkeep(evutil_socket_t foo UNUSED,short bar UNUSED,void * vmgr)1505 static void refillUpkeep(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmgr)
1506 {
1507 time_t now;
1508 time_t too_old;
1509 tr_torrent* tor;
1510 int cancel_buflen = 0;
1511 struct block_request* cancel = NULL;
1512 tr_peerMgr* mgr = vmgr;
1513 managerLock(mgr);
1514
1515 now = tr_time();
1516 too_old = now - REQUEST_TTL_SECS;
1517
1518 /* alloc the temporary "cancel" buffer */
1519 tor = NULL;
1520
1521 while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
1522 {
1523 cancel_buflen = MAX(cancel_buflen, tor->swarm->requestCount);
1524 }
1525
1526 if (cancel_buflen > 0)
1527 {
1528 cancel = tr_new(struct block_request, cancel_buflen);
1529 }
1530
1531 /* prune requests that are too old */
1532 tor = NULL;
1533
1534 while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
1535 {
1536 tr_swarm* s = tor->swarm;
1537 int const n = s->requestCount;
1538
1539 if (n > 0)
1540 {
1541 int keepCount = 0;
1542 int cancelCount = 0;
1543
1544 for (int i = 0; i < n; ++i)
1545 {
1546 struct block_request const* const request = &s->requests[i];
1547 tr_peerMsgs* msgs = PEER_MSGS(request->peer);
1548
1549 if (msgs != NULL && request->sentAt <= too_old && !tr_peerMsgsIsReadingBlock(msgs, request->block))
1550 {
1551 TR_ASSERT(cancel != NULL);
1552 TR_ASSERT(cancelCount < cancel_buflen);
1553
1554 cancel[cancelCount++] = *request;
1555 }
1556 else
1557 {
1558 if (i != keepCount)
1559 {
1560 s->requests[keepCount] = *request;
1561 }
1562
1563 keepCount++;
1564 }
1565 }
1566
1567 /* prune out the ones we aren't keeping */
1568 s->requestCount = keepCount;
1569
1570 /* send cancel messages for all the "cancel" ones */
1571 for (int i = 0; i < cancelCount; ++i)
1572 {
1573 struct block_request const* const request = &cancel[i];
1574 tr_peerMsgs* msgs = PEER_MSGS(request->peer);
1575
1576 if (msgs != NULL)
1577 {
1578 tr_historyAdd(&request->peer->cancelsSentToPeer, now, 1);
1579 tr_peerMsgsCancel(msgs, request->block);
1580 decrementPendingReqCount(request);
1581 }
1582 }
1583
1584 /* decrement the pending request counts for the timed-out blocks */
1585 for (int i = 0; i < cancelCount; ++i)
1586 {
1587 struct block_request const* const request = &cancel[i];
1588
1589 pieceListRemoveRequest(s, request->block);
1590 }
1591 }
1592 }
1593
1594 tr_free(cancel);
1595 tr_timerAddMsec(mgr->refillUpkeepTimer, REFILL_UPKEEP_PERIOD_MSEC);
1596 managerUnlock(mgr);
1597 }
1598
addStrike(tr_swarm * s,tr_peer * peer)1599 static void addStrike(tr_swarm* s, tr_peer* peer)
1600 {
1601 tordbg(s, "increasing peer %s strike count to %d", tr_atomAddrStr(peer->atom), peer->strikes + 1);
1602
1603 if (++peer->strikes >= MAX_BAD_PIECES_PER_PEER)
1604 {
1605 struct peer_atom* atom = peer->atom;
1606 atom->flags2 |= MYFLAG_BANNED;
1607 peer->doPurge = true;
1608 tordbg(s, "banning peer %s", tr_atomAddrStr(atom));
1609 }
1610 }
1611
peerSuggestedPiece(tr_swarm * s UNUSED,tr_peer * peer UNUSED,tr_piece_index_t pieceIndex UNUSED,int isFastAllowed UNUSED)1612 static void peerSuggestedPiece(tr_swarm* s UNUSED, tr_peer* peer UNUSED, tr_piece_index_t pieceIndex UNUSED,
1613 int isFastAllowed UNUSED)
1614 {
1615 #if 0
1616
1617 TR_ASSERT(t != NULL);
1618 TR_ASSERT(peer != NULL);
1619 TR_ASSERT(peer->msgs != NULL);
1620
1621 /* is this a valid piece? */
1622 if (pieceIndex >= t->tor->info.pieceCount)
1623 {
1624 return;
1625 }
1626
1627 /* don't ask for it if we've already got it */
1628 if (tr_torrentPieceIsComplete(t->tor, pieceIndex))
1629 {
1630 return;
1631 }
1632
1633 /* don't ask for it if they don't have it */
1634 if (!tr_bitfieldHas(peer->have, pieceIndex))
1635 {
1636 return;
1637 }
1638
1639 /* don't ask for it if we're choked and it's not fast */
1640 if (!isFastAllowed && peer->clientIsChoked)
1641 {
1642 return;
1643 }
1644
1645 /* request the blocks that we don't have in this piece */
1646 {
1647 tr_block_index_t first;
1648 tr_block_index_t last;
1649 tr_torrent const* tor = t->tor;
1650
1651 tr_torGetPieceBlockRange(t->tor, pieceIndex, &first, &last);
1652
1653 for (tr_block_index_t b = first; b <= last; ++b)
1654 {
1655 if (tr_torrentBlockIsComplete(tor, b))
1656 {
1657 uint32_t const offset = getBlockOffsetInPiece(tor, b);
1658 uint32_t const length = tr_torBlockCountBytes(tor, b);
1659 tr_peerMsgsAddRequest(peer->msgs, pieceIndex, offset, length);
1660 incrementPieceRequests(t, pieceIndex);
1661 }
1662 }
1663 }
1664
1665 #endif
1666 }
1667
removeRequestFromTables(tr_swarm * s,tr_block_index_t block,tr_peer const * peer)1668 static void removeRequestFromTables(tr_swarm* s, tr_block_index_t block, tr_peer const* peer)
1669 {
1670 requestListRemove(s, block, peer);
1671 pieceListRemoveRequest(s, block);
1672 }
1673
1674 /* peer choked us, or maybe it disconnected.
1675 either way we need to remove all its requests */
peerDeclinedAllRequests(tr_swarm * s,tr_peer const * peer)1676 static void peerDeclinedAllRequests(tr_swarm* s, tr_peer const* peer)
1677 {
1678 int n = 0;
1679 tr_block_index_t* blocks = tr_new(tr_block_index_t, s->requestCount);
1680
1681 for (int i = 0; i < s->requestCount; ++i)
1682 {
1683 if (peer == s->requests[i].peer)
1684 {
1685 blocks[n++] = s->requests[i].block;
1686 }
1687 }
1688
1689 for (int i = 0; i < n; ++i)
1690 {
1691 removeRequestFromTables(s, blocks[i], peer);
1692 }
1693
1694 tr_free(blocks);
1695 }
1696
cancelAllRequestsForBlock(tr_swarm * s,tr_block_index_t block,tr_peer * no_notify)1697 static void cancelAllRequestsForBlock(tr_swarm* s, tr_block_index_t block, tr_peer* no_notify)
1698 {
1699 int peerCount;
1700 tr_peer** peers;
1701 tr_ptrArray peerArr;
1702
1703 peerArr = TR_PTR_ARRAY_INIT;
1704 getBlockRequestPeers(s, block, &peerArr);
1705 peers = (tr_peer**)tr_ptrArrayPeek(&peerArr, &peerCount);
1706
1707 for (int i = 0; i < peerCount; ++i)
1708 {
1709 tr_peer* p = peers[i];
1710
1711 if (p != no_notify && tr_isPeerMsgs(p))
1712 {
1713 tr_historyAdd(&p->cancelsSentToPeer, tr_time(), 1);
1714 tr_peerMsgsCancel(PEER_MSGS(p), block);
1715 }
1716
1717 removeRequestFromTables(s, block, p);
1718 }
1719
1720 tr_ptrArrayDestruct(&peerArr, NULL);
1721 }
1722
tr_peerMgrPieceCompleted(tr_torrent * tor,tr_piece_index_t p)1723 void tr_peerMgrPieceCompleted(tr_torrent* tor, tr_piece_index_t p)
1724 {
1725 bool pieceCameFromPeers = false;
1726 tr_swarm* const s = tor->swarm;
1727
1728 /* walk through our peers */
1729 for (int i = 0, n = tr_ptrArraySize(&s->peers); i < n; ++i)
1730 {
1731 tr_peer* peer = tr_ptrArrayNth(&s->peers, i);
1732
1733 /* notify the peer that we now have this piece */
1734 tr_peerMsgsHave(PEER_MSGS(peer), p);
1735
1736 if (!pieceCameFromPeers)
1737 {
1738 pieceCameFromPeers = tr_bitfieldHas(&peer->blame, p);
1739 }
1740 }
1741
1742 if (pieceCameFromPeers) /* webseed downloads don't belong in announce totals */
1743 {
1744 tr_announcerAddBytes(tor, TR_ANN_DOWN, tr_torPieceCountBytes(tor, p));
1745 }
1746
1747 /* bookkeeping */
1748 pieceListRemovePiece(s, p);
1749 s->needsCompletenessCheck = true;
1750 }
1751
peerCallbackFunc(tr_peer * peer,tr_peer_event const * e,void * vs)1752 static void peerCallbackFunc(tr_peer* peer, tr_peer_event const* e, void* vs)
1753 {
1754 TR_ASSERT(peer != NULL);
1755
1756 tr_swarm* s = vs;
1757
1758 swarmLock(s);
1759
1760 switch (e->eventType)
1761 {
1762 case TR_PEER_PEER_GOT_PIECE_DATA:
1763 {
1764 time_t const now = tr_time();
1765 tr_torrent* tor = s->tor;
1766
1767 tor->uploadedCur += e->length;
1768 tr_announcerAddBytes(tor, TR_ANN_UP, e->length);
1769 tr_torrentSetDateActive(tor, now);
1770 tr_torrentSetDirty(tor);
1771 tr_statsAddUploaded(tor->session, e->length);
1772
1773 if (peer->atom != NULL)
1774 {
1775 peer->atom->piece_data_time = now;
1776 }
1777
1778 break;
1779 }
1780
1781 case TR_PEER_CLIENT_GOT_PIECE_DATA:
1782 {
1783 time_t const now = tr_time();
1784 tr_torrent* tor = s->tor;
1785
1786 tor->downloadedCur += e->length;
1787 tr_torrentSetDateActive(tor, now);
1788 tr_torrentSetDirty(tor);
1789
1790 tr_statsAddDownloaded(tor->session, e->length);
1791
1792 if (peer->atom != NULL)
1793 {
1794 peer->atom->piece_data_time = now;
1795 }
1796
1797 break;
1798 }
1799
1800 case TR_PEER_CLIENT_GOT_HAVE:
1801 if (replicationExists(s))
1802 {
1803 tr_incrReplicationOfPiece(s, e->pieceIndex);
1804 assertReplicationCountIsExact(s);
1805 }
1806
1807 break;
1808
1809 case TR_PEER_CLIENT_GOT_HAVE_ALL:
1810 if (replicationExists(s))
1811 {
1812 tr_incrReplication(s);
1813 assertReplicationCountIsExact(s);
1814 }
1815
1816 break;
1817
1818 case TR_PEER_CLIENT_GOT_HAVE_NONE:
1819 /* noop */
1820 break;
1821
1822 case TR_PEER_CLIENT_GOT_BITFIELD:
1823 TR_ASSERT(e->bitfield != NULL);
1824
1825 if (replicationExists(s))
1826 {
1827 tr_incrReplicationFromBitfield(s, e->bitfield);
1828 assertReplicationCountIsExact(s);
1829 }
1830
1831 break;
1832
1833 case TR_PEER_CLIENT_GOT_REJ:
1834 {
1835 tr_block_index_t b = _tr_block(s->tor, e->pieceIndex, e->offset);
1836
1837 if (b < s->tor->blockCount)
1838 {
1839 removeRequestFromTables(s, b, peer);
1840 }
1841 else
1842 {
1843 tordbg(s, "Peer %s sent an out-of-range reject message", tr_atomAddrStr(peer->atom));
1844 }
1845
1846 break;
1847 }
1848
1849 case TR_PEER_CLIENT_GOT_CHOKE:
1850 peerDeclinedAllRequests(s, peer);
1851 break;
1852
1853 case TR_PEER_CLIENT_GOT_PORT:
1854 if (peer->atom != NULL)
1855 {
1856 peer->atom->port = e->port;
1857 }
1858
1859 break;
1860
1861 case TR_PEER_CLIENT_GOT_SUGGEST:
1862 peerSuggestedPiece(s, peer, e->pieceIndex, false);
1863 break;
1864
1865 case TR_PEER_CLIENT_GOT_ALLOWED_FAST:
1866 peerSuggestedPiece(s, peer, e->pieceIndex, true);
1867 break;
1868
1869 case TR_PEER_CLIENT_GOT_BLOCK:
1870 {
1871 tr_torrent* tor = s->tor;
1872 tr_piece_index_t const p = e->pieceIndex;
1873 tr_block_index_t const block = _tr_block(tor, p, e->offset);
1874 cancelAllRequestsForBlock(s, block, peer);
1875 tr_historyAdd(&peer->blocksSentToClient, tr_time(), 1);
1876 pieceListResortPiece(s, pieceListLookup(s, p));
1877 tr_torrentGotBlock(tor, block);
1878 break;
1879 }
1880
1881 case TR_PEER_ERROR:
1882 if (e->err == ERANGE || e->err == EMSGSIZE || e->err == ENOTCONN)
1883 {
1884 /* some protocol error from the peer */
1885 peer->doPurge = true;
1886 tordbg(s, "setting %s doPurge flag because we got an ERANGE, EMSGSIZE, or ENOTCONN error", tr_atomAddrStr(
1887 peer->atom));
1888 }
1889 else
1890 {
1891 tordbg(s, "unhandled error: %s", tr_strerror(e->err));
1892 }
1893
1894 break;
1895
1896 default:
1897 TR_ASSERT_MSG(false, "unhandled peer event type %d", (int)e->eventType);
1898 }
1899
1900 swarmUnlock(s);
1901 }
1902
getDefaultShelfLife(uint8_t from)1903 static int getDefaultShelfLife(uint8_t from)
1904 {
1905 /* in general, peers obtained from firsthand contact
1906 * are better than those from secondhand, etc etc */
1907 switch (from)
1908 {
1909 case TR_PEER_FROM_INCOMING:
1910 return 60 * 60 * 6;
1911
1912 case TR_PEER_FROM_LTEP:
1913 return 60 * 60 * 6;
1914
1915 case TR_PEER_FROM_TRACKER:
1916 return 60 * 60 * 3;
1917
1918 case TR_PEER_FROM_DHT:
1919 return 60 * 60 * 3;
1920
1921 case TR_PEER_FROM_PEX:
1922 return 60 * 60 * 2;
1923
1924 case TR_PEER_FROM_RESUME:
1925 return 60 * 60;
1926
1927 case TR_PEER_FROM_LPD:
1928 return 10 * 60;
1929
1930 default:
1931 return 60 * 60;
1932 }
1933 }
1934
ensureAtomExists(tr_swarm * s,tr_address const * addr,tr_port const port,uint8_t const flags,int8_t const seedProbability,uint8_t const from)1935 static void ensureAtomExists(tr_swarm* s, tr_address const* addr, tr_port const port, uint8_t const flags,
1936 int8_t const seedProbability, uint8_t const from)
1937 {
1938 TR_ASSERT(tr_address_is_valid(addr));
1939 TR_ASSERT(from < TR_PEER_FROM__MAX);
1940
1941 struct peer_atom* a = getExistingAtom(s, addr);
1942
1943 if (a == NULL)
1944 {
1945 int const jitter = tr_rand_int_weak(60 * 10);
1946 a = tr_new0(struct peer_atom, 1);
1947 a->addr = *addr;
1948 a->port = port;
1949 a->flags = flags;
1950 a->fromFirst = from;
1951 a->fromBest = from;
1952 a->shelf_date = tr_time() + getDefaultShelfLife(from) + jitter;
1953 a->blocklisted = -1;
1954 atomSetSeedProbability(a, seedProbability);
1955 tr_ptrArrayInsertSorted(&s->pool, a, compareAtomsByAddress);
1956
1957 tordbg(s, "got a new atom: %s", tr_atomAddrStr(a));
1958 }
1959 else
1960 {
1961 if (from < a->fromBest)
1962 {
1963 a->fromBest = from;
1964 }
1965
1966 if (a->seedProbability == -1)
1967 {
1968 atomSetSeedProbability(a, seedProbability);
1969 }
1970
1971 a->flags |= flags;
1972 }
1973 }
1974
getMaxPeerCount(tr_torrent const * tor)1975 static int getMaxPeerCount(tr_torrent const* tor)
1976 {
1977 return tor->maxConnectedPeers;
1978 }
1979
getPeerCount(tr_swarm const * s)1980 static int getPeerCount(tr_swarm const* s)
1981 {
1982 return tr_ptrArraySize(&s->peers); /* + tr_ptrArraySize(&t->outgoingHandshakes); */
1983 }
1984
createBitTorrentPeer(tr_torrent * tor,struct tr_peerIo * io,struct peer_atom * atom,tr_quark client)1985 static void createBitTorrentPeer(tr_torrent* tor, struct tr_peerIo* io, struct peer_atom* atom, tr_quark client)
1986 {
1987 TR_ASSERT(atom != NULL);
1988 TR_ASSERT(tr_isTorrent(tor));
1989 TR_ASSERT(tor->swarm != NULL);
1990
1991 tr_swarm* swarm = tor->swarm;
1992
1993 tr_peer* peer = (tr_peer*)tr_peerMsgsNew(tor, io, peerCallbackFunc, swarm);
1994 peer->atom = atom;
1995 peer->client = client;
1996 atom->peer = peer;
1997
1998 tr_ptrArrayInsertSorted(&swarm->peers, peer, peerCompare);
1999 ++swarm->stats.peerCount;
2000 ++swarm->stats.peerFromCount[atom->fromFirst];
2001
2002 TR_ASSERT(swarm->stats.peerCount == tr_ptrArraySize(&swarm->peers));
2003 TR_ASSERT(swarm->stats.peerFromCount[atom->fromFirst] <= swarm->stats.peerCount);
2004
2005 tr_peerMsgs* msgs = PEER_MSGS(peer);
2006 tr_peerMsgsUpdateActive(msgs, TR_UP);
2007 tr_peerMsgsUpdateActive(msgs, TR_DOWN);
2008 }
2009
2010 /* FIXME: this is kind of a mess. */
myHandshakeDoneCB(tr_handshake * handshake,tr_peerIo * io,bool readAnythingFromPeer,bool isConnected,uint8_t const * peer_id,void * vmanager)2011 static bool myHandshakeDoneCB(tr_handshake* handshake, tr_peerIo* io, bool readAnythingFromPeer, bool isConnected,
2012 uint8_t const* peer_id, void* vmanager)
2013 {
2014 TR_ASSERT(io != NULL);
2015
2016 bool ok = isConnected;
2017 bool success = false;
2018 tr_port port;
2019 tr_address const* addr;
2020 tr_peerMgr* manager = vmanager;
2021 tr_swarm* s = tr_peerIoHasTorrentHash(io) ? getExistingSwarm(manager, tr_peerIoGetTorrentHash(io)) : NULL;
2022
2023 if (tr_peerIoIsIncoming(io))
2024 {
2025 tr_ptrArrayRemoveSortedPointer(&manager->incomingHandshakes, handshake, handshakeCompare);
2026 }
2027 else if (s != NULL)
2028 {
2029 tr_ptrArrayRemoveSortedPointer(&s->outgoingHandshakes, handshake, handshakeCompare);
2030 }
2031
2032 if (s != NULL)
2033 {
2034 swarmLock(s);
2035 }
2036
2037 addr = tr_peerIoGetAddress(io, &port);
2038
2039 if (!ok || s == NULL || !s->isRunning)
2040 {
2041 if (s != NULL)
2042 {
2043 struct peer_atom* atom = getExistingAtom(s, addr);
2044
2045 if (atom != NULL)
2046 {
2047 ++atom->numFails;
2048
2049 if (!readAnythingFromPeer)
2050 {
2051 tordbg(s, "marking peer %s as unreachable... numFails is %d", tr_atomAddrStr(atom), (int)atom->numFails);
2052 atom->flags2 |= MYFLAG_UNREACHABLE;
2053 }
2054 }
2055 }
2056 }
2057 else /* looking good */
2058 {
2059 struct peer_atom* atom;
2060
2061 ensureAtomExists(s, addr, port, 0, -1, TR_PEER_FROM_INCOMING);
2062 atom = getExistingAtom(s, addr);
2063
2064 TR_ASSERT(atom != NULL);
2065
2066 atom->time = tr_time();
2067 atom->piece_data_time = 0;
2068 atom->lastConnectionAt = tr_time();
2069
2070 if (!tr_peerIoIsIncoming(io))
2071 {
2072 atom->flags |= ADDED_F_CONNECTABLE;
2073 atom->flags2 &= ~MYFLAG_UNREACHABLE;
2074 }
2075
2076 /* In principle, this flag specifies whether the peer groks uTP,
2077 not whether it's currently connected over uTP. */
2078 if (io->socket.type == TR_PEER_SOCKET_TYPE_UTP)
2079 {
2080 atom->flags |= ADDED_F_UTP_FLAGS;
2081 }
2082
2083 if ((atom->flags2 & MYFLAG_BANNED) != 0)
2084 {
2085 tordbg(s, "banned peer %s tried to reconnect", tr_atomAddrStr(atom));
2086 }
2087 else if (tr_peerIoIsIncoming(io) && getPeerCount(s) >= getMaxPeerCount(s->tor))
2088 {
2089 }
2090 else
2091 {
2092 tr_peer* peer = atom->peer;
2093
2094 if (peer != NULL)
2095 {
2096 /* we already have this peer */
2097 }
2098 else
2099 {
2100 tr_quark client;
2101 tr_peerIo* io;
2102 char buf[128];
2103
2104 if (peer_id != NULL)
2105 {
2106 client = tr_quark_new(tr_clientForId(buf, sizeof(buf), peer_id), TR_BAD_SIZE);
2107 }
2108 else
2109 {
2110 client = TR_KEY_NONE;
2111 }
2112
2113 io = tr_handshakeStealIO(handshake); /* this steals its refcount too, which is balanced by our unref in peerDelete() */
2114 tr_peerIoSetParent(io, &s->tor->bandwidth);
2115 createBitTorrentPeer(s->tor, io, atom, client);
2116
2117 success = true;
2118 }
2119 }
2120 }
2121
2122 if (s != NULL)
2123 {
2124 swarmUnlock(s);
2125 }
2126
2127 return success;
2128 }
2129
close_peer_socket(struct tr_peer_socket const socket,tr_session * session)2130 static void close_peer_socket(struct tr_peer_socket const socket, tr_session* session)
2131 {
2132 switch (socket.type)
2133 {
2134 case TR_PEER_SOCKET_TYPE_NONE:
2135 break;
2136
2137 case TR_PEER_SOCKET_TYPE_TCP:
2138 tr_netClose(session, socket.handle.tcp);
2139 break;
2140
2141 #ifdef WITH_UTP
2142
2143 case TR_PEER_SOCKET_TYPE_UTP:
2144 UTP_Close(socket.handle.utp);
2145 break;
2146
2147 #endif
2148
2149 default:
2150 TR_ASSERT_MSG(false, "unsupported peer socket type %d", socket.type);
2151 }
2152 }
2153
tr_peerMgrAddIncoming(tr_peerMgr * manager,tr_address * addr,tr_port port,struct tr_peer_socket const socket)2154 void tr_peerMgrAddIncoming(tr_peerMgr* manager, tr_address* addr, tr_port port, struct tr_peer_socket const socket)
2155 {
2156 TR_ASSERT(tr_isSession(manager->session));
2157
2158 managerLock(manager);
2159
2160 tr_session* session = manager->session;
2161
2162 if (tr_sessionIsAddressBlocked(session, addr))
2163 {
2164 tr_logAddDebug("Banned IP address \"%s\" tried to connect to us", tr_address_to_string(addr));
2165 close_peer_socket(socket, session);
2166 }
2167 else if (getExistingHandshake(&manager->incomingHandshakes, addr) != NULL)
2168 {
2169 close_peer_socket(socket, session);
2170 }
2171 else /* we don't have a connection to them yet... */
2172 {
2173 tr_peerIo* io;
2174 tr_handshake* handshake;
2175
2176 io = tr_peerIoNewIncoming(session, &session->bandwidth, addr, port, socket);
2177
2178 handshake = tr_handshakeNew(io, session->encryptionMode, myHandshakeDoneCB, manager);
2179
2180 tr_peerIoUnref(io); /* balanced by the implicit ref in tr_peerIoNewIncoming() */
2181
2182 tr_ptrArrayInsertSorted(&manager->incomingHandshakes, handshake, handshakeCompare);
2183 }
2184
2185 managerUnlock(manager);
2186 }
2187
tr_peerMgrAddPex(tr_torrent * tor,uint8_t from,tr_pex const * pex,int8_t seedProbability)2188 void tr_peerMgrAddPex(tr_torrent* tor, uint8_t from, tr_pex const* pex, int8_t seedProbability)
2189 {
2190 if (tr_isPex(pex)) /* safeguard against corrupt data */
2191 {
2192 tr_swarm* s = tor->swarm;
2193 managerLock(s->manager);
2194
2195 if (!tr_sessionIsAddressBlocked(s->manager->session, &pex->addr))
2196 {
2197 if (tr_address_is_valid_for_peers(&pex->addr, pex->port))
2198 {
2199 ensureAtomExists(s, &pex->addr, pex->port, pex->flags, seedProbability, from);
2200 }
2201 }
2202
2203 managerUnlock(s->manager);
2204 }
2205 }
2206
tr_peerMgrCompactToPex(void const * compact,size_t compactLen,uint8_t const * added_f,size_t added_f_len,size_t * pexCount)2207 tr_pex* tr_peerMgrCompactToPex(void const* compact, size_t compactLen, uint8_t const* added_f, size_t added_f_len,
2208 size_t* pexCount)
2209 {
2210 size_t n = compactLen / 6;
2211 uint8_t const* walk = compact;
2212 tr_pex* pex = tr_new0(tr_pex, n);
2213
2214 for (size_t i = 0; i < n; ++i)
2215 {
2216 pex[i].addr.type = TR_AF_INET;
2217 memcpy(&pex[i].addr.addr, walk, 4);
2218 walk += 4;
2219 memcpy(&pex[i].port, walk, 2);
2220 walk += 2;
2221
2222 if (added_f != NULL && n == added_f_len)
2223 {
2224 pex[i].flags = added_f[i];
2225 }
2226 }
2227
2228 *pexCount = n;
2229 return pex;
2230 }
2231
tr_peerMgrCompact6ToPex(void const * compact,size_t compactLen,uint8_t const * added_f,size_t added_f_len,size_t * pexCount)2232 tr_pex* tr_peerMgrCompact6ToPex(void const* compact, size_t compactLen, uint8_t const* added_f, size_t added_f_len,
2233 size_t* pexCount)
2234 {
2235 size_t n = compactLen / 18;
2236 uint8_t const* walk = compact;
2237 tr_pex* pex = tr_new0(tr_pex, n);
2238
2239 for (size_t i = 0; i < n; ++i)
2240 {
2241 pex[i].addr.type = TR_AF_INET6;
2242 memcpy(&pex[i].addr.addr.addr6.s6_addr, walk, 16);
2243 walk += 16;
2244 memcpy(&pex[i].port, walk, 2);
2245 walk += 2;
2246
2247 if (added_f != NULL && n == added_f_len)
2248 {
2249 pex[i].flags = added_f[i];
2250 }
2251 }
2252
2253 *pexCount = n;
2254 return pex;
2255 }
2256
2257 /**
2258 ***
2259 **/
2260
tr_peerMgrGotBadPiece(tr_torrent * tor,tr_piece_index_t pieceIndex)2261 void tr_peerMgrGotBadPiece(tr_torrent* tor, tr_piece_index_t pieceIndex)
2262 {
2263 tr_swarm* s = tor->swarm;
2264 uint32_t const byteCount = tr_torPieceCountBytes(tor, pieceIndex);
2265
2266 for (int i = 0, n = tr_ptrArraySize(&s->peers); i != n; ++i)
2267 {
2268 tr_peer* peer = tr_ptrArrayNth(&s->peers, i);
2269
2270 if (tr_bitfieldHas(&peer->blame, pieceIndex))
2271 {
2272 tordbg(s, "peer %s contributed to corrupt piece (%d); now has %d strikes", tr_atomAddrStr(peer->atom), pieceIndex,
2273 (int)peer->strikes + 1);
2274 addStrike(s, peer);
2275 }
2276 }
2277
2278 tr_announcerAddBytes(tor, TR_ANN_CORRUPT, byteCount);
2279 }
2280
tr_pexCompare(void const * va,void const * vb)2281 int tr_pexCompare(void const* va, void const* vb)
2282 {
2283 tr_pex const* a = va;
2284 tr_pex const* b = vb;
2285
2286 TR_ASSERT(tr_isPex(a));
2287 TR_ASSERT(tr_isPex(b));
2288
2289 int i;
2290
2291 if ((i = tr_address_compare(&a->addr, &b->addr)) != 0)
2292 {
2293 return i;
2294 }
2295
2296 if (a->port != b->port)
2297 {
2298 return a->port < b->port ? -1 : 1;
2299 }
2300
2301 return 0;
2302 }
2303
2304 /* better goes first */
compareAtomsByUsefulness(void const * va,void const * vb)2305 static int compareAtomsByUsefulness(void const* va, void const* vb)
2306 {
2307 struct peer_atom const* a = *(struct peer_atom const* const*)va;
2308 struct peer_atom const* b = *(struct peer_atom const* const*)vb;
2309
2310 TR_ASSERT(tr_isAtom(a));
2311 TR_ASSERT(tr_isAtom(b));
2312
2313 if (a->piece_data_time != b->piece_data_time)
2314 {
2315 return a->piece_data_time > b->piece_data_time ? -1 : 1;
2316 }
2317
2318 if (a->fromBest != b->fromBest)
2319 {
2320 return a->fromBest < b->fromBest ? -1 : 1;
2321 }
2322
2323 if (a->numFails != b->numFails)
2324 {
2325 return a->numFails < b->numFails ? -1 : 1;
2326 }
2327
2328 return 0;
2329 }
2330
isAtomInteresting(tr_torrent const * tor,struct peer_atom * atom)2331 static bool isAtomInteresting(tr_torrent const* tor, struct peer_atom* atom)
2332 {
2333 if (tr_torrentIsSeed(tor) && atomIsSeed(atom))
2334 {
2335 return false;
2336 }
2337
2338 if (peerIsInUse(tor->swarm, atom))
2339 {
2340 return true;
2341 }
2342
2343 if (isAtomBlocklisted(tor->session, atom))
2344 {
2345 return false;
2346 }
2347
2348 if ((atom->flags2 & MYFLAG_BANNED) != 0)
2349 {
2350 return false;
2351 }
2352
2353 return true;
2354 }
2355
tr_peerMgrGetPeers(tr_torrent * tor,tr_pex ** setme_pex,uint8_t af,uint8_t list_mode,int maxCount)2356 int tr_peerMgrGetPeers(tr_torrent* tor, tr_pex** setme_pex, uint8_t af, uint8_t list_mode, int maxCount)
2357 {
2358 TR_ASSERT(tr_isTorrent(tor));
2359 TR_ASSERT(setme_pex != NULL);
2360 TR_ASSERT(af == TR_AF_INET || af == TR_AF_INET6);
2361 TR_ASSERT(list_mode == TR_PEERS_CONNECTED || list_mode == TR_PEERS_INTERESTING);
2362
2363 int n;
2364 int count = 0;
2365 int atomCount = 0;
2366 tr_swarm const* s = tor->swarm;
2367 struct peer_atom** atoms = NULL;
2368 tr_pex* pex;
2369 tr_pex* walk;
2370
2371 managerLock(s->manager);
2372
2373 /**
2374 *** build a list of atoms
2375 **/
2376
2377 if (list_mode == TR_PEERS_CONNECTED) /* connected peers only */
2378 {
2379 tr_peer const** peers = (tr_peer const**)tr_ptrArrayBase(&s->peers);
2380 atomCount = tr_ptrArraySize(&s->peers);
2381 atoms = tr_new(struct peer_atom*, atomCount);
2382
2383 for (int i = 0; i < atomCount; ++i)
2384 {
2385 atoms[i] = peers[i]->atom;
2386 }
2387 }
2388 else /* TR_PEERS_INTERESTING */
2389 {
2390 struct peer_atom** atomBase = (struct peer_atom**)tr_ptrArrayBase(&s->pool);
2391 n = tr_ptrArraySize(&s->pool);
2392 atoms = tr_new(struct peer_atom*, n);
2393
2394 for (int i = 0; i < n; ++i)
2395 {
2396 if (isAtomInteresting(tor, atomBase[i]))
2397 {
2398 atoms[atomCount++] = atomBase[i];
2399 }
2400 }
2401 }
2402
2403 qsort(atoms, atomCount, sizeof(struct peer_atom*), compareAtomsByUsefulness);
2404
2405 /**
2406 *** add the first N of them into our return list
2407 **/
2408
2409 n = MIN(atomCount, maxCount);
2410 pex = walk = tr_new0(tr_pex, n);
2411
2412 for (int i = 0; i < atomCount && count < n; ++i)
2413 {
2414 struct peer_atom const* atom = atoms[i];
2415
2416 if (atom->addr.type == af)
2417 {
2418 TR_ASSERT(tr_address_is_valid(&atom->addr));
2419
2420 walk->addr = atom->addr;
2421 walk->port = atom->port;
2422 walk->flags = atom->flags;
2423 ++count;
2424 ++walk;
2425 }
2426 }
2427
2428 qsort(pex, count, sizeof(tr_pex), tr_pexCompare);
2429
2430 TR_ASSERT(walk - pex == count);
2431 *setme_pex = pex;
2432
2433 /* cleanup */
2434 tr_free(atoms);
2435 managerUnlock(s->manager);
2436 return count;
2437 }
2438
2439 static void atomPulse(evutil_socket_t, short, void*);
2440 static void bandwidthPulse(evutil_socket_t, short, void*);
2441 static void rechokePulse(evutil_socket_t, short, void*);
2442 static void reconnectPulse(evutil_socket_t, short, void*);
2443
createTimer(tr_session * session,int msec,event_callback_fn callback,void * cbdata)2444 static struct event* createTimer(tr_session* session, int msec, event_callback_fn callback, void* cbdata)
2445 {
2446 struct event* timer = evtimer_new(session->event_base, callback, cbdata);
2447 tr_timerAddMsec(timer, msec);
2448 return timer;
2449 }
2450
ensureMgrTimersExist(struct tr_peerMgr * m)2451 static void ensureMgrTimersExist(struct tr_peerMgr* m)
2452 {
2453 if (m->atomTimer == NULL)
2454 {
2455 m->atomTimer = createTimer(m->session, ATOM_PERIOD_MSEC, atomPulse, m);
2456 }
2457
2458 if (m->bandwidthTimer == NULL)
2459 {
2460 m->bandwidthTimer = createTimer(m->session, BANDWIDTH_PERIOD_MSEC, bandwidthPulse, m);
2461 }
2462
2463 if (m->rechokeTimer == NULL)
2464 {
2465 m->rechokeTimer = createTimer(m->session, RECHOKE_PERIOD_MSEC, rechokePulse, m);
2466 }
2467
2468 if (m->refillUpkeepTimer == NULL)
2469 {
2470 m->refillUpkeepTimer = createTimer(m->session, REFILL_UPKEEP_PERIOD_MSEC, refillUpkeep, m);
2471 }
2472 }
2473
tr_peerMgrStartTorrent(tr_torrent * tor)2474 void tr_peerMgrStartTorrent(tr_torrent* tor)
2475 {
2476 TR_ASSERT(tr_isTorrent(tor));
2477 TR_ASSERT(tr_torrentIsLocked(tor));
2478
2479 tr_swarm* s = tor->swarm;
2480
2481 ensureMgrTimersExist(s->manager);
2482
2483 s->isRunning = true;
2484 s->maxPeers = tor->maxConnectedPeers;
2485 s->pieceSortState = PIECES_UNSORTED;
2486
2487 rechokePulse(0, 0, s->manager);
2488 }
2489
2490 static void removeAllPeers(tr_swarm*);
2491
stopSwarm(tr_swarm * swarm)2492 static void stopSwarm(tr_swarm* swarm)
2493 {
2494 swarm->isRunning = false;
2495
2496 replicationFree(swarm);
2497 invalidatePieceSorting(swarm);
2498
2499 removeAllPeers(swarm);
2500
2501 /* disconnect the handshakes. handshakeAbort calls handshakeDoneCB(),
2502 * which removes the handshake from t->outgoingHandshakes... */
2503 while (!tr_ptrArrayEmpty(&swarm->outgoingHandshakes))
2504 {
2505 tr_handshakeAbort(tr_ptrArrayNth(&swarm->outgoingHandshakes, 0));
2506 }
2507 }
2508
tr_peerMgrStopTorrent(tr_torrent * tor)2509 void tr_peerMgrStopTorrent(tr_torrent* tor)
2510 {
2511 TR_ASSERT(tr_isTorrent(tor));
2512 TR_ASSERT(tr_torrentIsLocked(tor));
2513
2514 stopSwarm(tor->swarm);
2515 }
2516
tr_peerMgrAddTorrent(tr_peerMgr * manager,tr_torrent * tor)2517 void tr_peerMgrAddTorrent(tr_peerMgr* manager, tr_torrent* tor)
2518 {
2519 TR_ASSERT(tr_isTorrent(tor));
2520 TR_ASSERT(tr_torrentIsLocked(tor));
2521 TR_ASSERT(tor->swarm == NULL);
2522
2523 tor->swarm = swarmNew(manager, tor);
2524 }
2525
tr_peerMgrRemoveTorrent(tr_torrent * tor)2526 void tr_peerMgrRemoveTorrent(tr_torrent* tor)
2527 {
2528 TR_ASSERT(tr_isTorrent(tor));
2529 TR_ASSERT(tr_torrentIsLocked(tor));
2530
2531 stopSwarm(tor->swarm);
2532 swarmFree(tor->swarm);
2533 }
2534
tr_peerUpdateProgress(tr_torrent * tor,tr_peer * peer)2535 void tr_peerUpdateProgress(tr_torrent* tor, tr_peer* peer)
2536 {
2537 tr_bitfield const* have = &peer->have;
2538
2539 if (tr_bitfieldHasAll(have))
2540 {
2541 peer->progress = 1.0;
2542 }
2543 else if (tr_bitfieldHasNone(have))
2544 {
2545 peer->progress = 0.0;
2546 }
2547 else
2548 {
2549 float const true_count = tr_bitfieldCountTrueBits(have);
2550
2551 if (tr_torrentHasMetadata(tor))
2552 {
2553 peer->progress = true_count / tor->info.pieceCount;
2554 }
2555 else /* without pieceCount, this result is only a best guess... */
2556 {
2557 peer->progress = true_count / (have->bit_count + 1);
2558 }
2559 }
2560
2561 /* clamp the progress range */
2562 if (peer->progress < 0.0)
2563 {
2564 peer->progress = 0.0;
2565 }
2566
2567 if (peer->progress > 1.0)
2568 {
2569 peer->progress = 1.0;
2570 }
2571
2572 if (peer->atom != NULL && peer->progress >= 1.0)
2573 {
2574 atomSetSeed(tor->swarm, peer->atom);
2575 }
2576 }
2577
tr_peerMgrOnTorrentGotMetainfo(tr_torrent * tor)2578 void tr_peerMgrOnTorrentGotMetainfo(tr_torrent* tor)
2579 {
2580 int peerCount;
2581 tr_peer** peers;
2582
2583 /* the webseed list may have changed... */
2584 rebuildWebseedArray(tor->swarm, tor);
2585
2586 /* some peer_msgs' progress fields may not be accurate if we
2587 didn't have the metadata before now... so refresh them all... */
2588 peerCount = tr_ptrArraySize(&tor->swarm->peers);
2589 peers = (tr_peer**)tr_ptrArrayBase(&tor->swarm->peers);
2590
2591 for (int i = 0; i < peerCount; ++i)
2592 {
2593 tr_peerUpdateProgress(tor, peers[i]);
2594 }
2595
2596 /* update the bittorrent peers' willingnes... */
2597 for (int i = 0; i < peerCount; ++i)
2598 {
2599 tr_peerMsgsUpdateActive(tr_peerMsgsCast(peers[i]), TR_UP);
2600 tr_peerMsgsUpdateActive(tr_peerMsgsCast(peers[i]), TR_DOWN);
2601 }
2602 }
2603
tr_peerMgrTorrentAvailability(tr_torrent const * tor,int8_t * tab,unsigned int tabCount)2604 void tr_peerMgrTorrentAvailability(tr_torrent const* tor, int8_t* tab, unsigned int tabCount)
2605 {
2606 TR_ASSERT(tr_isTorrent(tor));
2607 TR_ASSERT(tab != NULL);
2608 TR_ASSERT(tabCount > 0);
2609
2610 memset(tab, 0, tabCount);
2611
2612 if (tr_torrentHasMetadata(tor))
2613 {
2614 int const peerCount = tr_ptrArraySize(&tor->swarm->peers);
2615 tr_peer const** peers = (tr_peer const**)tr_ptrArrayBase(&tor->swarm->peers);
2616 float const interval = tor->info.pieceCount / (float)tabCount;
2617 bool const isSeed = tr_torrentGetCompleteness(tor) == TR_SEED;
2618
2619 for (tr_piece_index_t i = 0; i < tabCount; ++i)
2620 {
2621 int const piece = i * interval;
2622
2623 if (isSeed || tr_torrentPieceIsComplete(tor, piece))
2624 {
2625 tab[i] = -1;
2626 }
2627 else if (peerCount != 0)
2628 {
2629 for (int j = 0; j < peerCount; ++j)
2630 {
2631 if (tr_bitfieldHas(&peers[j]->have, piece))
2632 {
2633 ++tab[i];
2634 }
2635 }
2636 }
2637 }
2638 }
2639 }
2640
tr_swarmGetStats(tr_swarm const * swarm,tr_swarm_stats * setme)2641 void tr_swarmGetStats(tr_swarm const* swarm, tr_swarm_stats* setme)
2642 {
2643 TR_ASSERT(swarm != NULL);
2644 TR_ASSERT(setme != NULL);
2645
2646 *setme = swarm->stats;
2647 }
2648
tr_swarmIncrementActivePeers(tr_swarm * swarm,tr_direction direction,bool is_active)2649 void tr_swarmIncrementActivePeers(tr_swarm* swarm, tr_direction direction, bool is_active)
2650 {
2651 int n = swarm->stats.activePeerCount[direction];
2652
2653 if (is_active)
2654 {
2655 ++n;
2656 }
2657 else
2658 {
2659 --n;
2660 }
2661
2662 TR_ASSERT(n >= 0);
2663 TR_ASSERT(n <= swarm->stats.peerCount);
2664
2665 swarm->stats.activePeerCount[direction] = n;
2666 }
2667
tr_peerIsSeed(tr_peer const * peer)2668 bool tr_peerIsSeed(tr_peer const* peer)
2669 {
2670 if (peer->progress >= 1.0)
2671 {
2672 return true;
2673 }
2674
2675 if (peer->atom != NULL && atomIsSeed(peer->atom))
2676 {
2677 return true;
2678 }
2679
2680 return false;
2681 }
2682
2683 /* count how many bytes we want that connected peers have */
tr_peerMgrGetDesiredAvailable(tr_torrent const * tor)2684 uint64_t tr_peerMgrGetDesiredAvailable(tr_torrent const* tor)
2685 {
2686 TR_ASSERT(tr_isTorrent(tor));
2687
2688 /* common shortcuts... */
2689
2690 if (!tor->isRunning || tor->isStopping || tr_torrentIsSeed(tor) || !tr_torrentHasMetadata(tor))
2691 {
2692 return 0;
2693 }
2694
2695 tr_swarm const* s = tor->swarm;
2696
2697 if (s == NULL || !s->isRunning)
2698 {
2699 return 0;
2700 }
2701
2702 size_t const n = tr_ptrArraySize(&s->peers);
2703
2704 if (n == 0)
2705 {
2706 return 0;
2707 }
2708 else
2709 {
2710 tr_peer const** peers = (tr_peer const**)tr_ptrArrayBase(&s->peers);
2711
2712 for (size_t i = 0; i < n; ++i)
2713 {
2714 if (peers[i]->atom != NULL && atomIsSeed(peers[i]->atom))
2715 {
2716 return tr_torrentGetLeftUntilDone(tor);
2717 }
2718 }
2719 }
2720
2721 if (s->pieceReplication == NULL || s->pieceReplicationSize == 0)
2722 {
2723 return 0;
2724 }
2725
2726 /* do it the hard way */
2727
2728 uint64_t desiredAvailable = 0;
2729
2730 for (size_t i = 0, n = MIN(tor->info.pieceCount, s->pieceReplicationSize); i < n; ++i)
2731 {
2732 if (!tor->info.pieces[i].dnd && s->pieceReplication[i] > 0)
2733 {
2734 desiredAvailable += tr_torrentMissingBytesInPiece(tor, i);
2735 }
2736 }
2737
2738 TR_ASSERT(desiredAvailable <= tor->info.totalSize);
2739 return desiredAvailable;
2740 }
2741
tr_peerMgrWebSpeeds_KBps(tr_torrent const * tor)2742 double* tr_peerMgrWebSpeeds_KBps(tr_torrent const* tor)
2743 {
2744 TR_ASSERT(tr_isTorrent(tor));
2745
2746 uint64_t const now = tr_time_msec();
2747
2748 tr_swarm* s = tor->swarm;
2749 TR_ASSERT(s->manager != NULL);
2750
2751 unsigned int n = tr_ptrArraySize(&s->webseeds);
2752 TR_ASSERT(n == tor->info.webseedCount);
2753
2754 double* ret = tr_new0(double, n);
2755
2756 for (unsigned int i = 0; i < n; ++i)
2757 {
2758 unsigned int Bps = 0;
2759
2760 if (tr_peerIsTransferringPieces(tr_ptrArrayNth(&s->webseeds, i), now, TR_DOWN, &Bps))
2761 {
2762 ret[i] = Bps / (double)tr_speed_K;
2763 }
2764 else
2765 {
2766 ret[i] = -1.0;
2767 }
2768 }
2769
2770 return ret;
2771 }
2772
tr_peerMgrPeerStats(tr_torrent const * tor,int * setmeCount)2773 struct tr_peer_stat* tr_peerMgrPeerStats(tr_torrent const* tor, int* setmeCount)
2774 {
2775 TR_ASSERT(tr_isTorrent(tor));
2776 TR_ASSERT(tor->swarm->manager != NULL);
2777
2778 time_t const now = tr_time();
2779 uint64_t const now_msec = tr_time_msec();
2780
2781 tr_swarm const* s = tor->swarm;
2782 tr_peer** peers = (tr_peer**)tr_ptrArrayBase(&s->peers);
2783 int size = tr_ptrArraySize(&s->peers);
2784 tr_peer_stat* ret = tr_new0(tr_peer_stat, size);
2785
2786 for (int i = 0; i < size; ++i)
2787 {
2788 char* pch;
2789 tr_peer* peer = peers[i];
2790 tr_peerMsgs* msgs = PEER_MSGS(peer);
2791 struct peer_atom const* atom = peer->atom;
2792 tr_peer_stat* stat = ret + i;
2793
2794 tr_address_to_string_with_buf(&atom->addr, stat->addr, sizeof(stat->addr));
2795 tr_strlcpy(stat->client, tr_quark_get_string(peer->client, NULL), sizeof(stat->client));
2796 stat->port = ntohs(peer->atom->port);
2797 stat->from = atom->fromFirst;
2798 stat->progress = peer->progress;
2799 stat->isUTP = tr_peerMsgsIsUtpConnection(msgs);
2800 stat->isEncrypted = tr_peerMsgsIsEncrypted(msgs);
2801 stat->rateToPeer_KBps = toSpeedKBps(tr_peerGetPieceSpeed_Bps(peer, now_msec, TR_CLIENT_TO_PEER));
2802 stat->rateToClient_KBps = toSpeedKBps(tr_peerGetPieceSpeed_Bps(peer, now_msec, TR_PEER_TO_CLIENT));
2803 stat->peerIsChoked = tr_peerMsgsIsPeerChoked(msgs);
2804 stat->peerIsInterested = tr_peerMsgsIsPeerInterested(msgs);
2805 stat->clientIsChoked = tr_peerMsgsIsClientChoked(msgs);
2806 stat->clientIsInterested = tr_peerMsgsIsClientInterested(msgs);
2807 stat->isIncoming = tr_peerMsgsIsIncomingConnection(msgs);
2808 stat->isDownloadingFrom = tr_peerMsgsIsActive(msgs, TR_PEER_TO_CLIENT);
2809 stat->isUploadingTo = tr_peerMsgsIsActive(msgs, TR_CLIENT_TO_PEER);
2810 stat->isSeed = tr_peerIsSeed(peer);
2811
2812 stat->blocksToPeer = tr_historyGet(&peer->blocksSentToPeer, now, CANCEL_HISTORY_SEC);
2813 stat->blocksToClient = tr_historyGet(&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
2814 stat->cancelsToPeer = tr_historyGet(&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
2815 stat->cancelsToClient = tr_historyGet(&peer->cancelsSentToClient, now, CANCEL_HISTORY_SEC);
2816
2817 stat->pendingReqsToPeer = peer->pendingReqsToPeer;
2818 stat->pendingReqsToClient = peer->pendingReqsToClient;
2819
2820 pch = stat->flagStr;
2821
2822 if (stat->isUTP)
2823 {
2824 *pch++ = 'T';
2825 }
2826
2827 if (s->optimistic == msgs)
2828 {
2829 *pch++ = 'O';
2830 }
2831
2832 if (stat->isDownloadingFrom)
2833 {
2834 *pch++ = 'D';
2835 }
2836 else if (stat->clientIsInterested)
2837 {
2838 *pch++ = 'd';
2839 }
2840
2841 if (stat->isUploadingTo)
2842 {
2843 *pch++ = 'U';
2844 }
2845 else if (stat->peerIsInterested)
2846 {
2847 *pch++ = 'u';
2848 }
2849
2850 if (!stat->clientIsChoked && !stat->clientIsInterested)
2851 {
2852 *pch++ = 'K';
2853 }
2854
2855 if (!stat->peerIsChoked && !stat->peerIsInterested)
2856 {
2857 *pch++ = '?';
2858 }
2859
2860 if (stat->isEncrypted)
2861 {
2862 *pch++ = 'E';
2863 }
2864
2865 if (stat->from == TR_PEER_FROM_DHT)
2866 {
2867 *pch++ = 'H';
2868 }
2869 else if (stat->from == TR_PEER_FROM_PEX)
2870 {
2871 *pch++ = 'X';
2872 }
2873
2874 if (stat->isIncoming)
2875 {
2876 *pch++ = 'I';
2877 }
2878
2879 *pch = '\0';
2880 }
2881
2882 *setmeCount = size;
2883 return ret;
2884 }
2885
2886 /***
2887 ****
2888 ****
2889 ***/
2890
tr_peerMgrClearInterest(tr_torrent * tor)2891 void tr_peerMgrClearInterest(tr_torrent* tor)
2892 {
2893 TR_ASSERT(tr_isTorrent(tor));
2894 TR_ASSERT(tr_torrentIsLocked(tor));
2895
2896 tr_swarm* s = tor->swarm;
2897 int const peerCount = tr_ptrArraySize(&s->peers);
2898
2899 for (int i = 0; i < peerCount; ++i)
2900 {
2901 tr_peerMsgsSetInterested(tr_ptrArrayNth(&s->peers, i), false);
2902 }
2903 }
2904
2905 /* does this peer have any pieces that we want? */
isPeerInteresting(tr_torrent * const tor,bool const * const piece_is_interesting,tr_peer const * const peer)2906 static bool isPeerInteresting(tr_torrent* const tor, bool const* const piece_is_interesting, tr_peer const* const peer)
2907 {
2908 /* these cases should have already been handled by the calling code... */
2909 TR_ASSERT(!tr_torrentIsSeed(tor));
2910 TR_ASSERT(tr_torrentIsPieceTransferAllowed(tor, TR_PEER_TO_CLIENT));
2911
2912 if (tr_peerIsSeed(peer))
2913 {
2914 return true;
2915 }
2916
2917 for (tr_piece_index_t i = 0; i < tor->info.pieceCount; ++i)
2918 {
2919 if (piece_is_interesting[i] && tr_bitfieldHas(&peer->have, i))
2920 {
2921 return true;
2922 }
2923 }
2924
2925 return false;
2926 }
2927
2928 typedef enum
2929 {
2930 RECHOKE_STATE_GOOD,
2931 RECHOKE_STATE_UNTESTED,
2932 RECHOKE_STATE_BAD
2933 }
2934 tr_rechoke_state;
2935
2936 struct tr_rechoke_info
2937 {
2938 tr_peer* peer;
2939 int salt;
2940 int rechoke_state;
2941 };
2942
compare_rechoke_info(void const * va,void const * vb)2943 static int compare_rechoke_info(void const* va, void const* vb)
2944 {
2945 struct tr_rechoke_info const* a = va;
2946 struct tr_rechoke_info const* b = vb;
2947
2948 if (a->rechoke_state != b->rechoke_state)
2949 {
2950 return a->rechoke_state - b->rechoke_state;
2951 }
2952
2953 return a->salt - b->salt;
2954 }
2955
2956 /* determines who we send "interested" messages to */
rechokeDownloads(tr_swarm * s)2957 static void rechokeDownloads(tr_swarm* s)
2958 {
2959 int maxPeers = 0;
2960 int rechoke_count = 0;
2961 struct tr_rechoke_info* rechoke = NULL;
2962 int const MIN_INTERESTING_PEERS = 5;
2963 int const peerCount = tr_ptrArraySize(&s->peers);
2964 time_t const now = tr_time();
2965
2966 /* some cases where this function isn't necessary */
2967 if (tr_torrentIsSeed(s->tor))
2968 {
2969 return;
2970 }
2971
2972 if (!tr_torrentIsPieceTransferAllowed(s->tor, TR_PEER_TO_CLIENT))
2973 {
2974 return;
2975 }
2976
2977 /* decide HOW MANY peers to be interested in */
2978 {
2979 int blocks = 0;
2980 int cancels = 0;
2981 time_t timeSinceCancel;
2982
2983 /* Count up how many blocks & cancels each peer has.
2984 *
2985 * There are two situations where we send out cancels --
2986 *
2987 * 1. We've got unresponsive peers, which is handled by deciding
2988 * -which- peers to be interested in.
2989 *
2990 * 2. We've hit our bandwidth cap, which is handled by deciding
2991 * -how many- peers to be interested in.
2992 *
2993 * We're working on 2. here, so we need to ignore unresponsive
2994 * peers in our calculations lest they confuse Transmission into
2995 * thinking it's hit its bandwidth cap.
2996 */
2997 for (int i = 0; i < peerCount; ++i)
2998 {
2999 tr_peer const* peer = tr_ptrArrayNth(&s->peers, i);
3000 int const b = tr_historyGet(&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
3001 int const c = tr_historyGet(&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
3002
3003 if (b == 0) /* ignore unresponsive peers, as described above */
3004 {
3005 continue;
3006 }
3007
3008 blocks += b;
3009 cancels += c;
3010 }
3011
3012 if (cancels > 0)
3013 {
3014 /* cancelRate: of the block requests we've recently made, the percentage we cancelled.
3015 * higher values indicate more congestion. */
3016 double const cancelRate = cancels / (double)(cancels + blocks);
3017 double const mult = 1 - MIN(cancelRate, 0.5);
3018 maxPeers = s->interestedCount * mult;
3019 tordbg(s, "cancel rate is %.3f -- reducing the number of peers we're interested in by %.0f percent", cancelRate,
3020 mult * 100);
3021 s->lastCancel = now;
3022 }
3023
3024 timeSinceCancel = now - s->lastCancel;
3025
3026 if (timeSinceCancel != 0)
3027 {
3028 int const maxIncrease = 15;
3029 time_t const maxHistory = 2 * CANCEL_HISTORY_SEC;
3030 double const mult = MIN(timeSinceCancel, maxHistory) / (double)maxHistory;
3031 int const inc = maxIncrease * mult;
3032 maxPeers = s->maxPeers + inc;
3033 tordbg(s, "time since last cancel is %jd -- increasing the number of peers we're interested in by %d",
3034 (intmax_t)timeSinceCancel, inc);
3035 }
3036 }
3037
3038 /* don't let the previous section's number tweaking go too far... */
3039 if (maxPeers < MIN_INTERESTING_PEERS)
3040 {
3041 maxPeers = MIN_INTERESTING_PEERS;
3042 }
3043
3044 if (maxPeers > s->tor->maxConnectedPeers)
3045 {
3046 maxPeers = s->tor->maxConnectedPeers;
3047 }
3048
3049 s->maxPeers = maxPeers;
3050
3051 if (peerCount > 0)
3052 {
3053 bool* piece_is_interesting;
3054 tr_torrent const* const tor = s->tor;
3055 int const n = tor->info.pieceCount;
3056
3057 /* build a bitfield of interesting pieces... */
3058 piece_is_interesting = tr_new(bool, n);
3059
3060 for (int i = 0; i < n; ++i)
3061 {
3062 piece_is_interesting[i] = !tor->info.pieces[i].dnd && !tr_torrentPieceIsComplete(tor, i);
3063 }
3064
3065 /* decide WHICH peers to be interested in (based on their cancel-to-block ratio) */
3066 for (int i = 0; i < peerCount; ++i)
3067 {
3068 tr_peer* peer = tr_ptrArrayNth(&s->peers, i);
3069
3070 if (!isPeerInteresting(s->tor, piece_is_interesting, peer))
3071 {
3072 tr_peerMsgsSetInterested(PEER_MSGS(peer), false);
3073 }
3074 else
3075 {
3076 tr_rechoke_state rechoke_state;
3077 int const blocks = tr_historyGet(&peer->blocksSentToClient, now, CANCEL_HISTORY_SEC);
3078 int const cancels = tr_historyGet(&peer->cancelsSentToPeer, now, CANCEL_HISTORY_SEC);
3079
3080 if (blocks == 0 && cancels == 0)
3081 {
3082 rechoke_state = RECHOKE_STATE_UNTESTED;
3083 }
3084 else if (cancels == 0)
3085 {
3086 rechoke_state = RECHOKE_STATE_GOOD;
3087 }
3088 else if (blocks == 0)
3089 {
3090 rechoke_state = RECHOKE_STATE_BAD;
3091 }
3092 else if (cancels * 10 < blocks)
3093 {
3094 rechoke_state = RECHOKE_STATE_GOOD;
3095 }
3096 else
3097 {
3098 rechoke_state = RECHOKE_STATE_BAD;
3099 }
3100
3101 if (rechoke == NULL)
3102 {
3103 rechoke = tr_new(struct tr_rechoke_info, peerCount);
3104 }
3105
3106 rechoke[rechoke_count].peer = peer;
3107 rechoke[rechoke_count].rechoke_state = rechoke_state;
3108 rechoke[rechoke_count].salt = tr_rand_int_weak(INT_MAX);
3109 rechoke_count++;
3110 }
3111 }
3112
3113 tr_free(piece_is_interesting);
3114 }
3115
3116 /* now that we know which & how many peers to be interested in... update the peer interest */
3117 qsort(rechoke, rechoke_count, sizeof(struct tr_rechoke_info), compare_rechoke_info);
3118 s->interestedCount = MIN(maxPeers, rechoke_count);
3119
3120 for (int i = 0; i < rechoke_count; ++i)
3121 {
3122 tr_peerMsgsSetInterested(PEER_MSGS(rechoke[i].peer), i < s->interestedCount);
3123 }
3124
3125 /* cleanup */
3126 tr_free(rechoke);
3127 }
3128
3129 /**
3130 ***
3131 **/
3132
3133 struct ChokeData
3134 {
3135 bool isInterested;
3136 bool wasChoked;
3137 bool isChoked;
3138 int rate;
3139 int salt;
3140 tr_peerMsgs* msgs;
3141 };
3142
compareChoke(void const * va,void const * vb)3143 static int compareChoke(void const* va, void const* vb)
3144 {
3145 struct ChokeData const* a = va;
3146 struct ChokeData const* b = vb;
3147
3148 if (a->rate != b->rate) /* prefer higher overall speeds */
3149 {
3150 return a->rate > b->rate ? -1 : 1;
3151 }
3152
3153 if (a->wasChoked != b->wasChoked) /* prefer unchoked */
3154 {
3155 return a->wasChoked ? 1 : -1;
3156 }
3157
3158 if (a->salt != b->salt) /* random order */
3159 {
3160 return a->salt - b->salt;
3161 }
3162
3163 return 0;
3164 }
3165
3166 /* is this a new connection? */
isNew(tr_peerMsgs const * msgs)3167 static bool isNew(tr_peerMsgs const* msgs)
3168 {
3169 return msgs != NULL && tr_peerMsgsGetConnectionAge(msgs) < 45;
3170 }
3171
3172 /* get a rate for deciding which peers to choke and unchoke. */
getRate(tr_torrent const * tor,struct peer_atom * atom,uint64_t now)3173 static int getRate(tr_torrent const* tor, struct peer_atom* atom, uint64_t now)
3174 {
3175 unsigned int Bps;
3176
3177 if (tr_torrentIsSeed(tor))
3178 {
3179 Bps = tr_peerGetPieceSpeed_Bps(atom->peer, now, TR_CLIENT_TO_PEER);
3180 }
3181 /* downloading a private torrent... take upload speed into account
3182 * because there may only be a small window of opportunity to share */
3183 else if (tr_torrentIsPrivate(tor))
3184 {
3185 Bps = tr_peerGetPieceSpeed_Bps(atom->peer, now, TR_PEER_TO_CLIENT) + tr_peerGetPieceSpeed_Bps(atom->peer, now,
3186 TR_CLIENT_TO_PEER);
3187 }
3188 /* downloading a public torrent */
3189 else
3190 {
3191 Bps = tr_peerGetPieceSpeed_Bps(atom->peer, now, TR_PEER_TO_CLIENT);
3192 }
3193
3194 /* convert it to bytes per second */
3195 return Bps;
3196 }
3197
isBandwidthMaxedOut(tr_bandwidth const * b,uint64_t const now_msec,tr_direction dir)3198 static inline bool isBandwidthMaxedOut(tr_bandwidth const* b, uint64_t const now_msec, tr_direction dir)
3199 {
3200 if (!tr_bandwidthIsLimited(b, dir))
3201 {
3202 return false;
3203 }
3204 else
3205 {
3206 unsigned int const got = tr_bandwidthGetPieceSpeed_Bps(b, now_msec, dir);
3207 unsigned int const want = tr_bandwidthGetDesiredSpeed_Bps(b, dir);
3208 return got >= want;
3209 }
3210 }
3211
rechokeUploads(tr_swarm * s,uint64_t const now)3212 static void rechokeUploads(tr_swarm* s, uint64_t const now)
3213 {
3214 TR_ASSERT(swarmIsLocked(s));
3215
3216 int const peerCount = tr_ptrArraySize(&s->peers);
3217 tr_peer** peers = (tr_peer**)tr_ptrArrayBase(&s->peers);
3218 struct ChokeData* choke = tr_new0(struct ChokeData, peerCount);
3219 tr_session const* session = s->manager->session;
3220 bool const chokeAll = !tr_torrentIsPieceTransferAllowed(s->tor, TR_CLIENT_TO_PEER);
3221 bool const isMaxedOut = isBandwidthMaxedOut(&s->tor->bandwidth, now, TR_UP);
3222
3223 /* an optimistic unchoke peer's "optimistic"
3224 * state lasts for N calls to rechokeUploads(). */
3225 if (s->optimisticUnchokeTimeScaler > 0)
3226 {
3227 s->optimisticUnchokeTimeScaler--;
3228 }
3229 else
3230 {
3231 s->optimistic = NULL;
3232 }
3233
3234 int size = 0;
3235
3236 /* sort the peers by preference and rate */
3237 for (int i = 0; i < peerCount; ++i)
3238 {
3239 tr_peer* peer = peers[i];
3240 tr_peerMsgs* msgs = PEER_MSGS(peer);
3241
3242 struct peer_atom* atom = peer->atom;
3243
3244 if (tr_peerIsSeed(peer))
3245 {
3246 /* choke seeds and partial seeds */
3247 tr_peerMsgsSetChoke(PEER_MSGS(peer), true);
3248 }
3249 else if (chokeAll)
3250 {
3251 /* choke everyone if we're not uploading */
3252 tr_peerMsgsSetChoke(PEER_MSGS(peer), true);
3253 }
3254 else if (msgs != s->optimistic)
3255 {
3256 struct ChokeData* n = &choke[size++];
3257 n->msgs = msgs;
3258 n->isInterested = tr_peerMsgsIsPeerInterested(msgs);
3259 n->wasChoked = tr_peerMsgsIsPeerChoked(msgs);
3260 n->rate = getRate(s->tor, atom, now);
3261 n->salt = tr_rand_int_weak(INT_MAX);
3262 n->isChoked = true;
3263 }
3264 }
3265
3266 qsort(choke, size, sizeof(struct ChokeData), compareChoke);
3267
3268 /**
3269 * Reciprocation and number of uploads capping is managed by unchoking
3270 * the N peers which have the best upload rate and are interested.
3271 * This maximizes the client's download rate. These N peers are
3272 * referred to as downloaders, because they are interested in downloading
3273 * from the client.
3274 *
3275 * Peers which have a better upload rate (as compared to the downloaders)
3276 * but aren't interested get unchoked. If they become interested, the
3277 * downloader with the worst upload rate gets choked. If a client has
3278 * a complete file, it uses its upload rate rather than its download
3279 * rate to decide which peers to unchoke.
3280 *
3281 * If our bandwidth is maxed out, don't unchoke any more peers.
3282 */
3283 int unchokedInterested = 0;
3284 int checkedChokeCount = 0;
3285
3286 for (int i = 0; i < size && unchokedInterested < session->uploadSlotsPerTorrent; ++i, ++checkedChokeCount)
3287 {
3288 choke[i].isChoked = isMaxedOut ? choke[i].wasChoked : false;
3289
3290 if (choke[i].isInterested)
3291 {
3292 ++unchokedInterested;
3293 }
3294 }
3295
3296 /* optimistic unchoke */
3297 if (s->optimistic == NULL && !isMaxedOut && checkedChokeCount < size)
3298 {
3299 int n;
3300 struct ChokeData* c;
3301 tr_ptrArray randPool = TR_PTR_ARRAY_INIT;
3302
3303 for (int i = checkedChokeCount; i < size; ++i)
3304 {
3305 if (choke[i].isInterested)
3306 {
3307 tr_peerMsgs const* msgs = choke[i].msgs;
3308 int const x = isNew(msgs) ? 3 : 1;
3309
3310 for (int y = 0; y < x; ++y)
3311 {
3312 tr_ptrArrayAppend(&randPool, &choke[i]);
3313 }
3314 }
3315 }
3316
3317 if ((n = tr_ptrArraySize(&randPool)) != 0)
3318 {
3319 c = tr_ptrArrayNth(&randPool, tr_rand_int_weak(n));
3320 c->isChoked = false;
3321 s->optimistic = c->msgs;
3322 s->optimisticUnchokeTimeScaler = OPTIMISTIC_UNCHOKE_MULTIPLIER;
3323 }
3324
3325 tr_ptrArrayDestruct(&randPool, NULL);
3326 }
3327
3328 for (int i = 0; i < size; ++i)
3329 {
3330 tr_peerMsgsSetChoke(choke[i].msgs, choke[i].isChoked);
3331 }
3332
3333 /* cleanup */
3334 tr_free(choke);
3335 }
3336
rechokePulse(evutil_socket_t foo UNUSED,short bar UNUSED,void * vmgr)3337 static void rechokePulse(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmgr)
3338 {
3339 tr_torrent* tor = NULL;
3340 tr_peerMgr* mgr = vmgr;
3341 uint64_t const now = tr_time_msec();
3342
3343 managerLock(mgr);
3344
3345 while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
3346 {
3347 if (tor->isRunning)
3348 {
3349 tr_swarm* s = tor->swarm;
3350
3351 if (s->stats.peerCount > 0)
3352 {
3353 rechokeUploads(s, now);
3354 rechokeDownloads(s);
3355 }
3356 }
3357 }
3358
3359 tr_timerAddMsec(mgr->rechokeTimer, RECHOKE_PERIOD_MSEC);
3360 managerUnlock(mgr);
3361 }
3362
3363 /***
3364 ****
3365 **** Life and Death
3366 ****
3367 ***/
3368
shouldPeerBeClosed(tr_swarm const * s,tr_peer const * peer,int peerCount,time_t const now)3369 static bool shouldPeerBeClosed(tr_swarm const* s, tr_peer const* peer, int peerCount, time_t const now)
3370 {
3371 tr_torrent const* tor = s->tor;
3372 struct peer_atom const* atom = peer->atom;
3373
3374 /* if it's marked for purging, close it */
3375 if (peer->doPurge)
3376 {
3377 tordbg(s, "purging peer %s because its doPurge flag is set", tr_atomAddrStr(atom));
3378 return true;
3379 }
3380
3381 /* disconnect if we're both seeds and enough time has passed for PEX */
3382 if (tr_torrentIsSeed(tor) && tr_peerIsSeed(peer))
3383 {
3384 return !tr_torrentAllowsPex(tor) || now - atom->time >= 30;
3385 }
3386
3387 /* disconnect if it's been too long since piece data has been transferred.
3388 * this is on a sliding scale based on number of available peers... */
3389 {
3390 int const relaxStrictnessIfFewerThanN = (int)(getMaxPeerCount(tor) * 0.9 + 0.5);
3391 /* if we have >= relaxIfFewerThan, strictness is 100%.
3392 * if we have zero connections, strictness is 0% */
3393 float const strictness = peerCount >= relaxStrictnessIfFewerThanN ? 1.0 :
3394 peerCount / (float)relaxStrictnessIfFewerThanN;
3395 int const lo = MIN_UPLOAD_IDLE_SECS;
3396 int const hi = MAX_UPLOAD_IDLE_SECS;
3397 int const limit = hi - (hi - lo) * strictness;
3398 int const idleTime = now - MAX(atom->time, atom->piece_data_time);
3399
3400 /*
3401 fprintf(stderr, "strictness is %.3f, limit is %d seconds... time since connect is %d, time since piece is %d ... "
3402 "idleTime is %d, doPurge is %d\n", (double)strictness, limit, (int)(now - atom->time),
3403 (int)(now - atom->piece_data_time), idleTime, idleTime > limit);
3404 */
3405
3406 if (idleTime > limit)
3407 {
3408 tordbg(s, "purging peer %s because it's been %d secs since we shared anything", tr_atomAddrStr(atom), idleTime);
3409 return true;
3410 }
3411 }
3412
3413 return false;
3414 }
3415
getPeersToClose(tr_swarm * s,time_t const now_sec,int * setmeSize)3416 static tr_peer** getPeersToClose(tr_swarm* s, time_t const now_sec, int* setmeSize)
3417 {
3418 TR_ASSERT(swarmIsLocked(s));
3419
3420 int peerCount;
3421 int outsize = 0;
3422 struct tr_peer** ret = NULL;
3423 tr_peer** peers = (tr_peer**)tr_ptrArrayPeek(&s->peers, &peerCount);
3424
3425 for (int i = 0; i < peerCount; ++i)
3426 {
3427 if (shouldPeerBeClosed(s, peers[i], peerCount, now_sec))
3428 {
3429 if (ret == NULL)
3430 {
3431 ret = tr_new(tr_peer*, peerCount);
3432 }
3433
3434 ret[outsize++] = peers[i];
3435 }
3436 }
3437
3438 *setmeSize = outsize;
3439 return ret;
3440 }
3441
getReconnectIntervalSecs(struct peer_atom const * atom,time_t const now)3442 static int getReconnectIntervalSecs(struct peer_atom const* atom, time_t const now)
3443 {
3444 int sec;
3445 bool const unreachable = (atom->flags2 & MYFLAG_UNREACHABLE) != 0;
3446
3447 /* if we were recently connected to this peer and transferring piece
3448 * data, try to reconnect to them sooner rather that later -- we don't
3449 * want network troubles to get in the way of a good peer. */
3450 if (!unreachable && now - atom->piece_data_time <= MINIMUM_RECONNECT_INTERVAL_SECS * 2)
3451 {
3452 sec = MINIMUM_RECONNECT_INTERVAL_SECS;
3453 }
3454 /* otherwise, the interval depends on how many times we've tried
3455 * and failed to connect to the peer */
3456 else
3457 {
3458 int step = atom->numFails;
3459
3460 /* penalize peers that were unreachable the last time we tried */
3461 if (unreachable)
3462 {
3463 step += 2;
3464 }
3465
3466 switch (step)
3467 {
3468 case 0:
3469 sec = 0;
3470 break;
3471
3472 case 1:
3473 sec = 10;
3474 break;
3475
3476 case 2:
3477 sec = 60 * 2;
3478 break;
3479
3480 case 3:
3481 sec = 60 * 15;
3482 break;
3483
3484 case 4:
3485 sec = 60 * 30;
3486 break;
3487
3488 case 5:
3489 sec = 60 * 60;
3490 break;
3491
3492 default:
3493 sec = 60 * 120;
3494 break;
3495 }
3496 }
3497
3498 dbgmsg("reconnect interval for %s is %d seconds", tr_atomAddrStr(atom), sec);
3499 return sec;
3500 }
3501
removePeer(tr_swarm * s,tr_peer * peer)3502 static void removePeer(tr_swarm* s, tr_peer* peer)
3503 {
3504 TR_ASSERT(swarmIsLocked(s));
3505
3506 struct peer_atom* atom = peer->atom;
3507 TR_ASSERT(atom != NULL);
3508
3509 atom->time = tr_time();
3510
3511 tr_ptrArrayRemoveSortedPointer(&s->peers, peer, peerCompare);
3512 --s->stats.peerCount;
3513 --s->stats.peerFromCount[atom->fromFirst];
3514
3515 if (replicationExists(s))
3516 {
3517 tr_decrReplicationFromBitfield(s, &peer->have);
3518 }
3519
3520 TR_ASSERT(s->stats.peerCount == tr_ptrArraySize(&s->peers));
3521 TR_ASSERT(s->stats.peerFromCount[atom->fromFirst] >= 0);
3522
3523 tr_peerFree(peer);
3524 }
3525
closePeer(tr_swarm * s,tr_peer * peer)3526 static void closePeer(tr_swarm* s, tr_peer* peer)
3527 {
3528 TR_ASSERT(s != NULL);
3529 TR_ASSERT(peer != NULL);
3530
3531 struct peer_atom* atom = peer->atom;
3532
3533 /* if we transferred piece data, then they might be good peers,
3534 so reset their `numFails' weight to zero. otherwise we connected
3535 to them fruitlessly, so mark it as another fail */
3536 if (atom->piece_data_time != 0)
3537 {
3538 tordbg(s, "resetting atom %s numFails to 0", tr_atomAddrStr(atom));
3539 atom->numFails = 0;
3540 }
3541 else
3542 {
3543 ++atom->numFails;
3544 tordbg(s, "incremented atom %s numFails to %d", tr_atomAddrStr(atom), (int)atom->numFails);
3545 }
3546
3547 tordbg(s, "removing bad peer %s", tr_atomAddrStr(peer->atom));
3548 removePeer(s, peer);
3549 }
3550
removeAllPeers(tr_swarm * s)3551 static void removeAllPeers(tr_swarm* s)
3552 {
3553 while (!tr_ptrArrayEmpty(&s->peers))
3554 {
3555 removePeer(s, tr_ptrArrayNth(&s->peers, 0));
3556 }
3557
3558 TR_ASSERT(s->stats.peerCount == 0);
3559 }
3560
closeBadPeers(tr_swarm * s,time_t const now_sec)3561 static void closeBadPeers(tr_swarm* s, time_t const now_sec)
3562 {
3563 if (!tr_ptrArrayEmpty(&s->peers))
3564 {
3565 int peerCount;
3566 struct tr_peer** peers;
3567
3568 peers = getPeersToClose(s, now_sec, &peerCount);
3569
3570 for (int i = 0; i < peerCount; ++i)
3571 {
3572 closePeer(s, peers[i]);
3573 }
3574
3575 tr_free(peers);
3576 }
3577 }
3578
3579 struct peer_liveliness
3580 {
3581 tr_peer* peer;
3582 void* clientData;
3583 time_t pieceDataTime;
3584 time_t time;
3585 unsigned int speed;
3586 bool doPurge;
3587 };
3588
comparePeerLiveliness(void const * va,void const * vb)3589 static int comparePeerLiveliness(void const* va, void const* vb)
3590 {
3591 struct peer_liveliness const* a = va;
3592 struct peer_liveliness const* b = vb;
3593
3594 if (a->doPurge != b->doPurge)
3595 {
3596 return a->doPurge ? 1 : -1;
3597 }
3598
3599 if (a->speed != b->speed) /* faster goes first */
3600 {
3601 return a->speed > b->speed ? -1 : 1;
3602 }
3603
3604 /* the one to give us data more recently goes first */
3605 if (a->pieceDataTime != b->pieceDataTime)
3606 {
3607 return a->pieceDataTime > b->pieceDataTime ? -1 : 1;
3608 }
3609
3610 /* the one we connected to most recently goes first */
3611 if (a->time != b->time)
3612 {
3613 return a->time > b->time ? -1 : 1;
3614 }
3615
3616 return 0;
3617 }
3618
sortPeersByLivelinessImpl(tr_peer ** peers,void ** clientData,int n,uint64_t now,tr_voidptr_compare_func compare)3619 static void sortPeersByLivelinessImpl(tr_peer** peers, void** clientData, int n, uint64_t now, tr_voidptr_compare_func compare)
3620 {
3621 struct peer_liveliness* lives;
3622 struct peer_liveliness* l;
3623
3624 /* build a sortable array of peer + extra info */
3625 lives = tr_new0(struct peer_liveliness, n);
3626 l = lives;
3627
3628 for (int i = 0; i < n; ++i, ++l)
3629 {
3630 tr_peer* p = peers[i];
3631 l->peer = p;
3632 l->doPurge = p->doPurge;
3633 l->pieceDataTime = p->atom->piece_data_time;
3634 l->time = p->atom->time;
3635 l->speed = tr_peerGetPieceSpeed_Bps(p, now, TR_UP) + tr_peerGetPieceSpeed_Bps(p, now, TR_DOWN);
3636
3637 if (clientData != NULL)
3638 {
3639 l->clientData = clientData[i];
3640 }
3641 }
3642
3643 /* sort 'em */
3644 TR_ASSERT(n == l - lives);
3645 qsort(lives, n, sizeof(struct peer_liveliness), compare);
3646
3647 l = lives;
3648
3649 /* build the peer array */
3650 for (int i = 0; i < n; ++i, ++l)
3651 {
3652 peers[i] = l->peer;
3653
3654 if (clientData != NULL)
3655 {
3656 clientData[i] = l->clientData;
3657 }
3658 }
3659
3660 TR_ASSERT(n == l - lives);
3661
3662 /* cleanup */
3663 tr_free(lives);
3664 }
3665
sortPeersByLiveliness(tr_peer ** peers,void ** clientData,int n,uint64_t now)3666 static void sortPeersByLiveliness(tr_peer** peers, void** clientData, int n, uint64_t now)
3667 {
3668 sortPeersByLivelinessImpl(peers, clientData, n, now, comparePeerLiveliness);
3669 }
3670
enforceTorrentPeerLimit(tr_swarm * s,uint64_t now)3671 static void enforceTorrentPeerLimit(tr_swarm* s, uint64_t now)
3672 {
3673 int n = tr_ptrArraySize(&s->peers);
3674 int const max = tr_torrentGetPeerLimit(s->tor);
3675
3676 if (n > max)
3677 {
3678 void* base = tr_ptrArrayBase(&s->peers);
3679 tr_peer** peers = tr_memdup(base, n * sizeof(tr_peer*));
3680 sortPeersByLiveliness(peers, NULL, n, now);
3681
3682 while (n > max)
3683 {
3684 closePeer(s, peers[--n]);
3685 }
3686
3687 tr_free(peers);
3688 }
3689 }
3690
enforceSessionPeerLimit(tr_session * session,uint64_t now)3691 static void enforceSessionPeerLimit(tr_session* session, uint64_t now)
3692 {
3693 int n = 0;
3694 tr_torrent* tor = NULL;
3695 int const max = tr_sessionGetPeerLimit(session);
3696
3697 /* count the total number of peers */
3698 while ((tor = tr_torrentNext(session, tor)) != NULL)
3699 {
3700 n += tr_ptrArraySize(&tor->swarm->peers);
3701 }
3702
3703 /* if there are too many, prune out the worst */
3704 if (n > max)
3705 {
3706 tr_peer** peers = tr_new(tr_peer*, n);
3707 tr_swarm** swarms = tr_new(tr_swarm*, n);
3708
3709 /* populate the peer array */
3710 n = 0;
3711 tor = NULL;
3712
3713 while ((tor = tr_torrentNext(session, tor)) != NULL)
3714 {
3715 tr_swarm* s = tor->swarm;
3716
3717 for (int i = 0, tn = tr_ptrArraySize(&s->peers); i < tn; ++i, ++n)
3718 {
3719 peers[n] = tr_ptrArrayNth(&s->peers, i);
3720 swarms[n] = s;
3721 }
3722 }
3723
3724 /* sort 'em */
3725 sortPeersByLiveliness(peers, (void**)swarms, n, now);
3726
3727 /* cull out the crappiest */
3728 while (n-- > max)
3729 {
3730 closePeer(swarms[n], peers[n]);
3731 }
3732
3733 /* cleanup */
3734 tr_free(swarms);
3735 tr_free(peers);
3736 }
3737 }
3738
3739 static void makeNewPeerConnections(tr_peerMgr* mgr, int const max);
3740
reconnectPulse(evutil_socket_t foo UNUSED,short bar UNUSED,void * vmgr)3741 static void reconnectPulse(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmgr)
3742 {
3743 tr_torrent* tor;
3744 tr_peerMgr* mgr = vmgr;
3745 time_t const now_sec = tr_time();
3746 uint64_t const now_msec = tr_time_msec();
3747
3748 /**
3749 *** enforce the per-session and per-torrent peer limits
3750 **/
3751
3752 /* if we're over the per-torrent peer limits, cull some peers */
3753 tor = NULL;
3754
3755 while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
3756 {
3757 if (tor->isRunning)
3758 {
3759 enforceTorrentPeerLimit(tor->swarm, now_msec);
3760 }
3761 }
3762
3763 /* if we're over the per-session peer limits, cull some peers */
3764 enforceSessionPeerLimit(mgr->session, now_msec);
3765
3766 /* remove crappy peers */
3767 tor = NULL;
3768
3769 while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
3770 {
3771 if (!tor->swarm->isRunning)
3772 {
3773 removeAllPeers(tor->swarm);
3774 }
3775 else
3776 {
3777 closeBadPeers(tor->swarm, now_sec);
3778 }
3779 }
3780
3781 /* try to make new peer connections */
3782 makeNewPeerConnections(mgr, MAX_CONNECTIONS_PER_PULSE);
3783 }
3784
3785 /****
3786 *****
3787 ***** BANDWIDTH ALLOCATION
3788 *****
3789 ****/
3790
pumpAllPeers(tr_peerMgr * mgr)3791 static void pumpAllPeers(tr_peerMgr* mgr)
3792 {
3793 tr_torrent* tor = NULL;
3794
3795 while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
3796 {
3797 tr_swarm* s = tor->swarm;
3798
3799 for (int j = 0, n = tr_ptrArraySize(&s->peers); j < n; ++j)
3800 {
3801 tr_peerMsgsPulse(tr_ptrArrayNth(&s->peers, j));
3802 }
3803 }
3804 }
3805
queuePulseForeach(void * vtor)3806 static void queuePulseForeach(void* vtor)
3807 {
3808 tr_torrent* tor = vtor;
3809
3810 tr_torrentStartNow(tor);
3811
3812 if (tor->queue_started_callback != NULL)
3813 {
3814 (*tor->queue_started_callback)(tor, tor->queue_started_user_data);
3815 }
3816 }
3817
queuePulse(tr_session * session,tr_direction dir)3818 static void queuePulse(tr_session* session, tr_direction dir)
3819 {
3820 TR_ASSERT(tr_isSession(session));
3821 TR_ASSERT(tr_isDirection(dir));
3822
3823 if (tr_sessionGetQueueEnabled(session, dir))
3824 {
3825 tr_ptrArray torrents = TR_PTR_ARRAY_INIT;
3826
3827 tr_sessionGetNextQueuedTorrents(session, dir, tr_sessionCountQueueFreeSlots(session, dir), &torrents);
3828
3829 tr_ptrArrayForeach(&torrents, queuePulseForeach);
3830
3831 tr_ptrArrayDestruct(&torrents, NULL);
3832 }
3833 }
3834
bandwidthPulse(evutil_socket_t foo UNUSED,short bar UNUSED,void * vmgr)3835 static void bandwidthPulse(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmgr)
3836 {
3837 tr_torrent* tor;
3838 tr_peerMgr* mgr = vmgr;
3839 tr_session* session = mgr->session;
3840 managerLock(mgr);
3841
3842 /* FIXME: this next line probably isn't necessary... */
3843 pumpAllPeers(mgr);
3844
3845 /* allocate bandwidth to the peers */
3846 tr_bandwidthAllocate(&session->bandwidth, TR_UP, BANDWIDTH_PERIOD_MSEC);
3847 tr_bandwidthAllocate(&session->bandwidth, TR_DOWN, BANDWIDTH_PERIOD_MSEC);
3848
3849 /* torrent upkeep */
3850 tor = NULL;
3851
3852 while ((tor = tr_torrentNext(session, tor)) != NULL)
3853 {
3854 /* possibly stop torrents that have seeded enough */
3855 tr_torrentCheckSeedLimit(tor);
3856
3857 /* run the completeness check for any torrents that need it */
3858 if (tor->swarm->needsCompletenessCheck)
3859 {
3860 tor->swarm->needsCompletenessCheck = false;
3861 tr_torrentRecheckCompleteness(tor);
3862 }
3863
3864 /* stop torrents that are ready to stop, but couldn't be stopped
3865 earlier during the peer-io callback call chain */
3866 if (tor->isStopping)
3867 {
3868 tr_torrentStop(tor);
3869 }
3870
3871 /* update the torrent's stats */
3872 tor->swarm->stats.activeWebseedCount = countActiveWebseeds(tor->swarm);
3873 }
3874
3875 /* pump the queues */
3876 queuePulse(session, TR_UP);
3877 queuePulse(session, TR_DOWN);
3878
3879 reconnectPulse(0, 0, mgr);
3880
3881 tr_timerAddMsec(mgr->bandwidthTimer, BANDWIDTH_PERIOD_MSEC);
3882 managerUnlock(mgr);
3883 }
3884
3885 /***
3886 ****
3887 ***/
3888
compareAtomPtrsByAddress(void const * va,void const * vb)3889 static int compareAtomPtrsByAddress(void const* va, void const* vb)
3890 {
3891 struct peer_atom const* a = *(struct peer_atom const* const*)va;
3892 struct peer_atom const* b = *(struct peer_atom const* const*)vb;
3893
3894 TR_ASSERT(tr_isAtom(a));
3895 TR_ASSERT(tr_isAtom(b));
3896
3897 return tr_address_compare(&a->addr, &b->addr);
3898 }
3899
3900 /* best come first, worst go last */
compareAtomPtrsByShelfDate(void const * va,void const * vb)3901 static int compareAtomPtrsByShelfDate(void const* va, void const* vb)
3902 {
3903 struct peer_atom const* a = *(struct peer_atom const* const*)va;
3904 struct peer_atom const* b = *(struct peer_atom const* const*)vb;
3905
3906 TR_ASSERT(tr_isAtom(a));
3907 TR_ASSERT(tr_isAtom(b));
3908
3909 int const data_time_cutoff_secs = 60 * 60;
3910 time_t const tr_now = tr_time();
3911
3912 /* primary key: the last piece data time *if* it was within the last hour */
3913 time_t atime = a->piece_data_time;
3914
3915 if (atime + data_time_cutoff_secs < tr_now)
3916 {
3917 atime = 0;
3918 }
3919
3920 time_t btime = b->piece_data_time;
3921
3922 if (btime + data_time_cutoff_secs < tr_now)
3923 {
3924 btime = 0;
3925 }
3926
3927 if (atime != btime)
3928 {
3929 return atime > btime ? -1 : 1;
3930 }
3931
3932 /* secondary key: shelf date. */
3933 if (a->shelf_date != b->shelf_date)
3934 {
3935 return a->shelf_date > b->shelf_date ? -1 : 1;
3936 }
3937
3938 return 0;
3939 }
3940
getMaxAtomCount(tr_torrent const * tor)3941 static int getMaxAtomCount(tr_torrent const* tor)
3942 {
3943 return MIN(50, tor->maxConnectedPeers * 3);
3944 }
3945
atomPulse(evutil_socket_t foo UNUSED,short bar UNUSED,void * vmgr)3946 static void atomPulse(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmgr)
3947 {
3948 tr_torrent* tor = NULL;
3949 tr_peerMgr* mgr = vmgr;
3950 managerLock(mgr);
3951
3952 while ((tor = tr_torrentNext(mgr->session, tor)) != NULL)
3953 {
3954 int atomCount;
3955 tr_swarm* s = tor->swarm;
3956 int const maxAtomCount = getMaxAtomCount(tor);
3957 struct peer_atom** atoms = (struct peer_atom**)tr_ptrArrayPeek(&s->pool, &atomCount);
3958
3959 if (atomCount > maxAtomCount) /* we've got too many atoms... time to prune */
3960 {
3961 int keepCount = 0;
3962 int testCount = 0;
3963 struct peer_atom** keep = tr_new(struct peer_atom*, atomCount);
3964 struct peer_atom** test = tr_new(struct peer_atom*, atomCount);
3965
3966 /* keep the ones that are in use */
3967 for (int i = 0; i < atomCount; ++i)
3968 {
3969 struct peer_atom* atom = atoms[i];
3970
3971 if (peerIsInUse(s, atom))
3972 {
3973 keep[keepCount++] = atom;
3974 }
3975 else
3976 {
3977 test[testCount++] = atom;
3978 }
3979 }
3980
3981 /* if there's room, keep the best of what's left */
3982 int i = 0;
3983
3984 if (keepCount < maxAtomCount)
3985 {
3986 qsort(test, testCount, sizeof(struct peer_atom*), compareAtomPtrsByShelfDate);
3987
3988 while (i < testCount && keepCount < maxAtomCount)
3989 {
3990 keep[keepCount++] = test[i++];
3991 }
3992 }
3993
3994 /* free the culled atoms */
3995 while (i < testCount)
3996 {
3997 tr_free(test[i++]);
3998 }
3999
4000 /* rebuild Torrent.pool with what's left */
4001 tr_ptrArrayDestruct(&s->pool, NULL);
4002 s->pool = TR_PTR_ARRAY_INIT;
4003 qsort(keep, keepCount, sizeof(struct peer_atom*), compareAtomPtrsByAddress);
4004
4005 for (int i = 0; i < keepCount; ++i)
4006 {
4007 tr_ptrArrayAppend(&s->pool, keep[i]);
4008 }
4009
4010 tordbg(s, "max atom count is %d... pruned from %d to %d\n", maxAtomCount, atomCount, keepCount);
4011
4012 /* cleanup */
4013 tr_free(test);
4014 tr_free(keep);
4015 }
4016 }
4017
4018 tr_timerAddMsec(mgr->atomTimer, ATOM_PERIOD_MSEC);
4019 managerUnlock(mgr);
4020 }
4021
4022 /***
4023 ****
4024 ****
4025 ****
4026 ***/
4027
4028 /* is this atom someone that we'd want to initiate a connection to? */
isPeerCandidate(tr_torrent const * tor,struct peer_atom * atom,time_t const now)4029 static bool isPeerCandidate(tr_torrent const* tor, struct peer_atom* atom, time_t const now)
4030 {
4031 /* not if we're both seeds */
4032 if (tr_torrentIsSeed(tor) && atomIsSeed(atom))
4033 {
4034 return false;
4035 }
4036
4037 /* not if we've already got a connection to them... */
4038 if (peerIsInUse(tor->swarm, atom))
4039 {
4040 return false;
4041 }
4042
4043 /* not if we just tried them already */
4044 if (now - atom->time < getReconnectIntervalSecs(atom, now))
4045 {
4046 return false;
4047 }
4048
4049 /* not if they're blocklisted */
4050 if (isAtomBlocklisted(tor->session, atom))
4051 {
4052 return false;
4053 }
4054
4055 /* not if they're banned... */
4056 if ((atom->flags2 & MYFLAG_BANNED) != 0)
4057 {
4058 return false;
4059 }
4060
4061 return true;
4062 }
4063
4064 struct peer_candidate
4065 {
4066 uint64_t score;
4067 tr_torrent* tor;
4068 struct peer_atom* atom;
4069 };
4070
torrentWasRecentlyStarted(tr_torrent const * tor)4071 static bool torrentWasRecentlyStarted(tr_torrent const* tor)
4072 {
4073 return difftime(tr_time(), tor->startDate) < 120;
4074 }
4075
addValToKey(uint64_t value,int width,uint64_t addme)4076 static inline uint64_t addValToKey(uint64_t value, int width, uint64_t addme)
4077 {
4078 value = value << (uint64_t)width;
4079 value |= addme;
4080 return value;
4081 }
4082
4083 /* smaller value is better */
getPeerCandidateScore(tr_torrent const * tor,struct peer_atom const * atom,uint8_t salt)4084 static uint64_t getPeerCandidateScore(tr_torrent const* tor, struct peer_atom const* atom, uint8_t salt)
4085 {
4086 uint64_t i;
4087 uint64_t score = 0;
4088 bool const failed = atom->lastConnectionAt < atom->lastConnectionAttemptAt;
4089
4090 /* prefer peers we've connected to, or never tried, over peers we failed to connect to. */
4091 i = failed ? 1 : 0;
4092 score = addValToKey(score, 1, i);
4093
4094 /* prefer the one we attempted least recently (to cycle through all peers) */
4095 i = atom->lastConnectionAttemptAt;
4096 score = addValToKey(score, 32, i);
4097
4098 /* prefer peers belonging to a torrent of a higher priority */
4099 switch (tr_torrentGetPriority(tor))
4100 {
4101 case TR_PRI_HIGH:
4102 i = 0;
4103 break;
4104
4105 case TR_PRI_NORMAL:
4106 i = 1;
4107 break;
4108
4109 case TR_PRI_LOW:
4110 i = 2;
4111 break;
4112 }
4113
4114 score = addValToKey(score, 4, i);
4115
4116 /* prefer recently-started torrents */
4117 i = torrentWasRecentlyStarted(tor) ? 0 : 1;
4118 score = addValToKey(score, 1, i);
4119
4120 /* prefer torrents we're downloading with */
4121 i = tr_torrentIsSeed(tor) ? 1 : 0;
4122 score = addValToKey(score, 1, i);
4123
4124 /* prefer peers that are known to be connectible */
4125 i = (atom->flags & ADDED_F_CONNECTABLE) != 0 ? 0 : 1;
4126 score = addValToKey(score, 1, i);
4127
4128 /* prefer peers that we might have a chance of uploading to...
4129 so lower seed probability is better */
4130 if (atom->seedProbability == 100)
4131 {
4132 i = 101;
4133 }
4134 else if (atom->seedProbability == -1)
4135 {
4136 i = 100;
4137 }
4138 else
4139 {
4140 i = atom->seedProbability;
4141 }
4142
4143 score = addValToKey(score, 8, i);
4144
4145 /* Prefer peers that we got from more trusted sources.
4146 * lower `fromBest' values indicate more trusted sources */
4147 score = addValToKey(score, 4, atom->fromBest);
4148
4149 /* salt */
4150 score = addValToKey(score, 8, salt);
4151
4152 return score;
4153 }
4154
comparePeerCandidates(void const * va,void const * vb)4155 static int comparePeerCandidates(void const* va, void const* vb)
4156 {
4157 int ret;
4158 struct peer_candidate const* a = va;
4159 struct peer_candidate const* b = vb;
4160
4161 if (a->score < b->score)
4162 {
4163 ret = -1;
4164 }
4165 else if (a->score > b->score)
4166 {
4167 ret = 1;
4168 }
4169 else
4170 {
4171 ret = 0;
4172 }
4173
4174 return ret;
4175 }
4176
4177 /* Partial sorting -- selecting the k best candidates
4178 Adapted from http://en.wikipedia.org/wiki/Selection_algorithm */
selectPeerCandidates(struct peer_candidate * candidates,int candidate_count,int select_count)4179 static void selectPeerCandidates(struct peer_candidate* candidates, int candidate_count, int select_count)
4180 {
4181 tr_quickfindFirstK(candidates, candidate_count, sizeof(struct peer_candidate), comparePeerCandidates, select_count);
4182 }
4183
4184 #ifdef TR_ENABLE_ASSERTS
4185
checkBestScoresComeFirst(struct peer_candidate const * candidates,int n,int k)4186 static bool checkBestScoresComeFirst(struct peer_candidate const* candidates, int n, int k)
4187 {
4188 uint64_t worstFirstScore = 0;
4189 int const x = MIN(n, k) - 1;
4190
4191 for (int i = 0; i < x; i++)
4192 {
4193 if (worstFirstScore < candidates[i].score)
4194 {
4195 worstFirstScore = candidates[i].score;
4196 }
4197 }
4198
4199 for (int i = 0; i < x; i++)
4200 {
4201 TR_ASSERT(candidates[i].score <= worstFirstScore);
4202 }
4203
4204 for (int i = x + 1; i < n; i++)
4205 {
4206 TR_ASSERT(candidates[i].score >= worstFirstScore);
4207 }
4208
4209 return true;
4210 }
4211
4212 #endif /* TR_ENABLE_ASSERTS */
4213
4214 /** @return an array of all the atoms we might want to connect to */
getPeerCandidates(tr_session * session,int * candidateCount,int max)4215 static struct peer_candidate* getPeerCandidates(tr_session* session, int* candidateCount, int max)
4216 {
4217 int atomCount;
4218 int peerCount;
4219 tr_torrent* tor;
4220 struct peer_candidate* candidates;
4221 struct peer_candidate* walk;
4222 time_t const now = tr_time();
4223 uint64_t const now_msec = tr_time_msec();
4224 /* leave 5% of connection slots for incoming connections -- ticket #2609 */
4225 int const maxCandidates = tr_sessionGetPeerLimit(session) * 0.95;
4226
4227 /* count how many peers and atoms we've got */
4228 tor = NULL;
4229 atomCount = 0;
4230 peerCount = 0;
4231
4232 while ((tor = tr_torrentNext(session, tor)) != NULL)
4233 {
4234 atomCount += tr_ptrArraySize(&tor->swarm->pool);
4235 peerCount += tr_ptrArraySize(&tor->swarm->peers);
4236 }
4237
4238 /* don't start any new handshakes if we're full up */
4239 if (maxCandidates <= peerCount)
4240 {
4241 *candidateCount = 0;
4242 return NULL;
4243 }
4244
4245 /* allocate an array of candidates */
4246 walk = candidates = tr_new(struct peer_candidate, atomCount);
4247
4248 /* populate the candidate array */
4249 tor = NULL;
4250
4251 while ((tor = tr_torrentNext(session, tor)) != NULL)
4252 {
4253 int nAtoms;
4254 struct peer_atom** atoms;
4255
4256 if (!tor->swarm->isRunning)
4257 {
4258 continue;
4259 }
4260
4261 /* if we've already got enough peers in this torrent... */
4262 if (tr_torrentGetPeerLimit(tor) <= tr_ptrArraySize(&tor->swarm->peers))
4263 {
4264 continue;
4265 }
4266
4267 /* if we've already got enough speed in this torrent... */
4268 if (tr_torrentIsSeed(tor) && isBandwidthMaxedOut(&tor->bandwidth, now_msec, TR_UP))
4269 {
4270 continue;
4271 }
4272
4273 atoms = (struct peer_atom**)tr_ptrArrayPeek(&tor->swarm->pool, &nAtoms);
4274
4275 for (int i = 0; i < nAtoms; ++i)
4276 {
4277 struct peer_atom* atom = atoms[i];
4278
4279 if (isPeerCandidate(tor, atom, now))
4280 {
4281 uint8_t const salt = tr_rand_int_weak(1024);
4282 walk->tor = tor;
4283 walk->atom = atom;
4284 walk->score = getPeerCandidateScore(tor, atom, salt);
4285 ++walk;
4286 }
4287 }
4288 }
4289
4290 *candidateCount = walk - candidates;
4291
4292 if (walk != candidates)
4293 {
4294 selectPeerCandidates(candidates, walk - candidates, max);
4295 }
4296
4297 TR_ASSERT(checkBestScoresComeFirst(candidates, *candidateCount, max));
4298 return candidates;
4299 }
4300
initiateConnection(tr_peerMgr * mgr,tr_swarm * s,struct peer_atom * atom)4301 static void initiateConnection(tr_peerMgr* mgr, tr_swarm* s, struct peer_atom* atom)
4302 {
4303 tr_peerIo* io;
4304 time_t const now = tr_time();
4305 bool utp = tr_sessionIsUTPEnabled(mgr->session) && !atom->utp_failed;
4306
4307 if (atom->fromFirst == TR_PEER_FROM_PEX)
4308 {
4309 /* PEX has explicit signalling for uTP support. If an atom
4310 originally came from PEX and doesn't have the uTP flag, skip the
4311 uTP connection attempt. Are we being optimistic here? */
4312 utp = utp && (atom->flags & ADDED_F_UTP_FLAGS) != 0;
4313 }
4314
4315 tordbg(s, "Starting an OUTGOING%s connection with %s", utp ? " µTP" : "", tr_atomAddrStr(atom));
4316
4317 io = tr_peerIoNewOutgoing(mgr->session, &mgr->session->bandwidth, &atom->addr, atom->port, s->tor->info.hash,
4318 s->tor->completeness == TR_SEED, utp);
4319
4320 if (io == NULL)
4321 {
4322 tordbg(s, "peerIo not created; marking peer %s as unreachable", tr_atomAddrStr(atom));
4323 atom->flags2 |= MYFLAG_UNREACHABLE;
4324 atom->numFails++;
4325 }
4326 else
4327 {
4328 tr_handshake* handshake = tr_handshakeNew(io, mgr->session->encryptionMode, myHandshakeDoneCB, mgr);
4329
4330 TR_ASSERT(tr_peerIoGetTorrentHash(io));
4331
4332 tr_peerIoUnref(io); /* balanced by the initial ref in tr_peerIoNewOutgoing() */
4333
4334 tr_ptrArrayInsertSorted(&s->outgoingHandshakes, handshake, handshakeCompare);
4335 }
4336
4337 atom->lastConnectionAttemptAt = now;
4338 atom->time = now;
4339 }
4340
initiateCandidateConnection(tr_peerMgr * mgr,struct peer_candidate * c)4341 static void initiateCandidateConnection(tr_peerMgr* mgr, struct peer_candidate* c)
4342 {
4343 #if 0
4344
4345 fprintf(stderr, "Starting an OUTGOING connection with %s - [%s] seedProbability==%d; %s, %s\n", tr_atomAddrStr(c->atom),
4346 tr_torrentName(c->tor), (int)c->atom->seedProbability, tr_torrentIsPrivate(c->tor) ? "private" : "public",
4347 tr_torrentIsSeed(c->tor) ? "seed" : "downloader");
4348
4349 #endif
4350
4351 initiateConnection(mgr, c->tor->swarm, c->atom);
4352 }
4353
makeNewPeerConnections(struct tr_peerMgr * mgr,int const max)4354 static void makeNewPeerConnections(struct tr_peerMgr* mgr, int const max)
4355 {
4356 int n;
4357 struct peer_candidate* candidates;
4358
4359 candidates = getPeerCandidates(mgr->session, &n, max);
4360
4361 for (int i = 0; i < n && i < max; ++i)
4362 {
4363 initiateCandidateConnection(mgr, &candidates[i]);
4364 }
4365
4366 tr_free(candidates);
4367 }
4368