1 /*
2  * This file Copyright (C) 2007-2014 Mnemosyne LLC
3  *
4  * It may be used under the GNU GPL versions 2 or 3
5  * or any future license endorsed by Mnemosyne LLC.
6  *
7  */
8 
9 #include <errno.h>
10 #include <stdarg.h>
11 #include <stdlib.h>
12 #include <string.h>
13 
14 #include <event2/buffer.h>
15 #include <event2/bufferevent.h>
16 #include <event2/event.h>
17 
18 #include "transmission.h"
19 #include "cache.h"
20 #include "completion.h"
21 #include "file.h"
22 #include "log.h"
23 #include "peer-io.h"
24 #include "peer-mgr.h"
25 #include "peer-msgs.h"
26 #include "session.h"
27 #include "torrent.h"
28 #include "torrent-magnet.h"
29 #include "tr-assert.h"
30 #include "tr-dht.h"
31 #include "utils.h"
32 #include "variant.h"
33 #include "version.h"
34 
35 #ifndef EBADMSG
36 #define EBADMSG EINVAL
37 #endif
38 
39 /**
40 ***
41 **/
42 
43 enum
44 {
45     BT_CHOKE = 0,
46     BT_UNCHOKE = 1,
47     BT_INTERESTED = 2,
48     BT_NOT_INTERESTED = 3,
49     BT_HAVE = 4,
50     BT_BITFIELD = 5,
51     BT_REQUEST = 6,
52     BT_PIECE = 7,
53     BT_CANCEL = 8,
54     BT_PORT = 9,
55     /* */
56     BT_FEXT_SUGGEST = 13,
57     BT_FEXT_HAVE_ALL = 14,
58     BT_FEXT_HAVE_NONE = 15,
59     BT_FEXT_REJECT = 16,
60     BT_FEXT_ALLOWED_FAST = 17,
61     /* */
62     BT_LTEP = 20,
63     /* */
64     LTEP_HANDSHAKE = 0,
65     /* */
66     UT_PEX_ID = 1,
67     UT_METADATA_ID = 3,
68     /* */
69     MAX_PEX_PEER_COUNT = 50,
70     /* */
71     MIN_CHOKE_PERIOD_SEC = 10,
72     /* idle seconds before we send a keepalive */
73     KEEPALIVE_INTERVAL_SECS = 100,
74     /* */
75     PEX_INTERVAL_SECS = 90, /* sec between sendPex() calls */
76     /* */
77     REQQ = 512,
78     /* */
79     METADATA_REQQ = 64,
80     /* */
81     MAGIC_NUMBER = 21549,
82     /* used in lowering the outMessages queue period */
83     IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
84     HIGH_PRIORITY_INTERVAL_SECS = 2,
85     LOW_PRIORITY_INTERVAL_SECS = 10,
86     /* number of pieces we'll allow in our fast set */
87     MAX_FAST_SET_SIZE = 3,
88     /* how many blocks to keep prefetched per peer */
89     PREFETCH_SIZE = 18,
90     /* when we're making requests from another peer,
91        batch them together to send enough requests to
92        meet our bandwidth goals for the next N seconds */
93     REQUEST_BUF_SECS = 10,
94     /* defined in BEP #9 */
95     METADATA_MSG_TYPE_REQUEST = 0,
96     METADATA_MSG_TYPE_DATA = 1,
97     METADATA_MSG_TYPE_REJECT = 2
98 };
99 
100 enum
101 {
102     AWAITING_BT_LENGTH,
103     AWAITING_BT_ID,
104     AWAITING_BT_MESSAGE,
105     AWAITING_BT_PIECE
106 };
107 
108 typedef enum
109 {
110     ENCRYPTION_PREFERENCE_UNKNOWN,
111     ENCRYPTION_PREFERENCE_YES,
112     ENCRYPTION_PREFERENCE_NO
113 }
114 encryption_preference_t;
115 
116 /**
117 ***
118 **/
119 
120 struct peer_request
121 {
122     uint32_t index;
123     uint32_t offset;
124     uint32_t length;
125 };
126 
blockToReq(tr_torrent const * tor,tr_block_index_t block,struct peer_request * setme)127 static void blockToReq(tr_torrent const* tor, tr_block_index_t block, struct peer_request* setme)
128 {
129     tr_torrentGetBlockLocation(tor, block, &setme->index, &setme->offset, &setme->length);
130 }
131 
132 /**
133 ***
134 **/
135 
136 /* this is raw, unchanged data from the peer regarding
137  * the current message that it's sending us. */
138 struct tr_incoming
139 {
140     uint8_t id;
141     uint32_t length; /* includes the +1 for id length */
142     struct peer_request blockReq; /* metadata for incoming blocks */
143     struct evbuffer* block; /* piece data for incoming blocks */
144 };
145 
146 /**
147  * Low-level communication state information about a connected peer.
148  *
149  * This structure remembers the low-level protocol states that we're
150  * in with this peer, such as active requests, pex messages, and so on.
151  * Its fields are all private to peer-msgs.c.
152  *
153  * Data not directly involved with sending & receiving messages is
154  * stored in tr_peer, where it can be accessed by both peermsgs and
155  * the peer manager.
156  *
157  * @see struct peer_atom
158  * @see tr_peer
159  */
160 struct tr_peerMsgs
161 {
162     struct tr_peer peer; /* parent */
163 
164     uint16_t magic_number;
165 
166     /* Whether or not we've choked this peer. */
167     bool peer_is_choked;
168 
169     /* whether or not the peer has indicated it will download from us. */
170     bool peer_is_interested;
171 
172     /* whether or not the peer is choking us. */
173     bool client_is_choked;
174 
175     /* whether or not we've indicated to the peer that we would download from them if unchoked. */
176     bool client_is_interested;
177 
178     bool peerSupportsPex;
179     bool peerSupportsMetadataXfer;
180     bool clientSentLtepHandshake;
181     bool peerSentLtepHandshake;
182 
183     /*bool haveFastSet;*/
184 
185     int desiredRequestCount;
186 
187     int prefetchCount;
188 
189     bool is_active[2];
190 
191     /* how long the outMessages batch should be allowed to grow before
192      * it's flushed -- some messages (like requests >:) should be sent
193      * very quickly; others aren't as urgent. */
194     int8_t outMessagesBatchPeriod;
195 
196     uint8_t state;
197     uint8_t ut_pex_id;
198     uint8_t ut_metadata_id;
199     uint16_t pexCount;
200     uint16_t pexCount6;
201 
202     tr_port dht_port;
203 
204     encryption_preference_t encryption_preference;
205 
206     size_t metadata_size_hint;
207 #if 0
208     size_t fastsetSize;
209     tr_piece_index_t fastset[MAX_FAST_SET_SIZE];
210 #endif
211 
212     tr_torrent* torrent;
213 
214     tr_peer_callback callback;
215     void* callbackData;
216 
217     struct evbuffer* outMessages; /* all the non-piece messages */
218 
219     struct peer_request peerAskedFor[REQQ];
220 
221     int peerAskedForMetadata[METADATA_REQQ];
222     int peerAskedForMetadataCount;
223 
224     tr_pex* pex;
225     tr_pex* pex6;
226 
227     /*time_t clientSentPexAt;*/
228     time_t clientSentAnythingAt;
229 
230     time_t chokeChangedAt;
231 
232     /* when we started batching the outMessages */
233     time_t outMessagesBatchedAt;
234 
235     struct tr_incoming incoming;
236 
237     /* if the peer supports the Extension Protocol in BEP 10 and
238        supplied a reqq argument, it's stored here. Otherwise, the
239        value is zero and should be ignored. */
240     int64_t reqq;
241 
242     struct event* pexTimer;
243 
244     struct tr_peerIo* io;
245 };
246 
247 /**
248 ***
249 **/
250 
getSession(struct tr_peerMsgs * msgs)251 static inline tr_session* getSession(struct tr_peerMsgs* msgs)
252 {
253     return msgs->torrent->session;
254 }
255 
256 /**
257 ***
258 **/
259 
260 static void myDebug(char const* file, int line, struct tr_peerMsgs const* msgs, char const* fmt, ...) TR_GNUC_PRINTF(4, 5);
261 
myDebug(char const * file,int line,struct tr_peerMsgs const * msgs,char const * fmt,...)262 static void myDebug(char const* file, int line, struct tr_peerMsgs const* msgs, char const* fmt, ...)
263 {
264     tr_sys_file_t const fp = tr_logGetFile();
265 
266     if (fp != TR_BAD_SYS_FILE)
267     {
268         va_list args;
269         char timestr[64];
270         struct evbuffer* buf = evbuffer_new();
271         char* base = tr_sys_path_basename(file, NULL);
272         char* message;
273 
274         evbuffer_add_printf(buf, "[%s] %s - %s [%s]: ", tr_logGetTimeStr(timestr, sizeof(timestr)), tr_torrentName(
275             msgs->torrent), tr_peerIoGetAddrStr(msgs->io), tr_quark_get_string(msgs->peer.client, NULL));
276         va_start(args, fmt);
277         evbuffer_add_vprintf(buf, fmt, args);
278         va_end(args);
279         evbuffer_add_printf(buf, " (%s:%d)", base, line);
280 
281         message = evbuffer_free_to_str(buf, NULL);
282         tr_sys_file_write_line(fp, message, NULL);
283 
284         tr_free(base);
285         tr_free(message);
286     }
287 }
288 
289 #define dbgmsg(msgs, ...) \
290     do \
291     { \
292         if (tr_logGetDeepEnabled()) \
293         { \
294             myDebug(__FILE__, __LINE__, msgs, __VA_ARGS__); \
295         } \
296     } \
297     while (0)
298 
299 /**
300 ***
301 **/
302 
pokeBatchPeriod(tr_peerMsgs * msgs,int interval)303 static void pokeBatchPeriod(tr_peerMsgs* msgs, int interval)
304 {
305     if (msgs->outMessagesBatchPeriod > interval)
306     {
307         msgs->outMessagesBatchPeriod = interval;
308         dbgmsg(msgs, "lowering batch interval to %d seconds", interval);
309     }
310 }
311 
dbgOutMessageLen(tr_peerMsgs * msgs)312 static void dbgOutMessageLen(tr_peerMsgs* msgs)
313 {
314     dbgmsg(msgs, "outMessage size is now %zu", evbuffer_get_length(msgs->outMessages));
315 }
316 
protocolSendReject(tr_peerMsgs * msgs,struct peer_request const * req)317 static void protocolSendReject(tr_peerMsgs* msgs, struct peer_request const* req)
318 {
319     TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));
320 
321     struct evbuffer* out = msgs->outMessages;
322 
323     evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t));
324     evbuffer_add_uint8(out, BT_FEXT_REJECT);
325     evbuffer_add_uint32(out, req->index);
326     evbuffer_add_uint32(out, req->offset);
327     evbuffer_add_uint32(out, req->length);
328 
329     dbgmsg(msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
330     dbgOutMessageLen(msgs);
331 }
332 
protocolSendRequest(tr_peerMsgs * msgs,struct peer_request const * req)333 static void protocolSendRequest(tr_peerMsgs* msgs, struct peer_request const* req)
334 {
335     struct evbuffer* out = msgs->outMessages;
336 
337     evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t));
338     evbuffer_add_uint8(out, BT_REQUEST);
339     evbuffer_add_uint32(out, req->index);
340     evbuffer_add_uint32(out, req->offset);
341     evbuffer_add_uint32(out, req->length);
342 
343     dbgmsg(msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length);
344     dbgOutMessageLen(msgs);
345     pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
346 }
347 
protocolSendCancel(tr_peerMsgs * msgs,struct peer_request const * req)348 static void protocolSendCancel(tr_peerMsgs* msgs, struct peer_request const* req)
349 {
350     struct evbuffer* out = msgs->outMessages;
351 
352     evbuffer_add_uint32(out, sizeof(uint8_t) + 3 * sizeof(uint32_t));
353     evbuffer_add_uint8(out, BT_CANCEL);
354     evbuffer_add_uint32(out, req->index);
355     evbuffer_add_uint32(out, req->offset);
356     evbuffer_add_uint32(out, req->length);
357 
358     dbgmsg(msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length);
359     dbgOutMessageLen(msgs);
360     pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
361 }
362 
protocolSendPort(tr_peerMsgs * msgs,uint16_t port)363 static void protocolSendPort(tr_peerMsgs* msgs, uint16_t port)
364 {
365     struct evbuffer* out = msgs->outMessages;
366 
367     dbgmsg(msgs, "sending Port %u", port);
368     evbuffer_add_uint32(out, 3);
369     evbuffer_add_uint8(out, BT_PORT);
370     evbuffer_add_uint16(out, port);
371 }
372 
protocolSendHave(tr_peerMsgs * msgs,uint32_t index)373 static void protocolSendHave(tr_peerMsgs* msgs, uint32_t index)
374 {
375     struct evbuffer* out = msgs->outMessages;
376 
377     evbuffer_add_uint32(out, sizeof(uint8_t) + sizeof(uint32_t));
378     evbuffer_add_uint8(out, BT_HAVE);
379     evbuffer_add_uint32(out, index);
380 
381     dbgmsg(msgs, "sending Have %u", index);
382     dbgOutMessageLen(msgs);
383     pokeBatchPeriod(msgs, LOW_PRIORITY_INTERVAL_SECS);
384 }
385 
386 #if 0
387 
388 static void protocolSendAllowedFast(tr_peerMsgs* msgs, uint32_t pieceIndex)
389 {
390     TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));
391 
392     tr_peerIo* io = msgs->io;
393     struct evbuffer* out = msgs->outMessages;
394 
395     evbuffer_add_uint32(io, out, sizeof(uint8_t) + sizeof(uint32_t));
396     evbuffer_add_uint8(io, out, BT_FEXT_ALLOWED_FAST);
397     evbuffer_add_uint32(io, out, pieceIndex);
398 
399     dbgmsg(msgs, "sending Allowed Fast %u...", pieceIndex);
400     dbgOutMessageLen(msgs);
401 }
402 
403 #endif
404 
protocolSendChoke(tr_peerMsgs * msgs,bool choke)405 static void protocolSendChoke(tr_peerMsgs* msgs, bool choke)
406 {
407     struct evbuffer* out = msgs->outMessages;
408 
409     evbuffer_add_uint32(out, sizeof(uint8_t));
410     evbuffer_add_uint8(out, choke ? BT_CHOKE : BT_UNCHOKE);
411 
412     dbgmsg(msgs, "sending %s...", choke ? "Choke" : "Unchoke");
413     dbgOutMessageLen(msgs);
414     pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
415 }
416 
protocolSendHaveAll(tr_peerMsgs * msgs)417 static void protocolSendHaveAll(tr_peerMsgs* msgs)
418 {
419     TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));
420 
421     struct evbuffer* out = msgs->outMessages;
422 
423     evbuffer_add_uint32(out, sizeof(uint8_t));
424     evbuffer_add_uint8(out, BT_FEXT_HAVE_ALL);
425 
426     dbgmsg(msgs, "sending HAVE_ALL...");
427     dbgOutMessageLen(msgs);
428     pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
429 }
430 
protocolSendHaveNone(tr_peerMsgs * msgs)431 static void protocolSendHaveNone(tr_peerMsgs* msgs)
432 {
433     TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));
434 
435     struct evbuffer* out = msgs->outMessages;
436 
437     evbuffer_add_uint32(out, sizeof(uint8_t));
438     evbuffer_add_uint8(out, BT_FEXT_HAVE_NONE);
439 
440     dbgmsg(msgs, "sending HAVE_NONE...");
441     dbgOutMessageLen(msgs);
442     pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
443 }
444 
445 /**
446 ***  EVENTS
447 **/
448 
publish(tr_peerMsgs * msgs,tr_peer_event * e)449 static void publish(tr_peerMsgs* msgs, tr_peer_event* e)
450 {
451     if (msgs->callback != NULL)
452     {
453         (*msgs->callback)(&msgs->peer, e, msgs->callbackData);
454     }
455 }
456 
fireError(tr_peerMsgs * msgs,int err)457 static void fireError(tr_peerMsgs* msgs, int err)
458 {
459     tr_peer_event e = TR_PEER_EVENT_INIT;
460     e.eventType = TR_PEER_ERROR;
461     e.err = err;
462     publish(msgs, &e);
463 }
464 
fireGotBlock(tr_peerMsgs * msgs,struct peer_request const * req)465 static void fireGotBlock(tr_peerMsgs* msgs, struct peer_request const* req)
466 {
467     tr_peer_event e = TR_PEER_EVENT_INIT;
468     e.eventType = TR_PEER_CLIENT_GOT_BLOCK;
469     e.pieceIndex = req->index;
470     e.offset = req->offset;
471     e.length = req->length;
472     publish(msgs, &e);
473 }
474 
fireGotRej(tr_peerMsgs * msgs,struct peer_request const * req)475 static void fireGotRej(tr_peerMsgs* msgs, struct peer_request const* req)
476 {
477     tr_peer_event e = TR_PEER_EVENT_INIT;
478     e.eventType = TR_PEER_CLIENT_GOT_REJ;
479     e.pieceIndex = req->index;
480     e.offset = req->offset;
481     e.length = req->length;
482     publish(msgs, &e);
483 }
484 
fireGotChoke(tr_peerMsgs * msgs)485 static void fireGotChoke(tr_peerMsgs* msgs)
486 {
487     tr_peer_event e = TR_PEER_EVENT_INIT;
488     e.eventType = TR_PEER_CLIENT_GOT_CHOKE;
489     publish(msgs, &e);
490 }
491 
fireClientGotHaveAll(tr_peerMsgs * msgs)492 static void fireClientGotHaveAll(tr_peerMsgs* msgs)
493 {
494     tr_peer_event e = TR_PEER_EVENT_INIT;
495     e.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;
496     publish(msgs, &e);
497 }
498 
fireClientGotHaveNone(tr_peerMsgs * msgs)499 static void fireClientGotHaveNone(tr_peerMsgs* msgs)
500 {
501     tr_peer_event e = TR_PEER_EVENT_INIT;
502     e.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;
503     publish(msgs, &e);
504 }
505 
fireClientGotPieceData(tr_peerMsgs * msgs,uint32_t length)506 static void fireClientGotPieceData(tr_peerMsgs* msgs, uint32_t length)
507 {
508     tr_peer_event e = TR_PEER_EVENT_INIT;
509     e.length = length;
510     e.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
511     publish(msgs, &e);
512 }
513 
firePeerGotPieceData(tr_peerMsgs * msgs,uint32_t length)514 static void firePeerGotPieceData(tr_peerMsgs* msgs, uint32_t length)
515 {
516     tr_peer_event e = TR_PEER_EVENT_INIT;
517     e.length = length;
518     e.eventType = TR_PEER_PEER_GOT_PIECE_DATA;
519     publish(msgs, &e);
520 }
521 
fireClientGotSuggest(tr_peerMsgs * msgs,uint32_t pieceIndex)522 static void fireClientGotSuggest(tr_peerMsgs* msgs, uint32_t pieceIndex)
523 {
524     tr_peer_event e = TR_PEER_EVENT_INIT;
525     e.eventType = TR_PEER_CLIENT_GOT_SUGGEST;
526     e.pieceIndex = pieceIndex;
527     publish(msgs, &e);
528 }
529 
fireClientGotPort(tr_peerMsgs * msgs,tr_port port)530 static void fireClientGotPort(tr_peerMsgs* msgs, tr_port port)
531 {
532     tr_peer_event e = TR_PEER_EVENT_INIT;
533     e.eventType = TR_PEER_CLIENT_GOT_PORT;
534     e.port = port;
535     publish(msgs, &e);
536 }
537 
fireClientGotAllowedFast(tr_peerMsgs * msgs,uint32_t pieceIndex)538 static void fireClientGotAllowedFast(tr_peerMsgs* msgs, uint32_t pieceIndex)
539 {
540     tr_peer_event e = TR_PEER_EVENT_INIT;
541     e.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;
542     e.pieceIndex = pieceIndex;
543     publish(msgs, &e);
544 }
545 
fireClientGotBitfield(tr_peerMsgs * msgs,tr_bitfield * bitfield)546 static void fireClientGotBitfield(tr_peerMsgs* msgs, tr_bitfield* bitfield)
547 {
548     tr_peer_event e = TR_PEER_EVENT_INIT;
549     e.eventType = TR_PEER_CLIENT_GOT_BITFIELD;
550     e.bitfield = bitfield;
551     publish(msgs, &e);
552 }
553 
fireClientGotHave(tr_peerMsgs * msgs,tr_piece_index_t index)554 static void fireClientGotHave(tr_peerMsgs* msgs, tr_piece_index_t index)
555 {
556     tr_peer_event e = TR_PEER_EVENT_INIT;
557     e.eventType = TR_PEER_CLIENT_GOT_HAVE;
558     e.pieceIndex = index;
559     publish(msgs, &e);
560 }
561 
562 /**
563 ***  ALLOWED FAST SET
564 ***  For explanation, see http://www.bittorrent.org/beps/bep_0006.html
565 **/
566 
567 #if 0
568 
569 size_t tr_generateAllowedSet(tr_piece_index_t* setmePieces, size_t desiredSetSize, size_t pieceCount, uint8_t const* infohash,
570     tr_address const* addr)
571 {
572     TR_ASSERT(setmePieces != NULL);
573     TR_ASSERT(desiredSetSize <= pieceCount);
574     TR_ASSERT(desiredSetSize != 0);
575     TR_ASSERT(pieceCount != 0);
576     TR_ASSERT(infohash != NULL);
577     TR_ASSERT(addr != NULL);
578 
579     size_t setSize = 0;
580 
581     if (addr->type == TR_AF_INET)
582     {
583         uint8_t w[SHA_DIGEST_LENGTH + 4];
584         uint8_t* walk = w;
585         uint8_t x[SHA_DIGEST_LENGTH];
586 
587         uint32_t ui32 = ntohl(htonl(addr->addr.addr4.s_addr) & 0xffffff00); /* (1) */
588         memcpy(w, &ui32, sizeof(uint32_t));
589         walk += sizeof(uint32_t);
590         memcpy(walk, infohash, SHA_DIGEST_LENGTH); /* (2) */
591         walk += SHA_DIGEST_LENGTH;
592         tr_sha1(x, w, walk - w, NULL); /* (3) */
593         TR_ASSERT(sizeof(w) == walk - w);
594 
595         while (setSize < desiredSetSize)
596         {
597             for (int i = 0; i < 5 && setSize < desiredSetSize; ++i) /* (4) */
598             {
599                 uint32_t j = i * 4; /* (5) */
600                 uint32_t y = ntohl(*(uint32_t*)(x + j)); /* (6) */
601                 uint32_t index = y % pieceCount; /* (7) */
602                 bool found = false;
603 
604                 for (size_t k = 0; !found && k < setSize; ++k) /* (8) */
605                 {
606                     found = setmePieces[k] == index;
607                 }
608 
609                 if (!found)
610                 {
611                     setmePieces[setSize++] = index; /* (9) */
612                 }
613             }
614 
615             tr_sha1(x, x, sizeof(x), NULL); /* (3) */
616         }
617     }
618 
619     return setSize;
620 }
621 
622 static void updateFastSet(tr_peerMsgs* msgs UNUSED)
623 {
624     bool const fext = tr_peerIoSupportsFEXT(msgs->io);
625     bool const peerIsNeedy = msgs->peer->progress < 0.10;
626 
627     if (fext && peerIsNeedy && !msgs->haveFastSet)
628     {
629         struct tr_address const* addr = tr_peerIoGetAddress(msgs->io, NULL);
630         tr_info const* inf = &msgs->torrent->info;
631         size_t const numwant = MIN(MAX_FAST_SET_SIZE, inf->pieceCount);
632 
633         /* build the fast set */
634         msgs->fastsetSize = tr_generateAllowedSet(msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
635         msgs->haveFastSet = true;
636 
637         /* send it to the peer */
638         for (size_t i = 0; i < msgs->fastsetSize; ++i)
639         {
640             protocolSendAllowedFast(msgs, msgs->fastset[i]);
641         }
642     }
643 }
644 
645 #endif
646 
647 /***
648 ****  ACTIVE
649 ***/
650 
tr_peerMsgsCalculateActive(tr_peerMsgs const * msgs,tr_direction direction)651 static bool tr_peerMsgsCalculateActive(tr_peerMsgs const* msgs, tr_direction direction)
652 {
653     TR_ASSERT(tr_isPeerMsgs(msgs));
654     TR_ASSERT(tr_isDirection(direction));
655 
656     bool is_active;
657 
658     if (direction == TR_CLIENT_TO_PEER)
659     {
660         is_active = tr_peerMsgsIsPeerInterested(msgs) && !tr_peerMsgsIsPeerChoked(msgs);
661 
662         /* FIXME: https://trac.transmissionbt.com/ticket/5505
663         if (is_active)
664         {
665             TR_ASSERT(!tr_peerIsSeed(&msgs->peer));
666         }
667         */
668     }
669     else /* TR_PEER_TO_CLIENT */
670     {
671         if (!tr_torrentHasMetadata(msgs->torrent))
672         {
673             is_active = true;
674         }
675         else
676         {
677             is_active = tr_peerMsgsIsClientInterested(msgs) && !tr_peerMsgsIsClientChoked(msgs);
678 
679             if (is_active)
680             {
681                 TR_ASSERT(!tr_torrentIsSeed(msgs->torrent));
682             }
683         }
684     }
685 
686     return is_active;
687 }
688 
tr_peerMsgsIsActive(tr_peerMsgs const * msgs,tr_direction direction)689 bool tr_peerMsgsIsActive(tr_peerMsgs const* msgs, tr_direction direction)
690 {
691     TR_ASSERT(tr_isPeerMsgs(msgs));
692     TR_ASSERT(tr_isDirection(direction));
693 
694     bool is_active = msgs->is_active[direction];
695 
696     TR_ASSERT(is_active == tr_peerMsgsCalculateActive(msgs, direction));
697 
698     return is_active;
699 }
700 
tr_peerMsgsSetActive(tr_peerMsgs * msgs,tr_direction direction,bool is_active)701 static void tr_peerMsgsSetActive(tr_peerMsgs* msgs, tr_direction direction, bool is_active)
702 {
703     dbgmsg(msgs, "direction [%d] is_active [%d]", (int)direction, (int)is_active);
704 
705     if (msgs->is_active[direction] != is_active)
706     {
707         msgs->is_active[direction] = is_active;
708 
709         tr_swarmIncrementActivePeers(msgs->torrent->swarm, direction, is_active);
710     }
711 }
712 
tr_peerMsgsUpdateActive(tr_peerMsgs * msgs,tr_direction direction)713 void tr_peerMsgsUpdateActive(tr_peerMsgs* msgs, tr_direction direction)
714 {
715     bool const is_active = tr_peerMsgsCalculateActive(msgs, direction);
716 
717     tr_peerMsgsSetActive(msgs, direction, is_active);
718 }
719 
720 /**
721 ***  INTEREST
722 **/
723 
sendInterest(tr_peerMsgs * msgs,bool b)724 static void sendInterest(tr_peerMsgs* msgs, bool b)
725 {
726     TR_ASSERT(msgs != NULL);
727 
728     struct evbuffer* out = msgs->outMessages;
729 
730     msgs->client_is_interested = b;
731     dbgmsg(msgs, "Sending %s", b ? "Interested" : "Not Interested");
732     evbuffer_add_uint32(out, sizeof(uint8_t));
733     evbuffer_add_uint8(out, b ? BT_INTERESTED : BT_NOT_INTERESTED);
734 
735     pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
736     dbgOutMessageLen(msgs);
737 }
738 
updateInterest(tr_peerMsgs * msgs UNUSED)739 static void updateInterest(tr_peerMsgs* msgs UNUSED)
740 {
741     /* FIXME -- might need to poke the mgr on startup */
742 }
743 
tr_peerMsgsSetInterested(tr_peerMsgs * msgs,bool b)744 void tr_peerMsgsSetInterested(tr_peerMsgs* msgs, bool b)
745 {
746     if (msgs->client_is_interested != b)
747     {
748         sendInterest(msgs, b);
749 
750         tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT);
751     }
752 }
753 
popNextMetadataRequest(tr_peerMsgs * msgs,int * piece)754 static bool popNextMetadataRequest(tr_peerMsgs* msgs, int* piece)
755 {
756     if (msgs->peerAskedForMetadataCount == 0)
757     {
758         return false;
759     }
760 
761     *piece = msgs->peerAskedForMetadata[0];
762 
763     tr_removeElementFromArray(msgs->peerAskedForMetadata, 0, sizeof(int), msgs->peerAskedForMetadataCount);
764     --msgs->peerAskedForMetadataCount;
765 
766     return true;
767 }
768 
popNextRequest(tr_peerMsgs * msgs,struct peer_request * setme)769 static bool popNextRequest(tr_peerMsgs* msgs, struct peer_request* setme)
770 {
771     if (msgs->peer.pendingReqsToClient == 0)
772     {
773         return false;
774     }
775 
776     *setme = msgs->peerAskedFor[0];
777 
778     tr_removeElementFromArray(msgs->peerAskedFor, 0, sizeof(struct peer_request), msgs->peer.pendingReqsToClient);
779     --msgs->peer.pendingReqsToClient;
780 
781     return true;
782 }
783 
cancelAllRequestsToClient(tr_peerMsgs * msgs)784 static void cancelAllRequestsToClient(tr_peerMsgs* msgs)
785 {
786     struct peer_request req;
787     bool const mustSendCancel = tr_peerIoSupportsFEXT(msgs->io);
788 
789     while (popNextRequest(msgs, &req))
790     {
791         if (mustSendCancel)
792         {
793             protocolSendReject(msgs, &req);
794         }
795     }
796 }
797 
tr_peerMsgsSetChoke(tr_peerMsgs * msgs,bool peer_is_choked)798 void tr_peerMsgsSetChoke(tr_peerMsgs* msgs, bool peer_is_choked)
799 {
800     TR_ASSERT(msgs != NULL);
801 
802     time_t const now = tr_time();
803     time_t const fibrillationTime = now - MIN_CHOKE_PERIOD_SEC;
804 
805     if (msgs->chokeChangedAt > fibrillationTime)
806     {
807         dbgmsg(msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked);
808     }
809     else if (msgs->peer_is_choked != peer_is_choked)
810     {
811         msgs->peer_is_choked = peer_is_choked;
812 
813         if (peer_is_choked)
814         {
815             cancelAllRequestsToClient(msgs);
816         }
817 
818         protocolSendChoke(msgs, peer_is_choked);
819         msgs->chokeChangedAt = now;
820         tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER);
821     }
822 }
823 
824 /**
825 ***
826 **/
827 
tr_peerMsgsHave(tr_peerMsgs * msgs,uint32_t index)828 void tr_peerMsgsHave(tr_peerMsgs* msgs, uint32_t index)
829 {
830     protocolSendHave(msgs, index);
831 
832     /* since we have more pieces now, we might not be interested in this peer */
833     updateInterest(msgs);
834 }
835 
836 /**
837 ***
838 **/
839 
reqIsValid(tr_peerMsgs const * peer,uint32_t index,uint32_t offset,uint32_t length)840 static bool reqIsValid(tr_peerMsgs const* peer, uint32_t index, uint32_t offset, uint32_t length)
841 {
842     return tr_torrentReqIsValid(peer->torrent, index, offset, length);
843 }
844 
requestIsValid(tr_peerMsgs const * msgs,struct peer_request const * req)845 static bool requestIsValid(tr_peerMsgs const* msgs, struct peer_request const* req)
846 {
847     return reqIsValid(msgs, req->index, req->offset, req->length);
848 }
849 
tr_peerMsgsCancel(tr_peerMsgs * msgs,tr_block_index_t block)850 void tr_peerMsgsCancel(tr_peerMsgs* msgs, tr_block_index_t block)
851 {
852     struct peer_request req;
853     // fprintf(stderr, "SENDING CANCEL MESSAGE FOR BLOCK %zu\n\t\tFROM PEER %p ------------------------------------\n",
854     //     (size_t)block, msgs->peer);
855     blockToReq(msgs->torrent, block, &req);
856     protocolSendCancel(msgs, &req);
857 }
858 
859 /**
860 ***
861 **/
862 
sendLtepHandshake(tr_peerMsgs * msgs)863 static void sendLtepHandshake(tr_peerMsgs* msgs)
864 {
865     tr_variant val;
866     bool allow_pex;
867     bool allow_metadata_xfer;
868     struct evbuffer* payload;
869     struct evbuffer* out = msgs->outMessages;
870     unsigned char const* ipv6 = tr_globalIPv6();
871     static tr_quark version_quark = 0;
872 
873     if (msgs->clientSentLtepHandshake)
874     {
875         return;
876     }
877 
878     if (version_quark == 0)
879     {
880         version_quark = tr_quark_new(TR_NAME " " USERAGENT_PREFIX, TR_BAD_SIZE);
881     }
882 
883     dbgmsg(msgs, "sending an ltep handshake");
884     msgs->clientSentLtepHandshake = true;
885 
886     /* decide if we want to advertise metadata xfer support (BEP 9) */
887     if (tr_torrentIsPrivate(msgs->torrent))
888     {
889         allow_metadata_xfer = false;
890     }
891     else
892     {
893         allow_metadata_xfer = true;
894     }
895 
896     /* decide if we want to advertise pex support */
897     if (!tr_torrentAllowsPex(msgs->torrent))
898     {
899         allow_pex = false;
900     }
901     else if (msgs->peerSentLtepHandshake)
902     {
903         allow_pex = msgs->peerSupportsPex;
904     }
905     else
906     {
907         allow_pex = true;
908     }
909 
910     tr_variantInitDict(&val, 8);
911     tr_variantDictAddBool(&val, TR_KEY_e, getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED);
912 
913     if (ipv6 != NULL)
914     {
915         tr_variantDictAddRaw(&val, TR_KEY_ipv6, ipv6, 16);
916     }
917 
918     if (allow_metadata_xfer && tr_torrentHasMetadata(msgs->torrent) && msgs->torrent->infoDictLength > 0)
919     {
920         tr_variantDictAddInt(&val, TR_KEY_metadata_size, msgs->torrent->infoDictLength);
921     }
922 
923     tr_variantDictAddInt(&val, TR_KEY_p, tr_sessionGetPublicPeerPort(getSession(msgs)));
924     tr_variantDictAddInt(&val, TR_KEY_reqq, REQQ);
925     tr_variantDictAddBool(&val, TR_KEY_upload_only, tr_torrentIsSeed(msgs->torrent));
926     tr_variantDictAddQuark(&val, TR_KEY_v, version_quark);
927 
928     if (allow_metadata_xfer || allow_pex)
929     {
930         tr_variant* m = tr_variantDictAddDict(&val, TR_KEY_m, 2);
931 
932         if (allow_metadata_xfer)
933         {
934             tr_variantDictAddInt(m, TR_KEY_ut_metadata, UT_METADATA_ID);
935         }
936 
937         if (allow_pex)
938         {
939             tr_variantDictAddInt(m, TR_KEY_ut_pex, UT_PEX_ID);
940         }
941     }
942 
943     payload = tr_variantToBuf(&val, TR_VARIANT_FMT_BENC);
944 
945     evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
946     evbuffer_add_uint8(out, BT_LTEP);
947     evbuffer_add_uint8(out, LTEP_HANDSHAKE);
948     evbuffer_add_buffer(out, payload);
949     pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
950     dbgOutMessageLen(msgs);
951 
952     /* cleanup */
953     evbuffer_free(payload);
954     tr_variantFree(&val);
955 }
956 
parseLtepHandshake(tr_peerMsgs * msgs,uint32_t len,struct evbuffer * inbuf)957 static void parseLtepHandshake(tr_peerMsgs* msgs, uint32_t len, struct evbuffer* inbuf)
958 {
959     int64_t i;
960     tr_variant val;
961     tr_variant* sub;
962     uint8_t* tmp = tr_new(uint8_t, len);
963     uint8_t const* addr;
964     size_t addr_len;
965     tr_pex pex;
966     int8_t seedProbability = -1;
967 
968     memset(&pex, 0, sizeof(tr_pex));
969 
970     tr_peerIoReadBytes(msgs->io, inbuf, tmp, len);
971     msgs->peerSentLtepHandshake = true;
972 
973     if (tr_variantFromBenc(&val, tmp, len) != 0 || !tr_variantIsDict(&val))
974     {
975         dbgmsg(msgs, "GET  extended-handshake, couldn't get dictionary");
976         tr_free(tmp);
977         return;
978     }
979 
980     /* arbitrary limit, should be more than enough */
981     if (len <= 4096)
982     {
983         dbgmsg(msgs, "here is the handshake: [%*.*s]", (int)len, (int)len, tmp);
984     }
985     else
986     {
987         dbgmsg(msgs, "handshake length is too big (%" PRIu32 "), printing skipped", len);
988     }
989 
990     /* does the peer prefer encrypted connections? */
991     if (tr_variantDictFindInt(&val, TR_KEY_e, &i))
992     {
993         msgs->encryption_preference = i != 0 ? ENCRYPTION_PREFERENCE_YES : ENCRYPTION_PREFERENCE_NO;
994 
995         if (i != 0)
996         {
997             pex.flags |= ADDED_F_ENCRYPTION_FLAG;
998         }
999     }
1000 
1001     /* check supported messages for utorrent pex */
1002     msgs->peerSupportsPex = false;
1003     msgs->peerSupportsMetadataXfer = false;
1004 
1005     if (tr_variantDictFindDict(&val, TR_KEY_m, &sub))
1006     {
1007         if (tr_variantDictFindInt(sub, TR_KEY_ut_pex, &i))
1008         {
1009             msgs->peerSupportsPex = i != 0;
1010             msgs->ut_pex_id = (uint8_t)i;
1011             dbgmsg(msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
1012         }
1013 
1014         if (tr_variantDictFindInt(sub, TR_KEY_ut_metadata, &i))
1015         {
1016             msgs->peerSupportsMetadataXfer = i != 0;
1017             msgs->ut_metadata_id = (uint8_t)i;
1018             dbgmsg(msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
1019         }
1020 
1021         if (tr_variantDictFindInt(sub, TR_KEY_ut_holepunch, &i))
1022         {
1023             /* Mysterious µTorrent extension that we don't grok.  However,
1024                it implies support for µTP, so use it to indicate that. */
1025             tr_peerMgrSetUtpFailed(msgs->torrent, tr_peerIoGetAddress(msgs->io, NULL), false);
1026         }
1027     }
1028 
1029     /* look for metainfo size (BEP 9) */
1030     if (tr_variantDictFindInt(&val, TR_KEY_metadata_size, &i))
1031     {
1032         if (tr_torrentSetMetadataSizeHint(msgs->torrent, i))
1033         {
1034             msgs->metadata_size_hint = (size_t)i;
1035         }
1036     }
1037 
1038     /* look for upload_only (BEP 21) */
1039     if (tr_variantDictFindInt(&val, TR_KEY_upload_only, &i))
1040     {
1041         seedProbability = i == 0 ? 0 : 100;
1042     }
1043 
1044     /* get peer's listening port */
1045     if (tr_variantDictFindInt(&val, TR_KEY_p, &i))
1046     {
1047         pex.port = htons((uint16_t)i);
1048         fireClientGotPort(msgs, pex.port);
1049         dbgmsg(msgs, "peer's port is now %d", (int)i);
1050     }
1051 
1052     if (tr_peerIoIsIncoming(msgs->io) && tr_variantDictFindRaw(&val, TR_KEY_ipv4, &addr, &addr_len) && addr_len == 4)
1053     {
1054         pex.addr.type = TR_AF_INET;
1055         memcpy(&pex.addr.addr.addr4, addr, 4);
1056         tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
1057     }
1058 
1059     if (tr_peerIoIsIncoming(msgs->io) && tr_variantDictFindRaw(&val, TR_KEY_ipv6, &addr, &addr_len) && addr_len == 16)
1060     {
1061         pex.addr.type = TR_AF_INET6;
1062         memcpy(&pex.addr.addr.addr6, addr, 16);
1063         tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
1064     }
1065 
1066     /* get peer's maximum request queue size */
1067     if (tr_variantDictFindInt(&val, TR_KEY_reqq, &i))
1068     {
1069         msgs->reqq = i;
1070     }
1071 
1072     tr_variantFree(&val);
1073     tr_free(tmp);
1074 }
1075 
parseUtMetadata(tr_peerMsgs * msgs,uint32_t msglen,struct evbuffer * inbuf)1076 static void parseUtMetadata(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf)
1077 {
1078     tr_variant dict;
1079     char* msg_end;
1080     char const* benc_end;
1081     int64_t msg_type = -1;
1082     int64_t piece = -1;
1083     int64_t total_size = 0;
1084     uint8_t* tmp = tr_new(uint8_t, msglen);
1085 
1086     tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen);
1087     msg_end = (char*)tmp + msglen;
1088 
1089     if (tr_variantFromBencFull(&dict, tmp, msglen, NULL, &benc_end) == 0)
1090     {
1091         (void)tr_variantDictFindInt(&dict, TR_KEY_msg_type, &msg_type);
1092         (void)tr_variantDictFindInt(&dict, TR_KEY_piece, &piece);
1093         (void)tr_variantDictFindInt(&dict, TR_KEY_total_size, &total_size);
1094         tr_variantFree(&dict);
1095     }
1096 
1097     dbgmsg(msgs, "got ut_metadata msg: type %d, piece %d, total_size %d", (int)msg_type, (int)piece, (int)total_size);
1098 
1099     if (msg_type == METADATA_MSG_TYPE_REJECT)
1100     {
1101         /* NOOP */
1102     }
1103 
1104     if (msg_type == METADATA_MSG_TYPE_DATA && !tr_torrentHasMetadata(msgs->torrent) &&
1105         msg_end - benc_end <= METADATA_PIECE_SIZE && piece * METADATA_PIECE_SIZE + (msg_end - benc_end) <= total_size)
1106     {
1107         int const pieceLen = msg_end - benc_end;
1108         tr_torrentSetMetadataPiece(msgs->torrent, piece, benc_end, pieceLen);
1109     }
1110 
1111     if (msg_type == METADATA_MSG_TYPE_REQUEST)
1112     {
1113         if (piece >= 0 && tr_torrentHasMetadata(msgs->torrent) && !tr_torrentIsPrivate(msgs->torrent) &&
1114             msgs->peerAskedForMetadataCount < METADATA_REQQ)
1115         {
1116             msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = piece;
1117         }
1118         else
1119         {
1120             tr_variant tmp;
1121             struct evbuffer* payload;
1122             struct evbuffer* out = msgs->outMessages;
1123 
1124             /* build the rejection message */
1125             tr_variantInitDict(&tmp, 2);
1126             tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
1127             tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
1128             payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);
1129 
1130             /* write it out as a LTEP message to our outMessages buffer */
1131             evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
1132             evbuffer_add_uint8(out, BT_LTEP);
1133             evbuffer_add_uint8(out, msgs->ut_metadata_id);
1134             evbuffer_add_buffer(out, payload);
1135             pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
1136             dbgOutMessageLen(msgs);
1137 
1138             /* cleanup */
1139             evbuffer_free(payload);
1140             tr_variantFree(&tmp);
1141         }
1142     }
1143 
1144     tr_free(tmp);
1145 }
1146 
parseUtPex(tr_peerMsgs * msgs,uint32_t msglen,struct evbuffer * inbuf)1147 static void parseUtPex(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf)
1148 {
1149     tr_torrent* tor = msgs->torrent;
1150     if (!tr_torrentAllowsPex(tor))
1151     {
1152         return;
1153     }
1154 
1155     uint8_t* tmp = tr_new(uint8_t, msglen);
1156     tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen);
1157 
1158     tr_variant val;
1159     bool const loaded = tr_variantFromBenc(&val, tmp, msglen) == 0;
1160 
1161     tr_free(tmp);
1162 
1163     if (!loaded)
1164     {
1165         return;
1166     }
1167 
1168     uint8_t const* added;
1169     size_t added_len;
1170 
1171     if (tr_variantDictFindRaw(&val, TR_KEY_added, &added, &added_len))
1172     {
1173         tr_pex* pex;
1174         size_t n;
1175         size_t added_f_len;
1176         uint8_t const* added_f;
1177 
1178         if (!tr_variantDictFindRaw(&val, TR_KEY_added_f, &added_f, &added_f_len))
1179         {
1180             added_f_len = 0;
1181             added_f = NULL;
1182         }
1183 
1184         pex = tr_peerMgrCompactToPex(added, added_len, added_f, added_f_len, &n);
1185 
1186         n = MIN(n, MAX_PEX_PEER_COUNT);
1187 
1188         for (size_t i = 0; i < n; ++i)
1189         {
1190             int seedProbability = -1;
1191 
1192             if (i < added_f_len)
1193             {
1194                 seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) != 0 ? 100 : 0;
1195             }
1196 
1197             tr_peerMgrAddPex(tor, TR_PEER_FROM_PEX, pex + i, seedProbability);
1198         }
1199 
1200         tr_free(pex);
1201     }
1202 
1203     if (tr_variantDictFindRaw(&val, TR_KEY_added6, &added, &added_len))
1204     {
1205         tr_pex* pex;
1206         size_t n;
1207         size_t added_f_len;
1208         uint8_t const* added_f;
1209 
1210         if (!tr_variantDictFindRaw(&val, TR_KEY_added6_f, &added_f, &added_f_len))
1211         {
1212             added_f_len = 0;
1213             added_f = NULL;
1214         }
1215 
1216         pex = tr_peerMgrCompact6ToPex(added, added_len, added_f, added_f_len, &n);
1217 
1218         n = MIN(n, MAX_PEX_PEER_COUNT);
1219 
1220         for (size_t i = 0; i < n; ++i)
1221         {
1222             int seedProbability = -1;
1223 
1224             if (i < added_f_len)
1225             {
1226                 seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) != 0 ? 100 : 0;
1227             }
1228 
1229             tr_peerMgrAddPex(tor, TR_PEER_FROM_PEX, pex + i, seedProbability);
1230         }
1231 
1232         tr_free(pex);
1233     }
1234 
1235     tr_variantFree(&val);
1236 }
1237 
1238 static void sendPex(tr_peerMsgs* msgs);
1239 
parseLtep(tr_peerMsgs * msgs,uint32_t msglen,struct evbuffer * inbuf)1240 static void parseLtep(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf)
1241 {
1242     TR_ASSERT(msglen > 0);
1243 
1244     uint8_t ltep_msgid;
1245 
1246     tr_peerIoReadUint8(msgs->io, inbuf, &ltep_msgid);
1247     msglen--;
1248 
1249     if (ltep_msgid == LTEP_HANDSHAKE)
1250     {
1251         dbgmsg(msgs, "got ltep handshake");
1252         parseLtepHandshake(msgs, msglen, inbuf);
1253 
1254         if (tr_peerIoSupportsLTEP(msgs->io))
1255         {
1256             sendLtepHandshake(msgs);
1257             sendPex(msgs);
1258         }
1259     }
1260     else if (ltep_msgid == UT_PEX_ID)
1261     {
1262         dbgmsg(msgs, "got ut pex");
1263         msgs->peerSupportsPex = true;
1264         parseUtPex(msgs, msglen, inbuf);
1265     }
1266     else if (ltep_msgid == UT_METADATA_ID)
1267     {
1268         dbgmsg(msgs, "got ut metadata");
1269         msgs->peerSupportsMetadataXfer = true;
1270         parseUtMetadata(msgs, msglen, inbuf);
1271     }
1272     else
1273     {
1274         dbgmsg(msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);
1275         evbuffer_drain(inbuf, msglen);
1276     }
1277 }
1278 
readBtLength(tr_peerMsgs * msgs,struct evbuffer * inbuf,size_t inlen)1279 static int readBtLength(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen)
1280 {
1281     uint32_t len;
1282 
1283     if (inlen < sizeof(len))
1284     {
1285         return READ_LATER;
1286     }
1287 
1288     tr_peerIoReadUint32(msgs->io, inbuf, &len);
1289 
1290     if (len == 0) /* peer sent us a keepalive message */
1291     {
1292         dbgmsg(msgs, "got KeepAlive");
1293     }
1294     else
1295     {
1296         msgs->incoming.length = len;
1297         msgs->state = AWAITING_BT_ID;
1298     }
1299 
1300     return READ_NOW;
1301 }
1302 
1303 static int readBtMessage(tr_peerMsgs*, struct evbuffer*, size_t);
1304 
readBtId(tr_peerMsgs * msgs,struct evbuffer * inbuf,size_t inlen)1305 static int readBtId(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen)
1306 {
1307     uint8_t id;
1308 
1309     if (inlen < sizeof(uint8_t))
1310     {
1311         return READ_LATER;
1312     }
1313 
1314     tr_peerIoReadUint8(msgs->io, inbuf, &id);
1315     msgs->incoming.id = id;
1316     dbgmsg(msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", id, (size_t)msgs->incoming.length);
1317 
1318     if (id == BT_PIECE)
1319     {
1320         msgs->state = AWAITING_BT_PIECE;
1321         return READ_NOW;
1322     }
1323     else if (msgs->incoming.length != 1)
1324     {
1325         msgs->state = AWAITING_BT_MESSAGE;
1326         return READ_NOW;
1327     }
1328     else
1329     {
1330         return readBtMessage(msgs, inbuf, inlen - 1);
1331     }
1332 }
1333 
updatePeerProgress(tr_peerMsgs * msgs)1334 static void updatePeerProgress(tr_peerMsgs* msgs)
1335 {
1336     tr_peerUpdateProgress(msgs->torrent, &msgs->peer);
1337 
1338     /* updateFastSet(msgs); */
1339     updateInterest(msgs);
1340 }
1341 
prefetchPieces(tr_peerMsgs * msgs)1342 static void prefetchPieces(tr_peerMsgs* msgs)
1343 {
1344     if (!getSession(msgs)->isPrefetchEnabled)
1345     {
1346         return;
1347     }
1348 
1349     for (int i = msgs->prefetchCount; i < msgs->peer.pendingReqsToClient && i < PREFETCH_SIZE; ++i)
1350     {
1351         struct peer_request const* req = msgs->peerAskedFor + i;
1352 
1353         if (requestIsValid(msgs, req))
1354         {
1355             tr_cachePrefetchBlock(getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length);
1356             ++msgs->prefetchCount;
1357         }
1358     }
1359 }
1360 
peerMadeRequest(tr_peerMsgs * msgs,struct peer_request const * req)1361 static void peerMadeRequest(tr_peerMsgs* msgs, struct peer_request const* req)
1362 {
1363     bool const fext = tr_peerIoSupportsFEXT(msgs->io);
1364     bool const reqIsValid = requestIsValid(msgs, req);
1365     bool const clientHasPiece = reqIsValid && tr_torrentPieceIsComplete(msgs->torrent, req->index);
1366     bool const peerIsChoked = msgs->peer_is_choked;
1367 
1368     bool allow = false;
1369 
1370     if (!reqIsValid)
1371     {
1372         dbgmsg(msgs, "rejecting an invalid request.");
1373     }
1374     else if (!clientHasPiece)
1375     {
1376         dbgmsg(msgs, "rejecting request for a piece we don't have.");
1377     }
1378     else if (peerIsChoked)
1379     {
1380         dbgmsg(msgs, "rejecting request from choked peer");
1381     }
1382     else if (msgs->peer.pendingReqsToClient + 1 >= REQQ)
1383     {
1384         dbgmsg(msgs, "rejecting request ... reqq is full");
1385     }
1386     else
1387     {
1388         allow = true;
1389     }
1390 
1391     if (allow)
1392     {
1393         msgs->peerAskedFor[msgs->peer.pendingReqsToClient++] = *req;
1394         prefetchPieces(msgs);
1395     }
1396     else if (fext)
1397     {
1398         protocolSendReject(msgs, req);
1399     }
1400 }
1401 
messageLengthIsCorrect(tr_peerMsgs const * msg,uint8_t id,uint32_t len)1402 static bool messageLengthIsCorrect(tr_peerMsgs const* msg, uint8_t id, uint32_t len)
1403 {
1404     switch (id)
1405     {
1406     case BT_CHOKE:
1407     case BT_UNCHOKE:
1408     case BT_INTERESTED:
1409     case BT_NOT_INTERESTED:
1410     case BT_FEXT_HAVE_ALL:
1411     case BT_FEXT_HAVE_NONE:
1412         return len == 1;
1413 
1414     case BT_HAVE:
1415     case BT_FEXT_SUGGEST:
1416     case BT_FEXT_ALLOWED_FAST:
1417         return len == 5;
1418 
1419     case BT_BITFIELD:
1420         if (tr_torrentHasMetadata(msg->torrent))
1421         {
1422             return len == (msg->torrent->info.pieceCount >> 3) + ((msg->torrent->info.pieceCount & 7) != 0 ? 1 : 0) + 1U;
1423         }
1424 
1425         /* we don't know the piece count yet,
1426            so we can only guess whether to send true or false */
1427         if (msg->metadata_size_hint > 0)
1428         {
1429             return len <= msg->metadata_size_hint;
1430         }
1431 
1432         return true;
1433 
1434     case BT_REQUEST:
1435     case BT_CANCEL:
1436     case BT_FEXT_REJECT:
1437         return len == 13;
1438 
1439     case BT_PIECE:
1440         return len > 9 && len <= 16393;
1441 
1442     case BT_PORT:
1443         return len == 3;
1444 
1445     case BT_LTEP:
1446         return len >= 2;
1447 
1448     default:
1449         return false;
1450     }
1451 }
1452 
1453 static int clientGotBlock(tr_peerMsgs* msgs, struct evbuffer* block, struct peer_request const* req);
1454 
readBtPiece(tr_peerMsgs * msgs,struct evbuffer * inbuf,size_t inlen,size_t * setme_piece_bytes_read)1455 static int readBtPiece(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen, size_t* setme_piece_bytes_read)
1456 {
1457     TR_ASSERT(evbuffer_get_length(inbuf) >= inlen);
1458 
1459     dbgmsg(msgs, "In readBtPiece");
1460 
1461     struct peer_request* req = &msgs->incoming.blockReq;
1462 
1463     if (req->length == 0)
1464     {
1465         if (inlen < 8)
1466         {
1467             return READ_LATER;
1468         }
1469 
1470         tr_peerIoReadUint32(msgs->io, inbuf, &req->index);
1471         tr_peerIoReadUint32(msgs->io, inbuf, &req->offset);
1472         req->length = msgs->incoming.length - 9;
1473         dbgmsg(msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
1474         return READ_NOW;
1475     }
1476     else
1477     {
1478         int err;
1479         size_t n;
1480         size_t nLeft;
1481         struct evbuffer* block_buffer;
1482 
1483         if (msgs->incoming.block == NULL)
1484         {
1485             msgs->incoming.block = evbuffer_new();
1486         }
1487 
1488         block_buffer = msgs->incoming.block;
1489 
1490         /* read in another chunk of data */
1491         nLeft = req->length - evbuffer_get_length(block_buffer);
1492         n = MIN(nLeft, inlen);
1493 
1494         tr_peerIoReadBytesToBuf(msgs->io, inbuf, block_buffer, n);
1495 
1496         fireClientGotPieceData(msgs, n);
1497         *setme_piece_bytes_read += n;
1498         dbgmsg(msgs, "got %zu bytes for block %u:%u->%u ... %d remain", n, req->index, req->offset, req->length,
1499             (int)(req->length - evbuffer_get_length(block_buffer)));
1500 
1501         if (evbuffer_get_length(block_buffer) < req->length)
1502         {
1503             return READ_LATER;
1504         }
1505 
1506         /* pass the block along... */
1507         err = clientGotBlock(msgs, block_buffer, req);
1508         evbuffer_drain(block_buffer, evbuffer_get_length(block_buffer));
1509 
1510         /* cleanup */
1511         req->length = 0;
1512         msgs->state = AWAITING_BT_LENGTH;
1513         return err != 0 ? READ_ERR : READ_NOW;
1514     }
1515 }
1516 
1517 static void updateDesiredRequestCount(tr_peerMsgs* msgs);
1518 
readBtMessage(tr_peerMsgs * msgs,struct evbuffer * inbuf,size_t inlen)1519 static int readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen)
1520 {
1521     uint8_t const id = msgs->incoming.id;
1522 #ifdef TR_ENABLE_ASSERTS
1523     size_t const startBufLen = evbuffer_get_length(inbuf);
1524 #endif
1525     bool const fext = tr_peerIoSupportsFEXT(msgs->io);
1526 
1527     uint32_t ui32;
1528     uint32_t msglen = msgs->incoming.length;
1529 
1530     TR_ASSERT(msglen > 0);
1531 
1532     --msglen; /* id length */
1533 
1534     dbgmsg(msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)msglen, inlen);
1535 
1536     if (inlen < msglen)
1537     {
1538         return READ_LATER;
1539     }
1540 
1541     if (!messageLengthIsCorrect(msgs, id, msglen + 1))
1542     {
1543         dbgmsg(msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)msglen);
1544         fireError(msgs, EMSGSIZE);
1545         return READ_ERR;
1546     }
1547 
1548     switch (id)
1549     {
1550     case BT_CHOKE:
1551         dbgmsg(msgs, "got Choke");
1552         msgs->client_is_choked = true;
1553 
1554         if (!fext)
1555         {
1556             fireGotChoke(msgs);
1557         }
1558 
1559         tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT);
1560         break;
1561 
1562     case BT_UNCHOKE:
1563         dbgmsg(msgs, "got Unchoke");
1564         msgs->client_is_choked = false;
1565         tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT);
1566         updateDesiredRequestCount(msgs);
1567         break;
1568 
1569     case BT_INTERESTED:
1570         dbgmsg(msgs, "got Interested");
1571         msgs->peer_is_interested = true;
1572         tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER);
1573         break;
1574 
1575     case BT_NOT_INTERESTED:
1576         dbgmsg(msgs, "got Not Interested");
1577         msgs->peer_is_interested = false;
1578         tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER);
1579         break;
1580 
1581     case BT_HAVE:
1582         tr_peerIoReadUint32(msgs->io, inbuf, &ui32);
1583         dbgmsg(msgs, "got Have: %u", ui32);
1584 
1585         if (tr_torrentHasMetadata(msgs->torrent) && ui32 >= msgs->torrent->info.pieceCount)
1586         {
1587             fireError(msgs, ERANGE);
1588             return READ_ERR;
1589         }
1590 
1591         /* a peer can send the same HAVE message twice... */
1592         if (!tr_bitfieldHas(&msgs->peer.have, ui32))
1593         {
1594             tr_bitfieldAdd(&msgs->peer.have, ui32);
1595             fireClientGotHave(msgs, ui32);
1596         }
1597 
1598         updatePeerProgress(msgs);
1599         break;
1600 
1601     case BT_BITFIELD:
1602         {
1603             uint8_t* tmp = tr_new(uint8_t, msglen);
1604             dbgmsg(msgs, "got a bitfield");
1605             tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen);
1606             tr_bitfieldSetRaw(&msgs->peer.have, tmp, msglen, tr_torrentHasMetadata(msgs->torrent));
1607             fireClientGotBitfield(msgs, &msgs->peer.have);
1608             updatePeerProgress(msgs);
1609             tr_free(tmp);
1610             break;
1611         }
1612 
1613     case BT_REQUEST:
1614         {
1615             struct peer_request r;
1616             tr_peerIoReadUint32(msgs->io, inbuf, &r.index);
1617             tr_peerIoReadUint32(msgs->io, inbuf, &r.offset);
1618             tr_peerIoReadUint32(msgs->io, inbuf, &r.length);
1619             dbgmsg(msgs, "got Request: %u:%u->%u", r.index, r.offset, r.length);
1620             peerMadeRequest(msgs, &r);
1621             break;
1622         }
1623 
1624     case BT_CANCEL:
1625         {
1626             struct peer_request r;
1627             tr_peerIoReadUint32(msgs->io, inbuf, &r.index);
1628             tr_peerIoReadUint32(msgs->io, inbuf, &r.offset);
1629             tr_peerIoReadUint32(msgs->io, inbuf, &r.length);
1630             tr_historyAdd(&msgs->peer.cancelsSentToClient, tr_time(), 1);
1631             dbgmsg(msgs, "got a Cancel %u:%u->%u", r.index, r.offset, r.length);
1632 
1633             for (int i = 0; i < msgs->peer.pendingReqsToClient; ++i)
1634             {
1635                 struct peer_request const* req = msgs->peerAskedFor + i;
1636 
1637                 if (req->index == r.index && req->offset == r.offset && req->length == r.length)
1638                 {
1639                     tr_removeElementFromArray(msgs->peerAskedFor, i, sizeof(struct peer_request),
1640                         msgs->peer.pendingReqsToClient);
1641                     --msgs->peer.pendingReqsToClient;
1642                     break;
1643                 }
1644             }
1645 
1646             break;
1647         }
1648 
1649     case BT_PIECE:
1650         TR_ASSERT(false); /* handled elsewhere! */
1651         break;
1652 
1653     case BT_PORT:
1654         dbgmsg(msgs, "Got a BT_PORT");
1655         tr_peerIoReadUint16(msgs->io, inbuf, &msgs->dht_port);
1656 
1657         if (msgs->dht_port > 0)
1658         {
1659             tr_dhtAddNode(getSession(msgs), tr_peerAddress(&msgs->peer), msgs->dht_port, false);
1660         }
1661 
1662         break;
1663 
1664     case BT_FEXT_SUGGEST:
1665         dbgmsg(msgs, "Got a BT_FEXT_SUGGEST");
1666         tr_peerIoReadUint32(msgs->io, inbuf, &ui32);
1667 
1668         if (fext)
1669         {
1670             fireClientGotSuggest(msgs, ui32);
1671         }
1672         else
1673         {
1674             fireError(msgs, EMSGSIZE);
1675             return READ_ERR;
1676         }
1677 
1678         break;
1679 
1680     case BT_FEXT_ALLOWED_FAST:
1681         dbgmsg(msgs, "Got a BT_FEXT_ALLOWED_FAST");
1682         tr_peerIoReadUint32(msgs->io, inbuf, &ui32);
1683 
1684         if (fext)
1685         {
1686             fireClientGotAllowedFast(msgs, ui32);
1687         }
1688         else
1689         {
1690             fireError(msgs, EMSGSIZE);
1691             return READ_ERR;
1692         }
1693 
1694         break;
1695 
1696     case BT_FEXT_HAVE_ALL:
1697         dbgmsg(msgs, "Got a BT_FEXT_HAVE_ALL");
1698 
1699         if (fext)
1700         {
1701             tr_bitfieldSetHasAll(&msgs->peer.have);
1702             TR_ASSERT(tr_bitfieldHasAll(&msgs->peer.have));
1703             fireClientGotHaveAll(msgs);
1704             updatePeerProgress(msgs);
1705         }
1706         else
1707         {
1708             fireError(msgs, EMSGSIZE);
1709             return READ_ERR;
1710         }
1711 
1712         break;
1713 
1714     case BT_FEXT_HAVE_NONE:
1715         dbgmsg(msgs, "Got a BT_FEXT_HAVE_NONE");
1716 
1717         if (fext)
1718         {
1719             tr_bitfieldSetHasNone(&msgs->peer.have);
1720             fireClientGotHaveNone(msgs);
1721             updatePeerProgress(msgs);
1722         }
1723         else
1724         {
1725             fireError(msgs, EMSGSIZE);
1726             return READ_ERR;
1727         }
1728 
1729         break;
1730 
1731     case BT_FEXT_REJECT:
1732         {
1733             struct peer_request r;
1734             dbgmsg(msgs, "Got a BT_FEXT_REJECT");
1735             tr_peerIoReadUint32(msgs->io, inbuf, &r.index);
1736             tr_peerIoReadUint32(msgs->io, inbuf, &r.offset);
1737             tr_peerIoReadUint32(msgs->io, inbuf, &r.length);
1738 
1739             if (fext)
1740             {
1741                 fireGotRej(msgs, &r);
1742             }
1743             else
1744             {
1745                 fireError(msgs, EMSGSIZE);
1746                 return READ_ERR;
1747             }
1748 
1749             break;
1750         }
1751 
1752     case BT_LTEP:
1753         dbgmsg(msgs, "Got a BT_LTEP");
1754         parseLtep(msgs, msglen, inbuf);
1755         break;
1756 
1757     default:
1758         dbgmsg(msgs, "peer sent us an UNKNOWN: %d", (int)id);
1759         tr_peerIoDrain(msgs->io, inbuf, msglen);
1760         break;
1761     }
1762 
1763     TR_ASSERT(msglen + 1 == msgs->incoming.length);
1764     TR_ASSERT(evbuffer_get_length(inbuf) == startBufLen - msglen);
1765 
1766     msgs->state = AWAITING_BT_LENGTH;
1767     return READ_NOW;
1768 }
1769 
1770 /* returns 0 on success, or an errno on failure */
clientGotBlock(tr_peerMsgs * msgs,struct evbuffer * data,struct peer_request const * req)1771 static int clientGotBlock(tr_peerMsgs* msgs, struct evbuffer* data, struct peer_request const* req)
1772 {
1773     TR_ASSERT(msgs != NULL);
1774     TR_ASSERT(req != NULL);
1775 
1776     int err;
1777     tr_torrent* tor = msgs->torrent;
1778     tr_block_index_t const block = _tr_block(tor, req->index, req->offset);
1779 
1780     if (!requestIsValid(msgs, req))
1781     {
1782         dbgmsg(msgs, "dropping invalid block %u:%u->%u", req->index, req->offset, req->length);
1783         return EBADMSG;
1784     }
1785 
1786     if (req->length != tr_torBlockCountBytes(msgs->torrent, block))
1787     {
1788         dbgmsg(msgs, "wrong block size -- expected %u, got %d", tr_torBlockCountBytes(msgs->torrent, block), req->length);
1789         return EMSGSIZE;
1790     }
1791 
1792     dbgmsg(msgs, "got block %u:%u->%u", req->index, req->offset, req->length);
1793 
1794     if (!tr_peerMgrDidPeerRequest(msgs->torrent, &msgs->peer, block))
1795     {
1796         dbgmsg(msgs, "we didn't ask for this message...");
1797         return 0;
1798     }
1799 
1800     if (tr_torrentPieceIsComplete(msgs->torrent, req->index))
1801     {
1802         dbgmsg(msgs, "we did ask for this message, but the piece is already complete...");
1803         return 0;
1804     }
1805 
1806     /**
1807     ***  Save the block
1808     **/
1809 
1810     if ((err = tr_cacheWriteBlock(getSession(msgs)->cache, tor, req->index, req->offset, req->length, data)) != 0)
1811     {
1812         return err;
1813     }
1814 
1815     tr_bitfieldAdd(&msgs->peer.blame, req->index);
1816     fireGotBlock(msgs, req);
1817     return 0;
1818 }
1819 
1820 static void peerPulse(void* vmsgs);
1821 
didWrite(tr_peerIo * io,size_t bytesWritten,bool wasPieceData,void * vmsgs)1822 static void didWrite(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void* vmsgs)
1823 {
1824     tr_peerMsgs* msgs = vmsgs;
1825 
1826     if (wasPieceData)
1827     {
1828         firePeerGotPieceData(msgs, bytesWritten);
1829     }
1830 
1831     if (tr_isPeerIo(io) && io->userData != NULL)
1832     {
1833         peerPulse(msgs);
1834     }
1835 }
1836 
canRead(tr_peerIo * io,void * vmsgs,size_t * piece)1837 static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece)
1838 {
1839     ReadState ret;
1840     tr_peerMsgs* msgs = vmsgs;
1841     struct evbuffer* in = tr_peerIoGetReadBuffer(io);
1842     size_t const inlen = evbuffer_get_length(in);
1843 
1844     dbgmsg(msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state);
1845 
1846     if (inlen == 0)
1847     {
1848         ret = READ_LATER;
1849     }
1850     else if (msgs->state == AWAITING_BT_PIECE)
1851     {
1852         ret = readBtPiece(msgs, in, inlen, piece);
1853     }
1854     else
1855     {
1856         switch (msgs->state)
1857         {
1858         case AWAITING_BT_LENGTH:
1859             ret = readBtLength(msgs, in, inlen);
1860             break;
1861 
1862         case AWAITING_BT_ID:
1863             ret = readBtId(msgs, in, inlen);
1864             break;
1865 
1866         case AWAITING_BT_MESSAGE:
1867             ret = readBtMessage(msgs, in, inlen);
1868             break;
1869 
1870         default:
1871             ret = READ_ERR;
1872             TR_ASSERT_MSG(false, "unhandled peer messages state %d", (int)msgs->state);
1873         }
1874     }
1875 
1876     dbgmsg(msgs, "canRead: ret is %d", (int)ret);
1877 
1878     return ret;
1879 }
1880 
tr_peerMsgsIsReadingBlock(tr_peerMsgs const * msgs,tr_block_index_t block)1881 bool tr_peerMsgsIsReadingBlock(tr_peerMsgs const* msgs, tr_block_index_t block)
1882 {
1883     if (msgs->state != AWAITING_BT_PIECE)
1884     {
1885         return false;
1886     }
1887 
1888     return block == _tr_block(msgs->torrent, msgs->incoming.blockReq.index, msgs->incoming.blockReq.offset);
1889 }
1890 
1891 /**
1892 ***
1893 **/
1894 
updateDesiredRequestCount(tr_peerMsgs * msgs)1895 static void updateDesiredRequestCount(tr_peerMsgs* msgs)
1896 {
1897     tr_torrent* const torrent = msgs->torrent;
1898 
1899     /* there are lots of reasons we might not want to request any blocks... */
1900     if (tr_torrentIsSeed(torrent) || !tr_torrentHasMetadata(torrent) || msgs->client_is_choked || !msgs->client_is_interested)
1901     {
1902         msgs->desiredRequestCount = 0;
1903     }
1904     else
1905     {
1906         int estimatedBlocksInPeriod;
1907         unsigned int rate_Bps;
1908         unsigned int irate_Bps;
1909         int const floor = 4;
1910         int const seconds = REQUEST_BUF_SECS;
1911         uint64_t const now = tr_time_msec();
1912 
1913         /* Get the rate limit we should use.
1914          * FIXME: this needs to consider all the other peers as well... */
1915         rate_Bps = tr_peerGetPieceSpeed_Bps(&msgs->peer, now, TR_PEER_TO_CLIENT);
1916 
1917         if (tr_torrentUsesSpeedLimit(torrent, TR_PEER_TO_CLIENT))
1918         {
1919             rate_Bps = MIN(rate_Bps, tr_torrentGetSpeedLimit_Bps(torrent, TR_PEER_TO_CLIENT));
1920         }
1921 
1922         /* honor the session limits, if enabled */
1923         if (tr_torrentUsesSessionLimits(torrent) &&
1924             tr_sessionGetActiveSpeedLimit_Bps(torrent->session, TR_PEER_TO_CLIENT, &irate_Bps))
1925         {
1926             rate_Bps = MIN(rate_Bps, irate_Bps);
1927         }
1928 
1929         /* use this desired rate to figure out how
1930          * many requests we should send to this peer */
1931         estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
1932         msgs->desiredRequestCount = MAX(floor, estimatedBlocksInPeriod);
1933 
1934         /* honor the peer's maximum request count, if specified */
1935         if (msgs->reqq > 0)
1936         {
1937             if (msgs->desiredRequestCount > msgs->reqq)
1938             {
1939                 msgs->desiredRequestCount = msgs->reqq;
1940             }
1941         }
1942     }
1943 }
1944 
updateMetadataRequests(tr_peerMsgs * msgs,time_t now)1945 static void updateMetadataRequests(tr_peerMsgs* msgs, time_t now)
1946 {
1947     int piece;
1948 
1949     if (msgs->peerSupportsMetadataXfer && tr_torrentGetNextMetadataRequest(msgs->torrent, now, &piece))
1950     {
1951         tr_variant tmp;
1952         struct evbuffer* payload;
1953         struct evbuffer* out = msgs->outMessages;
1954 
1955         /* build the data message */
1956         tr_variantInitDict(&tmp, 3);
1957         tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST);
1958         tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
1959         payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);
1960 
1961         dbgmsg(msgs, "requesting metadata piece #%d", piece);
1962 
1963         /* write it out as a LTEP message to our outMessages buffer */
1964         evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
1965         evbuffer_add_uint8(out, BT_LTEP);
1966         evbuffer_add_uint8(out, msgs->ut_metadata_id);
1967         evbuffer_add_buffer(out, payload);
1968         pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
1969         dbgOutMessageLen(msgs);
1970 
1971         /* cleanup */
1972         evbuffer_free(payload);
1973         tr_variantFree(&tmp);
1974     }
1975 }
1976 
updateBlockRequests(tr_peerMsgs * msgs)1977 static void updateBlockRequests(tr_peerMsgs* msgs)
1978 {
1979     if (tr_torrentIsPieceTransferAllowed(msgs->torrent, TR_PEER_TO_CLIENT) && msgs->desiredRequestCount > 0 &&
1980         msgs->peer.pendingReqsToPeer <= msgs->desiredRequestCount * 0.66)
1981     {
1982         TR_ASSERT(tr_peerMsgsIsClientInterested(msgs));
1983         TR_ASSERT(!tr_peerMsgsIsClientChoked(msgs));
1984 
1985         int n;
1986         tr_block_index_t* blocks;
1987         int const numwant = msgs->desiredRequestCount - msgs->peer.pendingReqsToPeer;
1988 
1989         blocks = tr_new(tr_block_index_t, numwant);
1990         tr_peerMgrGetNextRequests(msgs->torrent, &msgs->peer, numwant, blocks, &n, false);
1991 
1992         for (int i = 0; i < n; ++i)
1993         {
1994             struct peer_request req;
1995             blockToReq(msgs->torrent, blocks[i], &req);
1996             protocolSendRequest(msgs, &req);
1997         }
1998 
1999         tr_free(blocks);
2000     }
2001 }
2002 
fillOutputBuffer(tr_peerMsgs * msgs,time_t now)2003 static size_t fillOutputBuffer(tr_peerMsgs* msgs, time_t now)
2004 {
2005     int piece;
2006     size_t bytesWritten = 0;
2007     struct peer_request req;
2008     bool const haveMessages = evbuffer_get_length(msgs->outMessages) != 0;
2009     bool const fext = tr_peerIoSupportsFEXT(msgs->io);
2010 
2011     /**
2012     ***  Protocol messages
2013     **/
2014 
2015     if (haveMessages && msgs->outMessagesBatchedAt == 0) /* fresh batch */
2016     {
2017         dbgmsg(msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length(msgs->outMessages));
2018         msgs->outMessagesBatchedAt = now;
2019     }
2020     else if (haveMessages && now - msgs->outMessagesBatchedAt >= msgs->outMessagesBatchPeriod)
2021     {
2022         size_t const len = evbuffer_get_length(msgs->outMessages);
2023         /* flush the protocol messages */
2024         dbgmsg(msgs, "flushing outMessages... to %p (length is %zu)", (void*)msgs->io, len);
2025         tr_peerIoWriteBuf(msgs->io, msgs->outMessages, false);
2026         msgs->clientSentAnythingAt = now;
2027         msgs->outMessagesBatchedAt = 0;
2028         msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2029         bytesWritten += len;
2030     }
2031 
2032     /**
2033     ***  Metadata Pieces
2034     **/
2035 
2036     if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece))
2037     {
2038         char* data;
2039         size_t dataLen;
2040         bool ok = false;
2041 
2042         data = tr_torrentGetMetadataPiece(msgs->torrent, piece, &dataLen);
2043 
2044         if (data != NULL)
2045         {
2046             tr_variant tmp;
2047             struct evbuffer* payload;
2048             struct evbuffer* out = msgs->outMessages;
2049 
2050             /* build the data message */
2051             tr_variantInitDict(&tmp, 3);
2052             tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA);
2053             tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
2054             tr_variantDictAddInt(&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength);
2055             payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);
2056 
2057             /* write it out as a LTEP message to our outMessages buffer */
2058             evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload) + dataLen);
2059             evbuffer_add_uint8(out, BT_LTEP);
2060             evbuffer_add_uint8(out, msgs->ut_metadata_id);
2061             evbuffer_add_buffer(out, payload);
2062             evbuffer_add(out, data, dataLen);
2063             pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
2064             dbgOutMessageLen(msgs);
2065 
2066             evbuffer_free(payload);
2067             tr_variantFree(&tmp);
2068             tr_free(data);
2069 
2070             ok = true;
2071         }
2072 
2073         if (!ok) /* send a rejection message */
2074         {
2075             tr_variant tmp;
2076             struct evbuffer* payload;
2077             struct evbuffer* out = msgs->outMessages;
2078 
2079             /* build the rejection message */
2080             tr_variantInitDict(&tmp, 2);
2081             tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
2082             tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
2083             payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);
2084 
2085             /* write it out as a LTEP message to our outMessages buffer */
2086             evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
2087             evbuffer_add_uint8(out, BT_LTEP);
2088             evbuffer_add_uint8(out, msgs->ut_metadata_id);
2089             evbuffer_add_buffer(out, payload);
2090             pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
2091             dbgOutMessageLen(msgs);
2092 
2093             evbuffer_free(payload);
2094             tr_variantFree(&tmp);
2095         }
2096     }
2097 
2098     /**
2099     ***  Data Blocks
2100     **/
2101 
2102     if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= msgs->torrent->blockSize && popNextRequest(msgs, &req))
2103     {
2104         --msgs->prefetchCount;
2105 
2106         if (requestIsValid(msgs, &req) && tr_torrentPieceIsComplete(msgs->torrent, req.index))
2107         {
2108             bool err;
2109             uint32_t const msglen = 4 + 1 + 4 + 4 + req.length;
2110             struct evbuffer* out;
2111             struct evbuffer_iovec iovec[1];
2112 
2113             out = evbuffer_new();
2114             evbuffer_expand(out, msglen);
2115 
2116             evbuffer_add_uint32(out, sizeof(uint8_t) + 2 * sizeof(uint32_t) + req.length);
2117             evbuffer_add_uint8(out, BT_PIECE);
2118             evbuffer_add_uint32(out, req.index);
2119             evbuffer_add_uint32(out, req.offset);
2120 
2121             evbuffer_reserve_space(out, req.length, iovec, 1);
2122             err = tr_cacheReadBlock(getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length,
2123                 iovec[0].iov_base) != 0;
2124             iovec[0].iov_len = req.length;
2125             evbuffer_commit_space(out, iovec, 1);
2126 
2127             /* check the piece if it needs checking... */
2128             if (!err && tr_torrentPieceNeedsCheck(msgs->torrent, req.index))
2129             {
2130                 err = !tr_torrentCheckPiece(msgs->torrent, req.index);
2131 
2132                 if (err)
2133                 {
2134                     tr_torrentSetLocalError(msgs->torrent, _("Please Verify Local Data! Piece #%zu is corrupt."),
2135                         (size_t)req.index);
2136                 }
2137             }
2138 
2139             if (err)
2140             {
2141                 if (fext)
2142                 {
2143                     protocolSendReject(msgs, &req);
2144                 }
2145             }
2146             else
2147             {
2148                 size_t const n = evbuffer_get_length(out);
2149                 dbgmsg(msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
2150                 TR_ASSERT(n == msglen);
2151                 tr_peerIoWriteBuf(msgs->io, out, true);
2152                 bytesWritten += n;
2153                 msgs->clientSentAnythingAt = now;
2154                 tr_historyAdd(&msgs->peer.blocksSentToPeer, tr_time(), 1);
2155             }
2156 
2157             evbuffer_free(out);
2158 
2159             if (err)
2160             {
2161                 bytesWritten = 0;
2162                 msgs = NULL;
2163             }
2164         }
2165         else if (fext) /* peer needs a reject message */
2166         {
2167             protocolSendReject(msgs, &req);
2168         }
2169 
2170         if (msgs != NULL)
2171         {
2172             prefetchPieces(msgs);
2173         }
2174     }
2175 
2176     /**
2177     ***  Keepalive
2178     **/
2179 
2180     if (msgs != NULL && msgs->clientSentAnythingAt != 0 && now - msgs->clientSentAnythingAt > KEEPALIVE_INTERVAL_SECS)
2181     {
2182         dbgmsg(msgs, "sending a keepalive message");
2183         evbuffer_add_uint32(msgs->outMessages, 0);
2184         pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2185     }
2186 
2187     return bytesWritten;
2188 }
2189 
peerPulse(void * vmsgs)2190 static void peerPulse(void* vmsgs)
2191 {
2192     tr_peerMsgs* msgs = vmsgs;
2193     time_t const now = tr_time();
2194 
2195     if (tr_isPeerIo(msgs->io))
2196     {
2197         updateDesiredRequestCount(msgs);
2198         updateBlockRequests(msgs);
2199         updateMetadataRequests(msgs, now);
2200     }
2201 
2202     for (;;)
2203     {
2204         if (fillOutputBuffer(msgs, now) < 1)
2205         {
2206             break;
2207         }
2208     }
2209 }
2210 
tr_peerMsgsPulse(tr_peerMsgs * msgs)2211 void tr_peerMsgsPulse(tr_peerMsgs* msgs)
2212 {
2213     if (msgs != NULL)
2214     {
2215         peerPulse(msgs);
2216     }
2217 }
2218 
gotError(tr_peerIo * io UNUSED,short what,void * vmsgs)2219 static void gotError(tr_peerIo* io UNUSED, short what, void* vmsgs)
2220 {
2221     if ((what & BEV_EVENT_TIMEOUT) != 0)
2222     {
2223         dbgmsg(vmsgs, "libevent got a timeout, what=%hd", what);
2224     }
2225 
2226     if ((what & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) != 0)
2227     {
2228         dbgmsg(vmsgs, "libevent got an error! what=%hd, errno=%d (%s)", what, errno, tr_strerror(errno));
2229     }
2230 
2231     fireError(vmsgs, ENOTCONN);
2232 }
2233 
sendBitfield(tr_peerMsgs * msgs)2234 static void sendBitfield(tr_peerMsgs* msgs)
2235 {
2236     TR_ASSERT(tr_torrentHasMetadata(msgs->torrent));
2237 
2238     void* bytes;
2239     size_t byte_count = 0;
2240     struct evbuffer* out = msgs->outMessages;
2241 
2242     bytes = tr_torrentCreatePieceBitfield(msgs->torrent, &byte_count);
2243     evbuffer_add_uint32(out, sizeof(uint8_t) + byte_count);
2244     evbuffer_add_uint8(out, BT_BITFIELD);
2245     evbuffer_add(out, bytes, byte_count);
2246     dbgmsg(msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length(out));
2247     pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
2248 
2249     tr_free(bytes);
2250 }
2251 
tellPeerWhatWeHave(tr_peerMsgs * msgs)2252 static void tellPeerWhatWeHave(tr_peerMsgs* msgs)
2253 {
2254     bool const fext = tr_peerIoSupportsFEXT(msgs->io);
2255 
2256     if (fext && tr_torrentHasAll(msgs->torrent))
2257     {
2258         protocolSendHaveAll(msgs);
2259     }
2260     else if (fext && tr_torrentHasNone(msgs->torrent))
2261     {
2262         protocolSendHaveNone(msgs);
2263     }
2264     else if (!tr_torrentHasNone(msgs->torrent))
2265     {
2266         sendBitfield(msgs);
2267     }
2268 }
2269 
2270 /**
2271 ***
2272 **/
2273 
2274 /* some peers give us error messages if we send
2275    more than this many peers in a single pex message
2276    http://wiki.theory.org/BitTorrentPeerExchangeConventions */
2277 #define MAX_PEX_ADDED 50
2278 #define MAX_PEX_DROPPED 50
2279 
2280 typedef struct
2281 {
2282     tr_pex* added;
2283     tr_pex* dropped;
2284     tr_pex* elements;
2285     int addedCount;
2286     int droppedCount;
2287     int elementCount;
2288 }
2289 PexDiffs;
2290 
pexAddedCb(void * vpex,void * userData)2291 static void pexAddedCb(void* vpex, void* userData)
2292 {
2293     PexDiffs* diffs = userData;
2294     tr_pex* pex = vpex;
2295 
2296     if (diffs->addedCount < MAX_PEX_ADDED)
2297     {
2298         diffs->added[diffs->addedCount++] = *pex;
2299         diffs->elements[diffs->elementCount++] = *pex;
2300     }
2301 }
2302 
pexDroppedCb(void * vpex,void * userData)2303 static inline void pexDroppedCb(void* vpex, void* userData)
2304 {
2305     PexDiffs* diffs = userData;
2306     tr_pex* pex = vpex;
2307 
2308     if (diffs->droppedCount < MAX_PEX_DROPPED)
2309     {
2310         diffs->dropped[diffs->droppedCount++] = *pex;
2311     }
2312 }
2313 
pexElementCb(void * vpex,void * userData)2314 static inline void pexElementCb(void* vpex, void* userData)
2315 {
2316     PexDiffs* diffs = userData;
2317     tr_pex* pex = vpex;
2318 
2319     diffs->elements[diffs->elementCount++] = *pex;
2320 }
2321 
2322 typedef void (* tr_set_func)(void* element, void* userData);
2323 
2324 /**
2325  * @brief find the differences and commonalities in two sorted sets
2326  * @param a the first set
2327  * @param aCount the number of elements in the set 'a'
2328  * @param b the second set
2329  * @param bCount the number of elements in the set 'b'
2330  * @param compare the sorting method for both sets
2331  * @param elementSize the sizeof the element in the two sorted sets
2332  * @param in_a called for items in set 'a' but not set 'b'
2333  * @param in_b called for items in set 'b' but not set 'a'
2334  * @param in_both called for items that are in both sets
2335  * @param userData user data passed along to in_a, in_b, and in_both
2336  */
tr_set_compare(void const * va,size_t aCount,void const * vb,size_t bCount,tr_voidptr_compare_func compare,size_t elementSize,tr_set_func in_a_cb,tr_set_func in_b_cb,tr_set_func in_both_cb,void * userData)2337 static void tr_set_compare(void const* va, size_t aCount, void const* vb, size_t bCount, tr_voidptr_compare_func compare,
2338     size_t elementSize, tr_set_func in_a_cb, tr_set_func in_b_cb, tr_set_func in_both_cb, void* userData)
2339 {
2340     uint8_t const* a = va;
2341     uint8_t const* b = vb;
2342     uint8_t const* aend = a + elementSize * aCount;
2343     uint8_t const* bend = b + elementSize * bCount;
2344 
2345     while (a != aend || b != bend)
2346     {
2347         if (a == aend)
2348         {
2349             (*in_b_cb)((void*)b, userData);
2350             b += elementSize;
2351         }
2352         else if (b == bend)
2353         {
2354             (*in_a_cb)((void*)a, userData);
2355             a += elementSize;
2356         }
2357         else
2358         {
2359             int const val = (*compare)(a, b);
2360 
2361             if (val == 0)
2362             {
2363                 (*in_both_cb)((void*)a, userData);
2364                 a += elementSize;
2365                 b += elementSize;
2366             }
2367             else if (val < 0)
2368             {
2369                 (*in_a_cb)((void*)a, userData);
2370                 a += elementSize;
2371             }
2372             else if (val > 0)
2373             {
2374                 (*in_b_cb)((void*)b, userData);
2375                 b += elementSize;
2376             }
2377         }
2378     }
2379 }
2380 
sendPex(tr_peerMsgs * msgs)2381 static void sendPex(tr_peerMsgs* msgs)
2382 {
2383     if (msgs->peerSupportsPex && tr_torrentAllowsPex(msgs->torrent))
2384     {
2385         PexDiffs diffs;
2386         PexDiffs diffs6;
2387         tr_pex* newPex = NULL;
2388         tr_pex* newPex6 = NULL;
2389         int const newCount = tr_peerMgrGetPeers(msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2390         int const newCount6 = tr_peerMgrGetPeers(msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2391 
2392         /* build the diffs */
2393         diffs.added = tr_new(tr_pex, newCount);
2394         diffs.addedCount = 0;
2395         diffs.dropped = tr_new(tr_pex, msgs->pexCount);
2396         diffs.droppedCount = 0;
2397         diffs.elements = tr_new(tr_pex, newCount + msgs->pexCount);
2398         diffs.elementCount = 0;
2399         tr_set_compare(msgs->pex, msgs->pexCount, newPex, newCount, tr_pexCompare, sizeof(tr_pex), pexDroppedCb, pexAddedCb,
2400             pexElementCb, &diffs);
2401         diffs6.added = tr_new(tr_pex, newCount6);
2402         diffs6.addedCount = 0;
2403         diffs6.dropped = tr_new(tr_pex, msgs->pexCount6);
2404         diffs6.droppedCount = 0;
2405         diffs6.elements = tr_new(tr_pex, newCount6 + msgs->pexCount6);
2406         diffs6.elementCount = 0;
2407         tr_set_compare(msgs->pex6, msgs->pexCount6, newPex6, newCount6, tr_pexCompare, sizeof(tr_pex), pexDroppedCb, pexAddedCb,
2408             pexElementCb, &diffs6);
2409         dbgmsg(msgs, "pex: old peer count %d+%d, new peer count %d+%d, added %d+%d, removed %d+%d", msgs->pexCount,
2410             msgs->pexCount6, newCount, newCount6, diffs.addedCount, diffs6.addedCount, diffs.droppedCount, diffs6.droppedCount);
2411 
2412         if (diffs.addedCount == 0 && diffs.droppedCount == 0 && diffs6.addedCount == 0 && diffs6.droppedCount == 0)
2413         {
2414             tr_free(diffs.elements);
2415             tr_free(diffs6.elements);
2416         }
2417         else
2418         {
2419             tr_variant val;
2420             uint8_t* tmp;
2421             uint8_t* walk;
2422             struct evbuffer* payload;
2423             struct evbuffer* out = msgs->outMessages;
2424 
2425             /* update peer */
2426             tr_free(msgs->pex);
2427             msgs->pex = diffs.elements;
2428             msgs->pexCount = diffs.elementCount;
2429             tr_free(msgs->pex6);
2430             msgs->pex6 = diffs6.elements;
2431             msgs->pexCount6 = diffs6.elementCount;
2432 
2433             /* build the pex payload */
2434             tr_variantInitDict(&val, 3); /* ipv6 support: left as 3: speed vs. likelihood? */
2435 
2436             if (diffs.addedCount > 0)
2437             {
2438                 /* "added" */
2439                 tmp = walk = tr_new(uint8_t, diffs.addedCount * 6);
2440 
2441                 for (int i = 0; i < diffs.addedCount; ++i)
2442                 {
2443                     memcpy(walk, &diffs.added[i].addr.addr, 4);
2444                     walk += 4;
2445                     memcpy(walk, &diffs.added[i].port, 2);
2446                     walk += 2;
2447                 }
2448 
2449                 TR_ASSERT(walk - tmp == diffs.addedCount * 6);
2450                 tr_variantDictAddRaw(&val, TR_KEY_added, tmp, walk - tmp);
2451                 tr_free(tmp);
2452 
2453                 /* "added.f"
2454                  * unset each holepunch flag because we don't support it. */
2455                 tmp = walk = tr_new(uint8_t, diffs.addedCount);
2456 
2457                 for (int i = 0; i < diffs.addedCount; ++i)
2458                 {
2459                     *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2460                 }
2461 
2462                 TR_ASSERT(walk - tmp == diffs.addedCount);
2463                 tr_variantDictAddRaw(&val, TR_KEY_added_f, tmp, walk - tmp);
2464                 tr_free(tmp);
2465             }
2466 
2467             if (diffs.droppedCount > 0)
2468             {
2469                 /* "dropped" */
2470                 tmp = walk = tr_new(uint8_t, diffs.droppedCount * 6);
2471 
2472                 for (int i = 0; i < diffs.droppedCount; ++i)
2473                 {
2474                     memcpy(walk, &diffs.dropped[i].addr.addr, 4);
2475                     walk += 4;
2476                     memcpy(walk, &diffs.dropped[i].port, 2);
2477                     walk += 2;
2478                 }
2479 
2480                 TR_ASSERT(walk - tmp == diffs.droppedCount * 6);
2481                 tr_variantDictAddRaw(&val, TR_KEY_dropped, tmp, walk - tmp);
2482                 tr_free(tmp);
2483             }
2484 
2485             if (diffs6.addedCount > 0)
2486             {
2487                 /* "added6" */
2488                 tmp = walk = tr_new(uint8_t, diffs6.addedCount * 18);
2489 
2490                 for (int i = 0; i < diffs6.addedCount; ++i)
2491                 {
2492                     memcpy(walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
2493                     walk += 16;
2494                     memcpy(walk, &diffs6.added[i].port, 2);
2495                     walk += 2;
2496                 }
2497 
2498                 TR_ASSERT(walk - tmp == diffs6.addedCount * 18);
2499                 tr_variantDictAddRaw(&val, TR_KEY_added6, tmp, walk - tmp);
2500                 tr_free(tmp);
2501 
2502                 /* "added6.f"
2503                  * unset each holepunch flag because we don't support it. */
2504                 tmp = walk = tr_new(uint8_t, diffs6.addedCount);
2505 
2506                 for (int i = 0; i < diffs6.addedCount; ++i)
2507                 {
2508                     *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2509                 }
2510 
2511                 TR_ASSERT(walk - tmp == diffs6.addedCount);
2512                 tr_variantDictAddRaw(&val, TR_KEY_added6_f, tmp, walk - tmp);
2513                 tr_free(tmp);
2514             }
2515 
2516             if (diffs6.droppedCount > 0)
2517             {
2518                 /* "dropped6" */
2519                 tmp = walk = tr_new(uint8_t, diffs6.droppedCount * 18);
2520 
2521                 for (int i = 0; i < diffs6.droppedCount; ++i)
2522                 {
2523                     memcpy(walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
2524                     walk += 16;
2525                     memcpy(walk, &diffs6.dropped[i].port, 2);
2526                     walk += 2;
2527                 }
2528 
2529                 TR_ASSERT(walk - tmp == diffs6.droppedCount * 18);
2530                 tr_variantDictAddRaw(&val, TR_KEY_dropped6, tmp, walk - tmp);
2531                 tr_free(tmp);
2532             }
2533 
2534             /* write the pex message */
2535             payload = tr_variantToBuf(&val, TR_VARIANT_FMT_BENC);
2536             evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
2537             evbuffer_add_uint8(out, BT_LTEP);
2538             evbuffer_add_uint8(out, msgs->ut_pex_id);
2539             evbuffer_add_buffer(out, payload);
2540             pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
2541             dbgmsg(msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length(out));
2542             dbgOutMessageLen(msgs);
2543 
2544             evbuffer_free(payload);
2545             tr_variantFree(&val);
2546         }
2547 
2548         /* cleanup */
2549         tr_free(diffs.added);
2550         tr_free(diffs.dropped);
2551         tr_free(newPex);
2552         tr_free(diffs6.added);
2553         tr_free(diffs6.dropped);
2554         tr_free(newPex6);
2555 
2556         /* msgs->clientSentPexAt = tr_time(); */
2557     }
2558 }
2559 
pexPulse(evutil_socket_t foo UNUSED,short bar UNUSED,void * vmsgs)2560 static void pexPulse(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmsgs)
2561 {
2562     struct tr_peerMsgs* msgs = vmsgs;
2563 
2564     sendPex(msgs);
2565 
2566     TR_ASSERT(msgs->pexTimer != NULL);
2567     tr_timerAdd(msgs->pexTimer, PEX_INTERVAL_SECS, 0);
2568 }
2569 
2570 /***
2571 ****  tr_peer virtual functions
2572 ***/
2573 
peermsgs_is_transferring_pieces(struct tr_peer const * peer,uint64_t now,tr_direction direction,unsigned int * setme_Bps)2574 static bool peermsgs_is_transferring_pieces(struct tr_peer const* peer, uint64_t now, tr_direction direction,
2575     unsigned int* setme_Bps)
2576 {
2577     unsigned int Bps = 0;
2578 
2579     if (tr_isPeerMsgs(peer))
2580     {
2581         tr_peerMsgs const* msgs = (tr_peerMsgs const*)peer;
2582         Bps = tr_peerIoGetPieceSpeed_Bps(msgs->io, now, direction);
2583     }
2584 
2585     if (setme_Bps != NULL)
2586     {
2587         *setme_Bps = Bps;
2588     }
2589 
2590     return Bps > 0;
2591 }
2592 
peermsgs_destruct(tr_peer * peer)2593 static void peermsgs_destruct(tr_peer* peer)
2594 {
2595     tr_peerMsgs* msgs = PEER_MSGS(peer);
2596 
2597     TR_ASSERT(msgs != NULL);
2598 
2599     tr_peerMsgsSetActive(msgs, TR_UP, false);
2600     tr_peerMsgsSetActive(msgs, TR_DOWN, false);
2601 
2602     if (msgs->pexTimer != NULL)
2603     {
2604         event_free(msgs->pexTimer);
2605     }
2606 
2607     if (msgs->incoming.block != NULL)
2608     {
2609         evbuffer_free(msgs->incoming.block);
2610     }
2611 
2612     if (msgs->io != NULL)
2613     {
2614         tr_peerIoClear(msgs->io);
2615         tr_peerIoUnref(msgs->io); /* balanced by the ref in handshakeDoneCB() */
2616     }
2617 
2618     evbuffer_free(msgs->outMessages);
2619     tr_free(msgs->pex6);
2620     tr_free(msgs->pex);
2621 
2622     tr_peerDestruct(&msgs->peer);
2623 
2624     memset(msgs, ~0, sizeof(tr_peerMsgs));
2625 }
2626 
2627 static struct tr_peer_virtual_funcs const my_funcs =
2628 {
2629     .destruct = peermsgs_destruct,
2630     .is_transferring_pieces = peermsgs_is_transferring_pieces
2631 };
2632 
2633 /***
2634 ****
2635 ***/
2636 
tr_peerMsgsGetConnectionAge(tr_peerMsgs const * msgs)2637 time_t tr_peerMsgsGetConnectionAge(tr_peerMsgs const* msgs)
2638 {
2639     TR_ASSERT(tr_isPeerMsgs(msgs));
2640 
2641     return tr_peerIoGetAge(msgs->io);
2642 }
2643 
tr_peerMsgsIsPeerChoked(tr_peerMsgs const * msgs)2644 bool tr_peerMsgsIsPeerChoked(tr_peerMsgs const* msgs)
2645 {
2646     TR_ASSERT(tr_isPeerMsgs(msgs));
2647 
2648     return msgs->peer_is_choked;
2649 }
2650 
tr_peerMsgsIsPeerInterested(tr_peerMsgs const * msgs)2651 bool tr_peerMsgsIsPeerInterested(tr_peerMsgs const* msgs)
2652 {
2653     TR_ASSERT(tr_isPeerMsgs(msgs));
2654 
2655     return msgs->peer_is_interested;
2656 }
2657 
tr_peerMsgsIsClientChoked(tr_peerMsgs const * msgs)2658 bool tr_peerMsgsIsClientChoked(tr_peerMsgs const* msgs)
2659 {
2660     TR_ASSERT(tr_isPeerMsgs(msgs));
2661 
2662     return msgs->client_is_choked;
2663 }
2664 
tr_peerMsgsIsClientInterested(tr_peerMsgs const * msgs)2665 bool tr_peerMsgsIsClientInterested(tr_peerMsgs const* msgs)
2666 {
2667     TR_ASSERT(tr_isPeerMsgs(msgs));
2668 
2669     return msgs->client_is_interested;
2670 }
2671 
tr_peerMsgsIsUtpConnection(tr_peerMsgs const * msgs)2672 bool tr_peerMsgsIsUtpConnection(tr_peerMsgs const* msgs)
2673 {
2674     TR_ASSERT(tr_isPeerMsgs(msgs));
2675 
2676     return msgs->io->socket.type == TR_PEER_SOCKET_TYPE_UTP;
2677 }
2678 
tr_peerMsgsIsEncrypted(tr_peerMsgs const * msgs)2679 bool tr_peerMsgsIsEncrypted(tr_peerMsgs const* msgs)
2680 {
2681     TR_ASSERT(tr_isPeerMsgs(msgs));
2682 
2683     return tr_peerIoIsEncrypted(msgs->io);
2684 }
2685 
tr_peerMsgsIsIncomingConnection(tr_peerMsgs const * msgs)2686 bool tr_peerMsgsIsIncomingConnection(tr_peerMsgs const* msgs)
2687 {
2688     TR_ASSERT(tr_isPeerMsgs(msgs));
2689 
2690     return tr_peerIoIsIncoming(msgs->io);
2691 }
2692 
2693 /***
2694 ****
2695 ***/
2696 
tr_isPeerMsgs(void const * msgs)2697 bool tr_isPeerMsgs(void const* msgs)
2698 {
2699     /* FIXME: this is pretty crude */
2700     return msgs != NULL && ((struct tr_peerMsgs*)msgs)->magic_number == MAGIC_NUMBER;
2701 }
2702 
tr_peerMsgsCast(void * vm)2703 tr_peerMsgs* tr_peerMsgsCast(void* vm)
2704 {
2705     return tr_isPeerMsgs(vm) ? vm : NULL;
2706 }
2707 
tr_peerMsgsNew(struct tr_torrent * torrent,struct tr_peerIo * io,tr_peer_callback callback,void * callbackData)2708 tr_peerMsgs* tr_peerMsgsNew(struct tr_torrent* torrent, struct tr_peerIo* io, tr_peer_callback callback, void* callbackData)
2709 {
2710     TR_ASSERT(io != NULL);
2711 
2712     tr_peerMsgs* m = tr_new0(tr_peerMsgs, 1);
2713 
2714     tr_peerConstruct(&m->peer, torrent);
2715     m->peer.funcs = &my_funcs;
2716 
2717     m->magic_number = MAGIC_NUMBER;
2718     m->client_is_choked = true;
2719     m->peer_is_choked = true;
2720     m->client_is_interested = false;
2721     m->peer_is_interested = false;
2722     m->is_active[TR_UP] = false;
2723     m->is_active[TR_DOWN] = false;
2724     m->callback = callback;
2725     m->callbackData = callbackData;
2726     m->io = io;
2727     m->torrent = torrent;
2728     m->state = AWAITING_BT_LENGTH;
2729     m->outMessages = evbuffer_new();
2730     m->outMessagesBatchedAt = 0;
2731     m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2732 
2733     if (tr_torrentAllowsPex(torrent))
2734     {
2735         m->pexTimer = evtimer_new(torrent->session->event_base, pexPulse, m);
2736         tr_timerAdd(m->pexTimer, PEX_INTERVAL_SECS, 0);
2737     }
2738 
2739     if (tr_peerIoSupportsUTP(m->io))
2740     {
2741         tr_address const* addr = tr_peerIoGetAddress(m->io, NULL);
2742         tr_peerMgrSetUtpSupported(torrent, addr);
2743         tr_peerMgrSetUtpFailed(torrent, addr, false);
2744     }
2745 
2746     if (tr_peerIoSupportsLTEP(m->io))
2747     {
2748         sendLtepHandshake(m);
2749     }
2750 
2751     tellPeerWhatWeHave(m);
2752 
2753     if (tr_dhtEnabled(torrent->session) && tr_peerIoSupportsDHT(m->io))
2754     {
2755         /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2756         struct tr_address const* addr = tr_peerIoGetAddress(m->io, NULL);
2757 
2758         if (addr->type == TR_AF_INET || tr_globalIPv6() != NULL)
2759         {
2760             protocolSendPort(m, tr_dhtPort(torrent->session));
2761         }
2762     }
2763 
2764     tr_peerIoSetIOFuncs(m->io, canRead, didWrite, gotError, m);
2765     updateDesiredRequestCount(m);
2766 
2767     return m;
2768 }
2769