1 /*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 */
8
#include <errno.h>
#include <limits.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
13
14 #include <event2/buffer.h>
15 #include <event2/bufferevent.h>
16 #include <event2/event.h>
17
18 #include "transmission.h"
19 #include "cache.h"
20 #include "completion.h"
21 #include "file.h"
22 #include "log.h"
23 #include "peer-io.h"
24 #include "peer-mgr.h"
25 #include "peer-msgs.h"
26 #include "session.h"
27 #include "torrent.h"
28 #include "torrent-magnet.h"
29 #include "tr-assert.h"
30 #include "tr-dht.h"
31 #include "utils.h"
32 #include "variant.h"
33 #include "version.h"
34
35 #ifndef EBADMSG
36 #define EBADMSG EINVAL
37 #endif
38
39 /**
40 ***
41 **/
42
/* Wire-protocol message ids plus assorted limits and tunables used in this file. */
enum
{
    /* ids of the basic peer messages; written as the one-byte id after the length prefix */
    BT_CHOKE = 0,
    BT_UNCHOKE = 1,
    BT_INTERESTED = 2,
    BT_NOT_INTERESTED = 3,
    BT_HAVE = 4,
    BT_BITFIELD = 5,
    BT_REQUEST = 6,
    BT_PIECE = 7,
    BT_CANCEL = 8,
    BT_PORT = 9,
    /* ids of the fast-extension messages; the senders below assert tr_peerIoSupportsFEXT() */
    BT_FEXT_SUGGEST = 13,
    BT_FEXT_HAVE_ALL = 14,
    BT_FEXT_HAVE_NONE = 15,
    BT_FEXT_REJECT = 16,
    BT_FEXT_ALLOWED_FAST = 17,
    /* id that wraps every extension-protocol (LTEP) message */
    BT_LTEP = 20,
    /* LTEP sub-id written right after BT_LTEP by sendLtepHandshake() */
    LTEP_HANDSHAKE = 0,
    /* the sub-ids we advertise in the "m" dict of our LTEP handshake */
    UT_PEX_ID = 1,
    UT_METADATA_ID = 3,
    /* NOTE(review): presumably caps the peers listed in one pex message -- not used in this part of the file */
    MAX_PEX_PEER_COUNT = 50,
    /* tr_peerMsgsSetChoke() won't change the choke state again within this many seconds */
    MIN_CHOKE_PERIOD_SEC = 10,
    /* idle seconds before we send a keepalive */
    KEEPALIVE_INTERVAL_SECS = 100,
    /* */
    PEX_INTERVAL_SECS = 90, /* sec between sendPex() calls */
    /* capacity of tr_peerMsgs.peerAskedFor; also the "reqq" value we advertise in our handshake */
    REQQ = 512,
    /* capacity of tr_peerMsgs.peerAskedForMetadata */
    METADATA_REQQ = 64,
    /* NOTE(review): presumably the sanity value kept in tr_peerMsgs.magic_number -- confirm */
    MAGIC_NUMBER = 21549,
    /* used in lowering the outMessages queue period */
    IMMEDIATE_PRIORITY_INTERVAL_SECS = 0,
    HIGH_PRIORITY_INTERVAL_SECS = 2,
    LOW_PRIORITY_INTERVAL_SECS = 10,
    /* number of pieces we'll allow in our fast set */
    MAX_FAST_SET_SIZE = 3,
    /* how many blocks to keep prefetched per peer */
    PREFETCH_SIZE = 18,
    /* when we're making requests from another peer,
       batch them together to send enough requests to
       meet our bandwidth goals for the next N seconds */
    REQUEST_BUF_SECS = 10,
    /* defined in BEP #9 */
    METADATA_MSG_TYPE_REQUEST = 0,
    METADATA_MSG_TYPE_DATA = 1,
    METADATA_MSG_TYPE_REJECT = 2
};
99
/* Stages of reading one incoming message.
 * NOTE(review): presumably the values stored in tr_peerMsgs.state -- the
 * code that reads and writes that field is outside this part of the file. */
enum
{
    AWAITING_BT_LENGTH,
    AWAITING_BT_ID,
    AWAITING_BT_MESSAGE,
    AWAITING_BT_PIECE
};
107
/* What the peer told us about encrypted connections.
 * parseLtepHandshake() sets YES or NO from the handshake's "e" key. */
typedef enum
{
    ENCRYPTION_PREFERENCE_UNKNOWN,
    ENCRYPTION_PREFERENCE_YES,
    ENCRYPTION_PREFERENCE_NO
}
encryption_preference_t;
115
116 /**
117 ***
118 **/
119
/* One block request: the three 32-bit fields that are written to the wire
 * by the request, cancel, and reject senders below. */
struct peer_request
{
    uint32_t index; /* piece index */
    uint32_t offset; /* offset of the block inside that piece */
    uint32_t length; /* length of the block */
};
126
/* Translate a block number of `tor` into the piece index / offset / length
 * triple stored in `setme`. */
static void blockToReq(tr_torrent const* tor, tr_block_index_t block, struct peer_request* setme)
{
    struct peer_request* const req = setme;

    tr_torrentGetBlockLocation(tor, block, &req->index, &req->offset, &req->length);
}
131
132 /**
133 ***
134 **/
135
136 /* this is raw, unchanged data from the peer regarding
137 * the current message that it's sending us. */
/* Bookkeeping for the message currently being received; embedded in
 * tr_peerMsgs as its `incoming` member. */
struct tr_incoming
{
    uint8_t id;
    uint32_t length; /* includes the +1 for id length */
    struct peer_request blockReq; /* metadata for incoming blocks */
    struct evbuffer* block; /* piece data for incoming blocks */
};
145
146 /**
147 * Low-level communication state information about a connected peer.
148 *
149 * This structure remembers the low-level protocol states that we're
150 * in with this peer, such as active requests, pex messages, and so on.
151 * Its fields are all private to peer-msgs.c.
152 *
153 * Data not directly involved with sending & receiving messages is
154 * stored in tr_peer, where it can be accessed by both peermsgs and
155 * the peer manager.
156 *
157 * @see struct peer_atom
158 * @see tr_peer
159 */
struct tr_peerMsgs
{
    struct tr_peer peer; /* parent */

    uint16_t magic_number;

    /* Whether or not we've choked this peer. */
    bool peer_is_choked;

    /* whether or not the peer has indicated it will download from us. */
    bool peer_is_interested;

    /* whether or not the peer is choking us. */
    bool client_is_choked;

    /* whether or not we've indicated to the peer that we would download from them if unchoked. */
    bool client_is_interested;

    /* set from the "m" dict of the peer's LTEP handshake */
    bool peerSupportsPex;
    bool peerSupportsMetadataXfer;
    /* guards against sending / records receiving the LTEP handshake */
    bool clientSentLtepHandshake;
    bool peerSentLtepHandshake;

    /*bool haveFastSet;*/

    int desiredRequestCount;

    int prefetchCount;

    /* cached activity flags, indexed by tr_direction; kept in sync by tr_peerMsgsSetActive() */
    bool is_active[2];

    /* how long the outMessages batch should be allowed to grow before
     * it's flushed -- some messages (like requests >:) should be sent
     * very quickly; others aren't as urgent. */
    int8_t outMessagesBatchPeriod;

    uint8_t state;
    /* the LTEP sub-ids the peer asked us to use, taken from its handshake */
    uint8_t ut_pex_id;
    uint8_t ut_metadata_id;
    uint16_t pexCount;
    uint16_t pexCount6;

    tr_port dht_port;

    /* from the "e" key of the peer's LTEP handshake */
    encryption_preference_t encryption_preference;

    /* the peer's "metadata_size" value, kept when tr_torrentSetMetadataSizeHint() accepts it */
    size_t metadata_size_hint;
#if 0
    size_t fastsetSize;
    tr_piece_index_t fastset[MAX_FAST_SET_SIZE];
#endif

    tr_torrent* torrent;

    /* event sink invoked by publish(); may be NULL */
    tr_peer_callback callback;
    void* callbackData;

    struct evbuffer* outMessages; /* all the non-piece messages */

    /* FIFO of block requests from the peer; the element count lives in peer.pendingReqsToClient */
    struct peer_request peerAskedFor[REQQ];

    /* FIFO of metadata pieces the peer asked for, and how many are queued */
    int peerAskedForMetadata[METADATA_REQQ];
    int peerAskedForMetadataCount;

    tr_pex* pex;
    tr_pex* pex6;

    /*time_t clientSentPexAt;*/
    time_t clientSentAnythingAt;

    /* when tr_peerMsgsSetChoke() last changed peer_is_choked */
    time_t chokeChangedAt;

    /* when we started batching the outMessages */
    time_t outMessagesBatchedAt;

    struct tr_incoming incoming;

    /* if the peer supports the Extension Protocol in BEP 10 and
       supplied a reqq argument, it's stored here. Otherwise, the
       value is zero and should be ignored. */
    int64_t reqq;

    struct event* pexTimer;

    struct tr_peerIo* io;
};
246
247 /**
248 ***
249 **/
250
getSession(struct tr_peerMsgs * msgs)251 static inline tr_session* getSession(struct tr_peerMsgs* msgs)
252 {
253 return msgs->torrent->session;
254 }
255
256 /**
257 ***
258 **/
259
260 static void myDebug(char const* file, int line, struct tr_peerMsgs const* msgs, char const* fmt, ...) TR_GNUC_PRINTF(4, 5);
261
myDebug(char const * file,int line,struct tr_peerMsgs const * msgs,char const * fmt,...)262 static void myDebug(char const* file, int line, struct tr_peerMsgs const* msgs, char const* fmt, ...)
263 {
264 tr_sys_file_t const fp = tr_logGetFile();
265
266 if (fp != TR_BAD_SYS_FILE)
267 {
268 va_list args;
269 char timestr[64];
270 struct evbuffer* buf = evbuffer_new();
271 char* base = tr_sys_path_basename(file, NULL);
272 char* message;
273
274 evbuffer_add_printf(buf, "[%s] %s - %s [%s]: ", tr_logGetTimeStr(timestr, sizeof(timestr)), tr_torrentName(
275 msgs->torrent), tr_peerIoGetAddrStr(msgs->io), tr_quark_get_string(msgs->peer.client, NULL));
276 va_start(args, fmt);
277 evbuffer_add_vprintf(buf, fmt, args);
278 va_end(args);
279 evbuffer_add_printf(buf, " (%s:%d)", base, line);
280
281 message = evbuffer_free_to_str(buf, NULL);
282 tr_sys_file_write_line(fp, message, NULL);
283
284 tr_free(base);
285 tr_free(message);
286 }
287 }
288
/* Log a printf-style debug line about `msgs`, tagged with the call site's
 * file and line. myDebug() -- and therefore the argument expressions -- is
 * only reached when tr_logGetDeepEnabled() returns true. */
#define dbgmsg(msgs, ...) \
    do \
    { \
        if (tr_logGetDeepEnabled()) \
        { \
            myDebug(__FILE__, __LINE__, msgs, __VA_ARGS__); \
        } \
    } \
    while (0)
298
299 /**
300 ***
301 **/
302
/* Shorten the outMessages batch period to `interval` seconds.
 * The period is only ever lowered here, never raised. */
static void pokeBatchPeriod(tr_peerMsgs* msgs, int interval)
{
    /* already at least this urgent */
    if (interval >= msgs->outMessagesBatchPeriod)
    {
        return;
    }

    msgs->outMessagesBatchPeriod = interval;
    dbgmsg(msgs, "lowering batch interval to %d seconds", interval);
}
311
/* Debug-log how many bytes are currently queued in outMessages. */
static void dbgOutMessageLen(tr_peerMsgs* msgs)
{
    struct evbuffer const* const queued = msgs->outMessages;

    dbgmsg(msgs, "outMessage size is now %zu", evbuffer_get_length(queued));
}
316
/* Queue a fast-extension Reject for `req`. The peer must support FEXT. */
static void protocolSendReject(tr_peerMsgs* msgs, struct peer_request const* req)
{
    TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));

    /* one id byte followed by index, offset, and length */
    size_t const payload_len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
    struct evbuffer* const buf = msgs->outMessages;

    evbuffer_add_uint32(buf, payload_len);
    evbuffer_add_uint8(buf, BT_FEXT_REJECT);
    evbuffer_add_uint32(buf, req->index);
    evbuffer_add_uint32(buf, req->offset);
    evbuffer_add_uint32(buf, req->length);

    dbgmsg(msgs, "rejecting %u:%u->%u...", req->index, req->offset, req->length);
    dbgOutMessageLen(msgs);
}
332
/* Queue a Request for `req` and ask for the batch to be flushed right away. */
static void protocolSendRequest(tr_peerMsgs* msgs, struct peer_request const* req)
{
    /* one id byte followed by index, offset, and length */
    size_t const payload_len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
    struct evbuffer* const buf = msgs->outMessages;

    evbuffer_add_uint32(buf, payload_len);
    evbuffer_add_uint8(buf, BT_REQUEST);
    evbuffer_add_uint32(buf, req->index);
    evbuffer_add_uint32(buf, req->offset);
    evbuffer_add_uint32(buf, req->length);

    dbgmsg(msgs, "requesting %u:%u->%u...", req->index, req->offset, req->length);
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}
347
/* Queue a Cancel for `req` and ask for the batch to be flushed right away. */
static void protocolSendCancel(tr_peerMsgs* msgs, struct peer_request const* req)
{
    /* one id byte followed by index, offset, and length */
    size_t const payload_len = sizeof(uint8_t) + 3 * sizeof(uint32_t);
    struct evbuffer* const buf = msgs->outMessages;

    evbuffer_add_uint32(buf, payload_len);
    evbuffer_add_uint8(buf, BT_CANCEL);
    evbuffer_add_uint32(buf, req->index);
    evbuffer_add_uint32(buf, req->offset);
    evbuffer_add_uint32(buf, req->length);

    dbgmsg(msgs, "cancelling %u:%u->%u...", req->index, req->offset, req->length);
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}
362
/* Queue a Port message carrying `port`. The batch period is left untouched. */
static void protocolSendPort(tr_peerMsgs* msgs, uint16_t port)
{
    struct evbuffer* const buf = msgs->outMessages;

    dbgmsg(msgs, "sending Port %u", port);

    /* length prefix of 3: one id byte plus the 16-bit port */
    evbuffer_add_uint32(buf, 3);
    evbuffer_add_uint8(buf, BT_PORT);
    evbuffer_add_uint16(buf, port);
}
372
/* Queue a Have message for piece `index`; this one is low priority. */
static void protocolSendHave(tr_peerMsgs* msgs, uint32_t index)
{
    /* one id byte followed by the piece index */
    size_t const payload_len = sizeof(uint8_t) + sizeof(uint32_t);
    struct evbuffer* const buf = msgs->outMessages;

    evbuffer_add_uint32(buf, payload_len);
    evbuffer_add_uint8(buf, BT_HAVE);
    evbuffer_add_uint32(buf, index);

    dbgmsg(msgs, "sending Have %u", index);
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, LOW_PRIORITY_INTERVAL_SECS);
}
385
386 #if 0
387
/* Disabled code (inside #if 0): would queue a fast-extension Allowed Fast
 * message for `pieceIndex`.
 * NOTE(review): the evbuffer_add_uint32() / evbuffer_add_uint8() calls here
 * pass `io` as an extra first argument, unlike every live caller in this
 * file -- they would need updating before this is re-enabled. */
static void protocolSendAllowedFast(tr_peerMsgs* msgs, uint32_t pieceIndex)
{
    TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));

    tr_peerIo* io = msgs->io;
    struct evbuffer* out = msgs->outMessages;

    evbuffer_add_uint32(io, out, sizeof(uint8_t) + sizeof(uint32_t));
    evbuffer_add_uint8(io, out, BT_FEXT_ALLOWED_FAST);
    evbuffer_add_uint32(io, out, pieceIndex);

    dbgmsg(msgs, "sending Allowed Fast %u...", pieceIndex);
    dbgOutMessageLen(msgs);
}
402
403 #endif
404
/* Queue a Choke (`choke` true) or Unchoke (`choke` false) message and ask
 * for the batch to be flushed right away. */
static void protocolSendChoke(tr_peerMsgs* msgs, bool choke)
{
    struct evbuffer* const buf = msgs->outMessages;
    uint8_t const msg_id = choke ? BT_CHOKE : BT_UNCHOKE;

    /* the payload is just the id byte */
    evbuffer_add_uint32(buf, sizeof(uint8_t));
    evbuffer_add_uint8(buf, msg_id);

    dbgmsg(msgs, "sending %s...", choke ? "Choke" : "Unchoke");
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}
416
/* Queue a fast-extension Have All message. The peer must support FEXT. */
static void protocolSendHaveAll(tr_peerMsgs* msgs)
{
    TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));

    struct evbuffer* const buf = msgs->outMessages;

    /* the payload is just the id byte */
    evbuffer_add_uint32(buf, sizeof(uint8_t));
    evbuffer_add_uint8(buf, BT_FEXT_HAVE_ALL);

    dbgmsg(msgs, "sending HAVE_ALL...");
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}
430
/* Queue a fast-extension Have None message. The peer must support FEXT. */
static void protocolSendHaveNone(tr_peerMsgs* msgs)
{
    TR_ASSERT(tr_peerIoSupportsFEXT(msgs->io));

    struct evbuffer* const buf = msgs->outMessages;

    /* the payload is just the id byte */
    evbuffer_add_uint32(buf, sizeof(uint8_t));
    evbuffer_add_uint8(buf, BT_FEXT_HAVE_NONE);

    dbgmsg(msgs, "sending HAVE_NONE...");
    dbgOutMessageLen(msgs);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}
444
445 /**
446 *** EVENTS
447 **/
448
/* Deliver event `e` to the registered callback; a no-op when none is set. */
static void publish(tr_peerMsgs* msgs, tr_peer_event* e)
{
    tr_peer_callback const notify = msgs->callback;

    if (notify == NULL)
    {
        return;
    }

    notify(&msgs->peer, e, msgs->callbackData);
}
456
/* Publish a TR_PEER_ERROR event carrying the error code `err`. */
static void fireError(tr_peerMsgs* msgs, int err)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.err = err;
    event.eventType = TR_PEER_ERROR;

    publish(msgs, &event);
}
464
/* Publish a TR_PEER_CLIENT_GOT_BLOCK event describing the block in `req`. */
static void fireGotBlock(tr_peerMsgs* msgs, struct peer_request const* req)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.eventType = TR_PEER_CLIENT_GOT_BLOCK;

    /* copy the block's coordinates into the event */
    event.length = req->length;
    event.offset = req->offset;
    event.pieceIndex = req->index;

    publish(msgs, &event);
}
474
/* Publish a TR_PEER_CLIENT_GOT_REJ event describing the request in `req`. */
static void fireGotRej(tr_peerMsgs* msgs, struct peer_request const* req)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.eventType = TR_PEER_CLIENT_GOT_REJ;

    /* copy the request's coordinates into the event */
    event.length = req->length;
    event.offset = req->offset;
    event.pieceIndex = req->index;

    publish(msgs, &event);
}
484
/* Publish a TR_PEER_CLIENT_GOT_CHOKE event; it carries no extra data. */
static void fireGotChoke(tr_peerMsgs* msgs)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.eventType = TR_PEER_CLIENT_GOT_CHOKE;

    publish(msgs, &event);
}
491
/* Publish a TR_PEER_CLIENT_GOT_HAVE_ALL event; it carries no extra data. */
static void fireClientGotHaveAll(tr_peerMsgs* msgs)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.eventType = TR_PEER_CLIENT_GOT_HAVE_ALL;

    publish(msgs, &event);
}
498
/* Publish a TR_PEER_CLIENT_GOT_HAVE_NONE event; it carries no extra data. */
static void fireClientGotHaveNone(tr_peerMsgs* msgs)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.eventType = TR_PEER_CLIENT_GOT_HAVE_NONE;

    publish(msgs, &event);
}
505
/* Publish a TR_PEER_CLIENT_GOT_PIECE_DATA event for `length` bytes. */
static void fireClientGotPieceData(tr_peerMsgs* msgs, uint32_t length)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.eventType = TR_PEER_CLIENT_GOT_PIECE_DATA;
    event.length = length;

    publish(msgs, &event);
}
513
/* Publish a TR_PEER_PEER_GOT_PIECE_DATA event for `length` bytes. */
static void firePeerGotPieceData(tr_peerMsgs* msgs, uint32_t length)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.eventType = TR_PEER_PEER_GOT_PIECE_DATA;
    event.length = length;

    publish(msgs, &event);
}
521
/* Publish a TR_PEER_CLIENT_GOT_SUGGEST event for piece `pieceIndex`. */
static void fireClientGotSuggest(tr_peerMsgs* msgs, uint32_t pieceIndex)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.pieceIndex = pieceIndex;
    event.eventType = TR_PEER_CLIENT_GOT_SUGGEST;

    publish(msgs, &event);
}
529
/* Publish a TR_PEER_CLIENT_GOT_PORT event carrying `port`. */
static void fireClientGotPort(tr_peerMsgs* msgs, tr_port port)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.port = port;
    event.eventType = TR_PEER_CLIENT_GOT_PORT;

    publish(msgs, &event);
}
537
/* Publish a TR_PEER_CLIENT_GOT_ALLOWED_FAST event for piece `pieceIndex`. */
static void fireClientGotAllowedFast(tr_peerMsgs* msgs, uint32_t pieceIndex)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.pieceIndex = pieceIndex;
    event.eventType = TR_PEER_CLIENT_GOT_ALLOWED_FAST;

    publish(msgs, &event);
}
545
/* Publish a TR_PEER_CLIENT_GOT_BITFIELD event that points at `bitfield`. */
static void fireClientGotBitfield(tr_peerMsgs* msgs, tr_bitfield* bitfield)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.bitfield = bitfield;
    event.eventType = TR_PEER_CLIENT_GOT_BITFIELD;

    publish(msgs, &event);
}
553
/* Publish a TR_PEER_CLIENT_GOT_HAVE event for piece `index`. */
static void fireClientGotHave(tr_peerMsgs* msgs, tr_piece_index_t index)
{
    tr_peer_event event = TR_PEER_EVENT_INIT;

    event.pieceIndex = index;
    event.eventType = TR_PEER_CLIENT_GOT_HAVE;

    publish(msgs, &event);
}
561
562 /**
563 *** ALLOWED FAST SET
564 *** For explanation, see http://www.bittorrent.org/beps/bep_0006.html
565 **/
566
567 #if 0
568
/* Disabled code (inside #if 0).
 *
 * Fills `setmePieces` with up to `desiredSetSize` distinct piece indices
 * derived from SHA-1 hashes of the peer's masked IPv4 address and the
 * infohash, and returns how many were written. For any address type other
 * than TR_AF_INET nothing is written and 0 is returned.
 *
 * NOTE(review): the numbered markers (1)..(9) presumably refer to the steps
 * of the allowed-fast algorithm in BEP 6 -- confirm before relying on them.
 * NOTE(review): `*(uint32_t*)(x + j)` reads a uint8_t buffer through a
 * uint32_t pointer; worth replacing with memcpy() if this is ever re-enabled. */
size_t tr_generateAllowedSet(tr_piece_index_t* setmePieces, size_t desiredSetSize, size_t pieceCount, uint8_t const* infohash,
    tr_address const* addr)
{
    TR_ASSERT(setmePieces != NULL);
    TR_ASSERT(desiredSetSize <= pieceCount);
    TR_ASSERT(desiredSetSize != 0);
    TR_ASSERT(pieceCount != 0);
    TR_ASSERT(infohash != NULL);
    TR_ASSERT(addr != NULL);

    size_t setSize = 0;

    if (addr->type == TR_AF_INET)
    {
        /* w = masked address followed by the infohash; x = running digest */
        uint8_t w[SHA_DIGEST_LENGTH + 4];
        uint8_t* walk = w;
        uint8_t x[SHA_DIGEST_LENGTH];

        uint32_t ui32 = ntohl(htonl(addr->addr.addr4.s_addr) & 0xffffff00); /* (1) */
        memcpy(w, &ui32, sizeof(uint32_t));
        walk += sizeof(uint32_t);
        memcpy(walk, infohash, SHA_DIGEST_LENGTH); /* (2) */
        walk += SHA_DIGEST_LENGTH;
        tr_sha1(x, w, walk - w, NULL); /* (3) */
        TR_ASSERT(sizeof(w) == walk - w);

        while (setSize < desiredSetSize)
        {
            /* each digest yields five 32-bit candidates */
            for (int i = 0; i < 5 && setSize < desiredSetSize; ++i) /* (4) */
            {
                uint32_t j = i * 4; /* (5) */
                uint32_t y = ntohl(*(uint32_t*)(x + j)); /* (6) */
                uint32_t index = y % pieceCount; /* (7) */
                bool found = false;

                /* skip candidates that are already in the set */
                for (size_t k = 0; !found && k < setSize; ++k) /* (8) */
                {
                    found = setmePieces[k] == index;
                }

                if (!found)
                {
                    setmePieces[setSize++] = index; /* (9) */
                }
            }

            /* rehash the digest to get the next batch of candidates */
            tr_sha1(x, x, sizeof(x), NULL); /* (3) */
        }
    }

    return setSize;
}
621
/* Disabled code (inside #if 0).
 *
 * When the peer supports FEXT, has less than 10% progress, and hasn't been
 * given a fast set yet, builds one with tr_generateAllowedSet() and sends
 * each piece via protocolSendAllowedFast().
 *
 * NOTE(review): this no longer matches the live struct -- `haveFastSet` is
 * commented out of tr_peerMsgs, `fastset` / `fastsetSize` are themselves
 * under #if 0, and `msgs->peer` is an embedded struct rather than a pointer,
 * so `msgs->peer->progress` would not compile. `msgs` is also marked UNUSED
 * even though it is used. */
static void updateFastSet(tr_peerMsgs* msgs UNUSED)
{
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);
    bool const peerIsNeedy = msgs->peer->progress < 0.10;

    if (fext && peerIsNeedy && !msgs->haveFastSet)
    {
        struct tr_address const* addr = tr_peerIoGetAddress(msgs->io, NULL);
        tr_info const* inf = &msgs->torrent->info;
        size_t const numwant = MIN(MAX_FAST_SET_SIZE, inf->pieceCount);

        /* build the fast set */
        msgs->fastsetSize = tr_generateAllowedSet(msgs->fastset, numwant, inf->pieceCount, inf->hash, addr);
        msgs->haveFastSet = true;

        /* send it to the peer */
        for (size_t i = 0; i < msgs->fastsetSize; ++i)
        {
            protocolSendAllowedFast(msgs, msgs->fastset[i]);
        }
    }
}
644
645 #endif
646
647 /***
648 **** ACTIVE
649 ***/
650
/**
 * Work out, from the current choke / interest flags, whether transfers are
 * active in `direction`.
 *
 * - TR_CLIENT_TO_PEER: the peer is interested and we aren't choking it.
 * - TR_PEER_TO_CLIENT: always active while the torrent lacks metadata;
 *   otherwise we are interested and the peer isn't choking us.
 */
static bool tr_peerMsgsCalculateActive(tr_peerMsgs const* msgs, tr_direction direction)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));
    TR_ASSERT(tr_isDirection(direction));

    if (direction == TR_CLIENT_TO_PEER)
    {
        /* FIXME: https://trac.transmissionbt.com/ticket/5505
           an active upload should imply TR_ASSERT(!tr_peerIsSeed(&msgs->peer)),
           but that check is switched off for now */
        return tr_peerMsgsIsPeerInterested(msgs) && !tr_peerMsgsIsPeerChoked(msgs);
    }

    /* from here on: TR_PEER_TO_CLIENT */

    if (!tr_torrentHasMetadata(msgs->torrent))
    {
        return true;
    }

    bool const downloading = tr_peerMsgsIsClientInterested(msgs) && !tr_peerMsgsIsClientChoked(msgs);

    if (downloading)
    {
        /* a seed has nothing left to download */
        TR_ASSERT(!tr_torrentIsSeed(msgs->torrent));
    }

    return downloading;
}
688
/* Return the cached activity flag for `direction`. Assert-enabled builds
 * also check that the cache agrees with a fresh calculation. */
bool tr_peerMsgsIsActive(tr_peerMsgs const* msgs, tr_direction direction)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));
    TR_ASSERT(tr_isDirection(direction));

    bool const cached = msgs->is_active[direction];

    TR_ASSERT(cached == tr_peerMsgsCalculateActive(msgs, direction));

    return cached;
}
700
/* Store `is_active` as the cached flag for `direction`; when the value
 * actually changes, pass the change on to the swarm's active-peer count. */
static void tr_peerMsgsSetActive(tr_peerMsgs* msgs, tr_direction direction, bool is_active)
{
    dbgmsg(msgs, "direction [%d] is_active [%d]", (int)direction, (int)is_active);

    /* unchanged: nothing to record */
    if (msgs->is_active[direction] == is_active)
    {
        return;
    }

    msgs->is_active[direction] = is_active;
    tr_swarmIncrementActivePeers(msgs->torrent->swarm, direction, is_active);
}
712
/* Recalculate the activity state for `direction` and store the result. */
void tr_peerMsgsUpdateActive(tr_peerMsgs* msgs, tr_direction direction)
{
    tr_peerMsgsSetActive(msgs, direction, tr_peerMsgsCalculateActive(msgs, direction));
}
719
720 /**
721 *** INTEREST
722 **/
723
/* Record our interest state `b` and queue the matching Interested or
 * Not Interested message at high priority. */
static void sendInterest(tr_peerMsgs* msgs, bool b)
{
    TR_ASSERT(msgs != NULL);

    struct evbuffer* const buf = msgs->outMessages;
    uint8_t const msg_id = b ? BT_INTERESTED : BT_NOT_INTERESTED;

    msgs->client_is_interested = b;
    dbgmsg(msgs, "Sending %s", b ? "Interested" : "Not Interested");

    /* the payload is just the id byte */
    evbuffer_add_uint32(buf, sizeof(uint8_t));
    evbuffer_add_uint8(buf, msg_id);

    pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
    dbgOutMessageLen(msgs);
}
738
/* Placeholder: currently does nothing. Called from tr_peerMsgsHave() after
 * a Have message has been queued. */
static void updateInterest(tr_peerMsgs* msgs UNUSED)
{
    /* FIXME -- might need to poke the mgr on startup */
}
743
/* Change whether we're interested in this peer. When the state really
 * changes, tell the peer and refresh the peer-to-client activity flag. */
void tr_peerMsgsSetInterested(tr_peerMsgs* msgs, bool b)
{
    /* already in the requested state */
    if (msgs->client_is_interested == b)
    {
        return;
    }

    sendInterest(msgs, b);
    tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT);
}
753
/* Remove the oldest queued metadata request and store its piece number in
 * `*piece`. Returns false, leaving `*piece` alone, when the queue is empty. */
static bool popNextMetadataRequest(tr_peerMsgs* msgs, int* piece)
{
    int const queued = msgs->peerAskedForMetadataCount;

    if (queued == 0)
    {
        return false;
    }

    /* take the head, then close the gap it leaves behind */
    *piece = msgs->peerAskedForMetadata[0];
    tr_removeElementFromArray(msgs->peerAskedForMetadata, 0, sizeof(int), queued);
    msgs->peerAskedForMetadataCount = queued - 1;

    return true;
}
768
/* Remove the oldest queued block request from the peer and copy it into
 * `*setme`. Returns false, leaving `*setme` alone, when the queue is empty. */
static bool popNextRequest(tr_peerMsgs* msgs, struct peer_request* setme)
{
    if (msgs->peer.pendingReqsToClient != 0)
    {
        /* take the head, then close the gap it leaves behind */
        *setme = msgs->peerAskedFor[0];
        tr_removeElementFromArray(msgs->peerAskedFor, 0, sizeof(struct peer_request), msgs->peer.pendingReqsToClient);
        --msgs->peer.pendingReqsToClient;

        return true;
    }

    return false;
}
783
/* Drop every block request the peer has queued with us. Peers that support
 * FEXT get an explicit Reject for each dropped request. */
static void cancelAllRequestsToClient(tr_peerMsgs* msgs)
{
    bool const sendRejects = tr_peerIoSupportsFEXT(msgs->io);
    struct peer_request dropped;

    for (;;)
    {
        if (!popNextRequest(msgs, &dropped))
        {
            break;
        }

        if (sendRejects)
        {
            protocolSendReject(msgs, &dropped);
        }
    }
}
797
/**
 * Choke or unchoke the peer.
 *
 * Nothing happens if the choke state last changed less than
 * MIN_CHOKE_PERIOD_SEC seconds ago, or if the peer is already in the
 * requested state. Choking also throws away the peer's queued requests.
 */
void tr_peerMsgsSetChoke(tr_peerMsgs* msgs, bool peer_is_choked)
{
    TR_ASSERT(msgs != NULL);

    time_t const now = tr_time();
    time_t const earliestAllowedChange = now - MIN_CHOKE_PERIOD_SEC;

    /* too soon after the previous change */
    if (msgs->chokeChangedAt > earliestAllowedChange)
    {
        dbgmsg(msgs, "Not changing choke to %d to avoid fibrillation", peer_is_choked);
        return;
    }

    /* already in the requested state */
    if (msgs->peer_is_choked == peer_is_choked)
    {
        return;
    }

    msgs->peer_is_choked = peer_is_choked;

    if (peer_is_choked)
    {
        cancelAllRequestsToClient(msgs);
    }

    protocolSendChoke(msgs, peer_is_choked);
    msgs->chokeChangedAt = now;
    tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER);
}
823
824 /**
825 ***
826 **/
827
/* Tell the peer that we now have piece `index`. */
void tr_peerMsgsHave(tr_peerMsgs* msgs, uint32_t index)
{
    protocolSendHave(msgs, index);

    /* owning more pieces may mean this peer has nothing left that we want */
    updateInterest(msgs);
}
835
836 /**
837 ***
838 **/
839
/* Ask the peer's torrent whether index / offset / length is a valid request. */
static bool reqIsValid(tr_peerMsgs const* peer, uint32_t index, uint32_t offset, uint32_t length)
{
    tr_torrent const* const tor = peer->torrent;

    return tr_torrentReqIsValid(tor, index, offset, length);
}
844
/* Same check as reqIsValid(), taking the three fields from `req`. */
static bool requestIsValid(tr_peerMsgs const* msgs, struct peer_request const* req)
{
    uint32_t const index = req->index;
    uint32_t const offset = req->offset;
    uint32_t const length = req->length;

    return reqIsValid(msgs, index, offset, length);
}
849
/* Queue a Cancel message for `block`, an earlier request to this peer. */
void tr_peerMsgsCancel(tr_peerMsgs* msgs, tr_block_index_t block)
{
    struct peer_request cancelled;

    /* translate the block number into the wire's index / offset / length */
    blockToReq(msgs->torrent, block, &cancelled);
    protocolSendCancel(msgs, &cancelled);
}
858
859 /**
860 ***
861 **/
862
/**
 * Queue our LTEP (extension protocol) handshake, at most once per peer.
 *
 * The bencoded dictionary holds our encryption preference, IPv6 address
 * (when known), metadata size (when we may share metadata and have it),
 * listening port, request queue depth, upload-only flag, and version
 * string, plus an "m" dict naming the extensions we're willing to speak.
 */
static void sendLtepHandshake(tr_peerMsgs* msgs)
{
    static tr_quark version_quark = 0;
    unsigned char const* const ipv6 = tr_globalIPv6();

    /* only ever send one handshake */
    if (msgs->clientSentLtepHandshake)
    {
        return;
    }

    if (version_quark == 0)
    {
        version_quark = tr_quark_new(TR_NAME " " USERAGENT_PREFIX, TR_BAD_SIZE);
    }

    dbgmsg(msgs, "sending an ltep handshake");
    msgs->clientSentLtepHandshake = true;

    tr_torrent* const tor = msgs->torrent;
    struct evbuffer* const out = msgs->outMessages;

    /* metadata xfer (BEP 9) is advertised for every non-private torrent */
    bool const allow_metadata_xfer = !tr_torrentIsPrivate(tor);

    /* pex is advertised when the torrent allows it and, if the peer has
       already sent its own handshake, only when the peer supports it too */
    bool allow_pex = tr_torrentAllowsPex(tor);

    if (allow_pex && msgs->peerSentLtepHandshake)
    {
        allow_pex = msgs->peerSupportsPex;
    }

    tr_variant val;
    tr_variantInitDict(&val, 8);
    tr_variantDictAddBool(&val, TR_KEY_e, getSession(msgs)->encryptionMode != TR_CLEAR_PREFERRED);

    if (ipv6 != NULL)
    {
        tr_variantDictAddRaw(&val, TR_KEY_ipv6, ipv6, 16);
    }

    if (allow_metadata_xfer && tr_torrentHasMetadata(tor) && tor->infoDictLength > 0)
    {
        tr_variantDictAddInt(&val, TR_KEY_metadata_size, tor->infoDictLength);
    }

    tr_variantDictAddInt(&val, TR_KEY_p, tr_sessionGetPublicPeerPort(getSession(msgs)));
    tr_variantDictAddInt(&val, TR_KEY_reqq, REQQ);
    tr_variantDictAddBool(&val, TR_KEY_upload_only, tr_torrentIsSeed(tor));
    tr_variantDictAddQuark(&val, TR_KEY_v, version_quark);

    if (allow_metadata_xfer || allow_pex)
    {
        tr_variant* const extensions = tr_variantDictAddDict(&val, TR_KEY_m, 2);

        if (allow_metadata_xfer)
        {
            tr_variantDictAddInt(extensions, TR_KEY_ut_metadata, UT_METADATA_ID);
        }

        if (allow_pex)
        {
            tr_variantDictAddInt(extensions, TR_KEY_ut_pex, UT_PEX_ID);
        }
    }

    /* wrap the bencoded dict in a BT_LTEP / LTEP_HANDSHAKE envelope */
    struct evbuffer* const payload = tr_variantToBuf(&val, TR_VARIANT_FMT_BENC);

    evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
    evbuffer_add_uint8(out, BT_LTEP);
    evbuffer_add_uint8(out, LTEP_HANDSHAKE);
    evbuffer_add_buffer(out, payload);
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
    dbgOutMessageLen(msgs);

    /* release the temporaries */
    evbuffer_free(payload);
    tr_variantFree(&val);
}
956
/**
 * Read and apply the peer's LTEP (extension protocol) handshake.
 *
 * Consumes `len` bytes from `inbuf`, parses them as a bencoded dictionary,
 * and records what the peer announced: encryption preference, supported
 * extensions and their ids, metadata size, upload-only state, listening
 * port, extra addresses (incoming connections only), and request queue
 * depth. A payload that doesn't parse, or isn't a dictionary, is logged
 * and otherwise ignored.
 *
 * @param msgs  the peer whose handshake this is
 * @param len   number of payload bytes waiting in `inbuf`
 * @param inbuf buffer to read the payload from
 */
static void parseLtepHandshake(tr_peerMsgs* msgs, uint32_t len, struct evbuffer* inbuf)
{
    int64_t i;
    tr_variant val;
    tr_variant* sub;
    uint8_t* tmp = tr_new(uint8_t, len);
    uint8_t const* addr;
    size_t addr_len;
    tr_pex pex;
    int8_t seedProbability = -1;

    memset(&pex, 0, sizeof(tr_pex));

    tr_peerIoReadBytes(msgs->io, inbuf, tmp, len);
    msgs->peerSentLtepHandshake = true;

    /* As elsewhere in this file, a variant is only freed after a
       successful parse. */
    if (tr_variantFromBenc(&val, tmp, len) != 0)
    {
        dbgmsg(msgs, "GET extended-handshake, couldn't get dictionary");
        tr_free(tmp);
        return;
    }

    /* The payload parsed, but as something other than a dictionary.
       `val` owns memory at this point, so it must be released before
       bailing out. */
    if (!tr_variantIsDict(&val))
    {
        dbgmsg(msgs, "GET extended-handshake, couldn't get dictionary");
        tr_variantFree(&val);
        tr_free(tmp);
        return;
    }

    /* arbitrary limit, should be more than enough */
    if (len <= 4096)
    {
        dbgmsg(msgs, "here is the handshake: [%*.*s]", (int)len, (int)len, tmp);
    }
    else
    {
        dbgmsg(msgs, "handshake length is too big (%" PRIu32 "), printing skipped", len);
    }

    /* does the peer prefer encrypted connections? */
    if (tr_variantDictFindInt(&val, TR_KEY_e, &i))
    {
        msgs->encryption_preference = i != 0 ? ENCRYPTION_PREFERENCE_YES : ENCRYPTION_PREFERENCE_NO;

        if (i != 0)
        {
            pex.flags |= ADDED_F_ENCRYPTION_FLAG;
        }
    }

    /* check supported messages for utorrent pex */
    msgs->peerSupportsPex = false;
    msgs->peerSupportsMetadataXfer = false;

    if (tr_variantDictFindDict(&val, TR_KEY_m, &sub))
    {
        if (tr_variantDictFindInt(sub, TR_KEY_ut_pex, &i))
        {
            msgs->peerSupportsPex = i != 0;
            msgs->ut_pex_id = (uint8_t)i;
            dbgmsg(msgs, "msgs->ut_pex is %d", (int)msgs->ut_pex_id);
        }

        if (tr_variantDictFindInt(sub, TR_KEY_ut_metadata, &i))
        {
            msgs->peerSupportsMetadataXfer = i != 0;
            msgs->ut_metadata_id = (uint8_t)i;
            dbgmsg(msgs, "msgs->ut_metadata_id is %d", (int)msgs->ut_metadata_id);
        }

        if (tr_variantDictFindInt(sub, TR_KEY_ut_holepunch, &i))
        {
            /* Mysterious µTorrent extension that we don't grok. However,
               it implies support for µTP, so use it to indicate that. */
            tr_peerMgrSetUtpFailed(msgs->torrent, tr_peerIoGetAddress(msgs->io, NULL), false);
        }
    }

    /* look for metainfo size (BEP 9) */
    if (tr_variantDictFindInt(&val, TR_KEY_metadata_size, &i))
    {
        if (tr_torrentSetMetadataSizeHint(msgs->torrent, i))
        {
            msgs->metadata_size_hint = (size_t)i;
        }
    }

    /* look for upload_only (BEP 21) */
    if (tr_variantDictFindInt(&val, TR_KEY_upload_only, &i))
    {
        seedProbability = i == 0 ? 0 : 100;
    }

    /* get peer's listening port */
    if (tr_variantDictFindInt(&val, TR_KEY_p, &i))
    {
        pex.port = htons((uint16_t)i);
        fireClientGotPort(msgs, pex.port);
        dbgmsg(msgs, "peer's port is now %d", (int)i);
    }

    /* on incoming connections, the peer may tell us other addresses it
       can be reached at; hand those to the peer manager */
    if (tr_peerIoIsIncoming(msgs->io) && tr_variantDictFindRaw(&val, TR_KEY_ipv4, &addr, &addr_len) && addr_len == 4)
    {
        pex.addr.type = TR_AF_INET;
        memcpy(&pex.addr.addr.addr4, addr, 4);
        tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
    }

    if (tr_peerIoIsIncoming(msgs->io) && tr_variantDictFindRaw(&val, TR_KEY_ipv6, &addr, &addr_len) && addr_len == 16)
    {
        pex.addr.type = TR_AF_INET6;
        memcpy(&pex.addr.addr.addr6, addr, 16);
        tr_peerMgrAddPex(msgs->torrent, TR_PEER_FROM_LTEP, &pex, seedProbability);
    }

    /* get peer's maximum request queue size */
    if (tr_variantDictFindInt(&val, TR_KEY_reqq, &i))
    {
        msgs->reqq = i;
    }

    tr_variantFree(&val);
    tr_free(tmp);
}
1075
/**
 * Handle one incoming ut_metadata (BEP 9) message.
 *
 * Consumes `msglen` bytes from `inbuf`. The message starts with a bencoded
 * dictionary (msg_type, piece, total_size); for data messages the raw
 * metadata bytes follow directly after it.
 *
 * - reject:  ignored.
 * - data:    when we still need metadata and the piece passes the size
 *            checks, it is handed to tr_torrentSetMetadataPiece().
 * - request: queued for later sending when we can serve it; otherwise a
 *            reject message is queued in reply.
 *
 * @param msgs   the peer that sent the message
 * @param msglen number of payload bytes waiting in `inbuf`
 * @param inbuf  buffer to read the payload from
 */
static void parseUtMetadata(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf)
{
    tr_variant dict;
    char const* msg_end;
    char const* benc_end = NULL;
    int64_t msg_type = -1;
    int64_t piece = -1;
    int64_t total_size = 0;
    uint8_t* tmp = tr_new(uint8_t, msglen);

    tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen);
    msg_end = (char const*)tmp + msglen;

    /* msg_type stays at -1 unless the parse succeeds, so benc_end is
       only ever used below after it has been set by a good parse */
    if (tr_variantFromBencFull(&dict, tmp, msglen, NULL, &benc_end) == 0)
    {
        (void)tr_variantDictFindInt(&dict, TR_KEY_msg_type, &msg_type);
        (void)tr_variantDictFindInt(&dict, TR_KEY_piece, &piece);
        (void)tr_variantDictFindInt(&dict, TR_KEY_total_size, &total_size);
        tr_variantFree(&dict);
    }

    dbgmsg(msgs, "got ut_metadata msg: type %d, piece %d, total_size %d", (int)msg_type, (int)piece, (int)total_size);

    /* `piece` comes straight from the peer and ends up in `int`-typed
       storage, so only accept values that survive that conversion */
    bool const piece_in_range = piece >= 0 && piece <= INT_MAX;

    if (msg_type == METADATA_MSG_TYPE_REJECT)
    {
        /* NOOP */
    }

    if (msg_type == METADATA_MSG_TYPE_DATA && !tr_torrentHasMetadata(msgs->torrent))
    {
        int64_t const pieceLen = msg_end - benc_end;

        /* Same test as `piece * METADATA_PIECE_SIZE + pieceLen <= total_size`,
           rearranged so that peer-controlled values can't overflow the
           signed arithmetic. */
        if (piece_in_range && pieceLen >= 0 && pieceLen <= METADATA_PIECE_SIZE && pieceLen <= total_size &&
            piece <= (total_size - pieceLen) / METADATA_PIECE_SIZE)
        {
            tr_torrentSetMetadataPiece(msgs->torrent, (int)piece, benc_end, (int)pieceLen);
        }
    }

    if (msg_type == METADATA_MSG_TYPE_REQUEST)
    {
        if (piece_in_range && tr_torrentHasMetadata(msgs->torrent) && !tr_torrentIsPrivate(msgs->torrent) &&
            msgs->peerAskedForMetadataCount < METADATA_REQQ)
        {
            msgs->peerAskedForMetadata[msgs->peerAskedForMetadataCount++] = (int)piece;
        }
        else
        {
            tr_variant reject;
            struct evbuffer* payload;
            struct evbuffer* out = msgs->outMessages;

            /* build the rejection message */
            tr_variantInitDict(&reject, 2);
            tr_variantDictAddInt(&reject, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
            tr_variantDictAddInt(&reject, TR_KEY_piece, piece);
            payload = tr_variantToBuf(&reject, TR_VARIANT_FMT_BENC);

            /* write it out as a LTEP message to our outMessages buffer */
            evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
            evbuffer_add_uint8(out, BT_LTEP);
            evbuffer_add_uint8(out, msgs->ut_metadata_id);
            evbuffer_add_buffer(out, payload);
            pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
            dbgOutMessageLen(msgs);

            /* cleanup */
            evbuffer_free(payload);
            tr_variantFree(&reject);
        }
    }

    tr_free(tmp);
}
1146
parseUtPex(tr_peerMsgs * msgs,uint32_t msglen,struct evbuffer * inbuf)1147 static void parseUtPex(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf)
1148 {
1149 tr_torrent* tor = msgs->torrent;
1150 if (!tr_torrentAllowsPex(tor))
1151 {
1152 return;
1153 }
1154
1155 uint8_t* tmp = tr_new(uint8_t, msglen);
1156 tr_peerIoReadBytes(msgs->io, inbuf, tmp, msglen);
1157
1158 tr_variant val;
1159 bool const loaded = tr_variantFromBenc(&val, tmp, msglen) == 0;
1160
1161 tr_free(tmp);
1162
1163 if (!loaded)
1164 {
1165 return;
1166 }
1167
1168 uint8_t const* added;
1169 size_t added_len;
1170
1171 if (tr_variantDictFindRaw(&val, TR_KEY_added, &added, &added_len))
1172 {
1173 tr_pex* pex;
1174 size_t n;
1175 size_t added_f_len;
1176 uint8_t const* added_f;
1177
1178 if (!tr_variantDictFindRaw(&val, TR_KEY_added_f, &added_f, &added_f_len))
1179 {
1180 added_f_len = 0;
1181 added_f = NULL;
1182 }
1183
1184 pex = tr_peerMgrCompactToPex(added, added_len, added_f, added_f_len, &n);
1185
1186 n = MIN(n, MAX_PEX_PEER_COUNT);
1187
1188 for (size_t i = 0; i < n; ++i)
1189 {
1190 int seedProbability = -1;
1191
1192 if (i < added_f_len)
1193 {
1194 seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) != 0 ? 100 : 0;
1195 }
1196
1197 tr_peerMgrAddPex(tor, TR_PEER_FROM_PEX, pex + i, seedProbability);
1198 }
1199
1200 tr_free(pex);
1201 }
1202
1203 if (tr_variantDictFindRaw(&val, TR_KEY_added6, &added, &added_len))
1204 {
1205 tr_pex* pex;
1206 size_t n;
1207 size_t added_f_len;
1208 uint8_t const* added_f;
1209
1210 if (!tr_variantDictFindRaw(&val, TR_KEY_added6_f, &added_f, &added_f_len))
1211 {
1212 added_f_len = 0;
1213 added_f = NULL;
1214 }
1215
1216 pex = tr_peerMgrCompact6ToPex(added, added_len, added_f, added_f_len, &n);
1217
1218 n = MIN(n, MAX_PEX_PEER_COUNT);
1219
1220 for (size_t i = 0; i < n; ++i)
1221 {
1222 int seedProbability = -1;
1223
1224 if (i < added_f_len)
1225 {
1226 seedProbability = (added_f[i] & ADDED_F_SEED_FLAG) != 0 ? 100 : 0;
1227 }
1228
1229 tr_peerMgrAddPex(tor, TR_PEER_FROM_PEX, pex + i, seedProbability);
1230 }
1231
1232 tr_free(pex);
1233 }
1234
1235 tr_variantFree(&val);
1236 }
1237
1238 static void sendPex(tr_peerMsgs* msgs);
1239
/**
 * Dispatch an incoming LTEP (extension protocol) message.
 *
 * Reads the one-byte extended message id from `inbuf` and passes the
 * remaining `msglen - 1` payload bytes to the matching handler. Payloads of
 * unknown extension ids are skipped.
 *
 * @param msgs   peer connection the message arrived on
 * @param msglen length of the LTEP payload including the id byte; must be > 0
 * @param inbuf  the peer's read buffer
 */
static void parseLtep(tr_peerMsgs* msgs, uint32_t msglen, struct evbuffer* inbuf)
{
    TR_ASSERT(msglen > 0);

    uint8_t ltep_msgid;

    tr_peerIoReadUint8(msgs->io, inbuf, &ltep_msgid);
    msglen--;

    if (ltep_msgid == LTEP_HANDSHAKE)
    {
        dbgmsg(msgs, "got ltep handshake");
        parseLtepHandshake(msgs, msglen, inbuf);

        if (tr_peerIoSupportsLTEP(msgs->io))
        {
            sendLtepHandshake(msgs);
            sendPex(msgs);
        }
    }
    else if (ltep_msgid == UT_PEX_ID)
    {
        dbgmsg(msgs, "got ut pex");
        msgs->peerSupportsPex = true;
        parseUtPex(msgs, msglen, inbuf);
    }
    else if (ltep_msgid == UT_METADATA_ID)
    {
        dbgmsg(msgs, "got ut metadata");
        msgs->peerSupportsMetadataXfer = true;
        parseUtMetadata(msgs, msglen, inbuf);
    }
    else
    {
        dbgmsg(msgs, "skipping unknown ltep message (%d)", (int)ltep_msgid);

        /* Skip the payload through the peer-io layer, the same way
         * readBtMessage() skips unknown BT messages, rather than draining the
         * evbuffer directly.
         * NOTE(review): presumably this keeps any per-connection stream
         * decryption in step with the bytes consumed; confirm in peer-io.c. */
        tr_peerIoDrain(msgs->io, inbuf, msglen);
    }
}
1278
/**
 * Read the four-byte length prefix of the next BT message.
 *
 * A length of zero is a keepalive and leaves the state unchanged; any other
 * value is stored in incoming.length and moves the parser on to
 * AWAITING_BT_ID.
 *
 * @return READ_LATER if fewer than four bytes are buffered, else READ_NOW
 */
static int readBtLength(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen)
{
    uint32_t message_length;

    if (inlen < sizeof(message_length))
    {
        return READ_LATER;
    }

    tr_peerIoReadUint32(msgs->io, inbuf, &message_length);

    if (message_length != 0)
    {
        msgs->incoming.length = message_length;
        msgs->state = AWAITING_BT_ID;
        return READ_NOW;
    }

    /* peer sent us a keepalive message */
    dbgmsg(msgs, "got KeepAlive");
    return READ_NOW;
}
1302
1303 static int readBtMessage(tr_peerMsgs*, struct evbuffer*, size_t);
1304
/**
 * Read the one-byte id of the current BT message and choose the next state.
 *
 * Piece messages go to AWAITING_BT_PIECE. Messages that carry a payload go to
 * AWAITING_BT_MESSAGE. Messages consisting of the id alone (length 1) are
 * handled immediately by readBtMessage().
 *
 * @return READ_LATER if no byte is buffered yet; otherwise READ_NOW or the
 *         result of readBtMessage()
 */
static int readBtId(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen)
{
    uint8_t message_id;

    if (inlen < sizeof(uint8_t))
    {
        return READ_LATER;
    }

    tr_peerIoReadUint8(msgs->io, inbuf, &message_id);
    msgs->incoming.id = message_id;
    dbgmsg(msgs, "msgs->incoming.id is now %d; msgs->incoming.length is %zu", message_id, (size_t)msgs->incoming.length);

    if (message_id == BT_PIECE)
    {
        msgs->state = AWAITING_BT_PIECE;
        return READ_NOW;
    }

    if (msgs->incoming.length == 1)
    {
        /* no payload to wait for: process the message right away */
        return readBtMessage(msgs, inbuf, inlen - 1);
    }

    msgs->state = AWAITING_BT_MESSAGE;
    return READ_NOW;
}
1333
/* Called after the peer's `have` bitfield changes: refreshes the peer's
 * progress and then re-evaluates our interest in the peer. */
static void updatePeerProgress(tr_peerMsgs* msgs)
{
    tr_peer* const peer = &msgs->peer;

    tr_peerUpdateProgress(msgs->torrent, peer);

    /* updateFastSet(msgs); */
    updateInterest(msgs);
}
1341
/**
 * Ask the cache to prefetch the blocks of the peer's oldest pending requests.
 *
 * Walks msgs->peerAskedFor from index `prefetchCount` up to the smaller of
 * the number of pending requests and PREFETCH_SIZE, prefetching every request
 * that is still valid. Does nothing when prefetching is disabled in the
 * session.
 */
static void prefetchPieces(tr_peerMsgs* msgs)
{
    if (!getSession(msgs)->isPrefetchEnabled)
    {
        return;
    }

    /* fillOutputBuffer() decrements prefetchCount for every request it pops,
     * including requests that were never prefetched (e.g. while prefetching
     * was disabled), so the counter can fall below zero. It is used as an
     * index into peerAskedFor below, so clamp it first. */
    if (msgs->prefetchCount < 0)
    {
        msgs->prefetchCount = 0;
    }

    for (int i = msgs->prefetchCount; i < msgs->peer.pendingReqsToClient && i < PREFETCH_SIZE; ++i)
    {
        struct peer_request const* req = msgs->peerAskedFor + i;

        if (requestIsValid(msgs, req))
        {
            tr_cachePrefetchBlock(getSession(msgs)->cache, msgs->torrent, req->index, req->offset, req->length);
            ++msgs->prefetchCount;
        }
    }
}
1360
/**
 * Handle a block request received from the peer.
 *
 * The request is queued in peerAskedFor when it is valid, refers to a piece
 * we have completed, the peer is not choked and the queue has room.
 * Otherwise it is dropped, and a reject message is sent if the peer supports
 * the fast extension.
 */
static void peerMadeRequest(tr_peerMsgs* msgs, struct peer_request const* req)
{
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);
    bool const valid = requestIsValid(msgs, req);
    bool const havePiece = valid && tr_torrentPieceIsComplete(msgs->torrent, req->index);
    bool const choked = msgs->peer_is_choked;

    if (valid && havePiece && !choked && msgs->peer.pendingReqsToClient + 1 < REQQ)
    {
        msgs->peerAskedFor[msgs->peer.pendingReqsToClient++] = *req;
        prefetchPieces(msgs);
        return;
    }

    /* the request was refused; log the first reason that applies */
    if (!valid)
    {
        dbgmsg(msgs, "rejecting an invalid request.");
    }
    else if (!havePiece)
    {
        dbgmsg(msgs, "rejecting request for a piece we don't have.");
    }
    else if (choked)
    {
        dbgmsg(msgs, "rejecting request from choked peer");
    }
    else
    {
        dbgmsg(msgs, "rejecting request ... reqq is full");
    }

    if (fext)
    {
        protocolSendReject(msgs, req);
    }
}
1401
/* Length check for BT_BITFIELD. With metadata the length must be exactly one
 * id byte plus one bit per piece, rounded up to whole bytes. Without metadata
 * the piece count is unknown, so the length is only compared against the
 * peer's metadata size hint when one was given. */
static bool bitfieldLengthIsCorrect(tr_peerMsgs const* msg, uint32_t len)
{
    if (tr_torrentHasMetadata(msg->torrent))
    {
        return len == (msg->torrent->info.pieceCount >> 3) + ((msg->torrent->info.pieceCount & 7) != 0 ? 1 : 0) + 1U;
    }

    /* we don't know the piece count yet,
       so we can only guess whether to send true or false */
    if (msg->metadata_size_hint > 0)
    {
        return len <= msg->metadata_size_hint;
    }

    return true;
}

/**
 * Check whether `len` is an acceptable length for a BT message of type `id`.
 *
 * @param msg the peer connection (consulted only for BT_BITFIELD)
 * @param id  the BT message id
 * @param len message length including the one-byte id
 * @return true if the length is acceptable; false for a bad length or an
 *         unrecognised id
 */
static bool messageLengthIsCorrect(tr_peerMsgs const* msg, uint8_t id, uint32_t len)
{
    switch (id)
    {
    /* id only */
    case BT_CHOKE:
    case BT_UNCHOKE:
    case BT_INTERESTED:
    case BT_NOT_INTERESTED:
    case BT_FEXT_HAVE_ALL:
    case BT_FEXT_HAVE_NONE:
        return len == 1;

    /* id + port */
    case BT_PORT:
        return len == 3;

    /* id + one 32-bit field */
    case BT_HAVE:
    case BT_FEXT_SUGGEST:
    case BT_FEXT_ALLOWED_FAST:
        return len == 5;

    /* id + three 32-bit fields */
    case BT_REQUEST:
    case BT_CANCEL:
    case BT_FEXT_REJECT:
        return len == 13;

    case BT_BITFIELD:
        return bitfieldLengthIsCorrect(msg, len);

    case BT_PIECE:
        return len > 9 && len <= 16393;

    case BT_LTEP:
        return len >= 2;

    default:
        return false;
    }
}
1452
1453 static int clientGotBlock(tr_peerMsgs* msgs, struct evbuffer* block, struct peer_request const* req);
1454
/**
 * Incrementally read a BT_PIECE message.
 *
 * On the first call for a message (incoming.blockReq.length == 0) this
 * validates the message length and reads the eight-byte index/offset header.
 * Subsequent calls move block data from `inbuf` into incoming.block until the
 * whole block has arrived, at which point it is passed to clientGotBlock()
 * and the parser returns to AWAITING_BT_LENGTH.
 *
 * @param msgs                   peer connection
 * @param inbuf                  the peer's read buffer
 * @param inlen                  number of bytes available in `inbuf`
 * @param setme_piece_bytes_read incremented by the number of block bytes read
 * @return READ_NOW, READ_LATER or READ_ERR
 */
static int readBtPiece(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen, size_t* setme_piece_bytes_read)
{
    TR_ASSERT(evbuffer_get_length(inbuf) >= inlen);

    dbgmsg(msgs, "In readBtPiece");

    struct peer_request* req = &msgs->incoming.blockReq;

    if (req->length == 0)
    {
        /* Piece messages never pass through readBtMessage(), so their length
         * has not been validated yet. Without this check a length below 9
         * would make `length - 9` wrap around, and a length of exactly 9
         * would leave req->length at 0 and cause the next message's bytes to
         * be read as another header. */
        if (!messageLengthIsCorrect(msgs, BT_PIECE, msgs->incoming.length))
        {
            dbgmsg(msgs, "bad packet - BT_PIECE message with a length of %u", (unsigned int)msgs->incoming.length);
            fireError(msgs, EMSGSIZE);
            return READ_ERR;
        }

        if (inlen < 8)
        {
            return READ_LATER;
        }

        tr_peerIoReadUint32(msgs->io, inbuf, &req->index);
        tr_peerIoReadUint32(msgs->io, inbuf, &req->offset);
        req->length = msgs->incoming.length - 9; /* minus id, index and offset */
        dbgmsg(msgs, "got incoming block header %u:%u->%u", req->index, req->offset, req->length);
        return READ_NOW;
    }
    else
    {
        int err;
        size_t n;
        size_t nLeft;
        struct evbuffer* block_buffer;

        if (msgs->incoming.block == NULL)
        {
            msgs->incoming.block = evbuffer_new();
        }

        block_buffer = msgs->incoming.block;

        /* read in another chunk of data */
        nLeft = req->length - evbuffer_get_length(block_buffer);
        n = MIN(nLeft, inlen);

        tr_peerIoReadBytesToBuf(msgs->io, inbuf, block_buffer, n);

        fireClientGotPieceData(msgs, n);
        *setme_piece_bytes_read += n;
        dbgmsg(msgs, "got %zu bytes for block %u:%u->%u ... %d remain", n, req->index, req->offset, req->length,
            (int)(req->length - evbuffer_get_length(block_buffer)));

        if (evbuffer_get_length(block_buffer) < req->length)
        {
            return READ_LATER;
        }

        /* pass the block along... */
        err = clientGotBlock(msgs, block_buffer, req);
        evbuffer_drain(block_buffer, evbuffer_get_length(block_buffer));

        /* cleanup */
        req->length = 0;
        msgs->state = AWAITING_BT_LENGTH;
        return err != 0 ? READ_ERR : READ_NOW;
    }
}
1516
1517 static void updateDesiredRequestCount(tr_peerMsgs* msgs);
1518
/* Reads the index/offset/length triple that forms the payload of Request,
 * Cancel and Reject messages. */
static void readRequestTriple(tr_peerMsgs* msgs, struct evbuffer* inbuf, struct peer_request* setme)
{
    tr_peerIoReadUint32(msgs->io, inbuf, &setme->index);
    tr_peerIoReadUint32(msgs->io, inbuf, &setme->offset);
    tr_peerIoReadUint32(msgs->io, inbuf, &setme->length);
}

/**
 * Parse and act on one complete non-piece BT message.
 *
 * The message id and length were stored in msgs->incoming by readBtId() and
 * readBtLength(). Waits until the whole payload is buffered, validates the
 * length for the message type, then dispatches on the id. On success the
 * parser returns to AWAITING_BT_LENGTH.
 *
 * @param msgs  peer connection
 * @param inbuf the peer's read buffer
 * @param inlen number of bytes available in `inbuf`
 * @return READ_LATER if the payload isn't fully buffered, READ_ERR on a
 *         protocol violation (after firing an error event), else READ_NOW
 */
static int readBtMessage(tr_peerMsgs* msgs, struct evbuffer* inbuf, size_t inlen)
{
    uint8_t const id = msgs->incoming.id;
#ifdef TR_ENABLE_ASSERTS
    size_t const startBufLen = evbuffer_get_length(inbuf);
#endif
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);

    uint32_t const fullLen = msgs->incoming.length;

    TR_ASSERT(fullLen > 0);

    /* payload length: the message without its one-byte id */
    uint32_t const payloadLen = fullLen - 1;

    dbgmsg(msgs, "got BT id %d, len %d, buffer size is %zu", (int)id, (int)payloadLen, inlen);

    if (inlen < payloadLen)
    {
        return READ_LATER;
    }

    if (!messageLengthIsCorrect(msgs, id, payloadLen + 1))
    {
        dbgmsg(msgs, "bad packet - BT message #%d with a length of %d", (int)id, (int)payloadLen);
        fireError(msgs, EMSGSIZE);
        return READ_ERR;
    }

    switch (id)
    {
    case BT_CHOKE:
        dbgmsg(msgs, "got Choke");
        msgs->client_is_choked = true;

        if (!fext)
        {
            fireGotChoke(msgs);
        }

        tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT);
        break;

    case BT_UNCHOKE:
        dbgmsg(msgs, "got Unchoke");
        msgs->client_is_choked = false;
        tr_peerMsgsUpdateActive(msgs, TR_PEER_TO_CLIENT);
        updateDesiredRequestCount(msgs);
        break;

    case BT_INTERESTED:
        dbgmsg(msgs, "got Interested");
        msgs->peer_is_interested = true;
        tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER);
        break;

    case BT_NOT_INTERESTED:
        dbgmsg(msgs, "got Not Interested");
        msgs->peer_is_interested = false;
        tr_peerMsgsUpdateActive(msgs, TR_CLIENT_TO_PEER);
        break;

    case BT_HAVE:
        {
            uint32_t pieceIndex;
            tr_peerIoReadUint32(msgs->io, inbuf, &pieceIndex);
            dbgmsg(msgs, "got Have: %u", pieceIndex);

            if (tr_torrentHasMetadata(msgs->torrent) && pieceIndex >= msgs->torrent->info.pieceCount)
            {
                fireError(msgs, ERANGE);
                return READ_ERR;
            }

            /* a peer can send the same HAVE message twice... */
            if (!tr_bitfieldHas(&msgs->peer.have, pieceIndex))
            {
                tr_bitfieldAdd(&msgs->peer.have, pieceIndex);
                fireClientGotHave(msgs, pieceIndex);
            }

            updatePeerProgress(msgs);
            break;
        }

    case BT_BITFIELD:
        {
            uint8_t* raw = tr_new(uint8_t, payloadLen);
            dbgmsg(msgs, "got a bitfield");
            tr_peerIoReadBytes(msgs->io, inbuf, raw, payloadLen);
            tr_bitfieldSetRaw(&msgs->peer.have, raw, payloadLen, tr_torrentHasMetadata(msgs->torrent));
            fireClientGotBitfield(msgs, &msgs->peer.have);
            updatePeerProgress(msgs);
            tr_free(raw);
            break;
        }

    case BT_REQUEST:
        {
            struct peer_request wanted;
            readRequestTriple(msgs, inbuf, &wanted);
            dbgmsg(msgs, "got Request: %u:%u->%u", wanted.index, wanted.offset, wanted.length);
            peerMadeRequest(msgs, &wanted);
            break;
        }

    case BT_CANCEL:
        {
            struct peer_request cancelled;
            readRequestTriple(msgs, inbuf, &cancelled);
            tr_historyAdd(&msgs->peer.cancelsSentToClient, tr_time(), 1);
            dbgmsg(msgs, "got a Cancel %u:%u->%u", cancelled.index, cancelled.offset, cancelled.length);

            /* drop the first queued request that matches, if any */
            for (int i = 0; i < msgs->peer.pendingReqsToClient; ++i)
            {
                struct peer_request const* queued = msgs->peerAskedFor + i;

                if (queued->index != cancelled.index || queued->offset != cancelled.offset ||
                    queued->length != cancelled.length)
                {
                    continue;
                }

                tr_removeElementFromArray(msgs->peerAskedFor, i, sizeof(struct peer_request),
                    msgs->peer.pendingReqsToClient);
                --msgs->peer.pendingReqsToClient;
                break;
            }

            break;
        }

    case BT_PIECE:
        TR_ASSERT(false); /* handled elsewhere! */
        break;

    case BT_PORT:
        dbgmsg(msgs, "Got a BT_PORT");
        tr_peerIoReadUint16(msgs->io, inbuf, &msgs->dht_port);

        if (msgs->dht_port > 0)
        {
            tr_dhtAddNode(getSession(msgs), tr_peerAddress(&msgs->peer), msgs->dht_port, false);
        }

        break;

    case BT_FEXT_SUGGEST:
        {
            uint32_t pieceIndex;
            dbgmsg(msgs, "Got a BT_FEXT_SUGGEST");
            tr_peerIoReadUint32(msgs->io, inbuf, &pieceIndex);

            if (!fext)
            {
                fireError(msgs, EMSGSIZE);
                return READ_ERR;
            }

            fireClientGotSuggest(msgs, pieceIndex);
            break;
        }

    case BT_FEXT_ALLOWED_FAST:
        {
            uint32_t pieceIndex;
            dbgmsg(msgs, "Got a BT_FEXT_ALLOWED_FAST");
            tr_peerIoReadUint32(msgs->io, inbuf, &pieceIndex);

            if (!fext)
            {
                fireError(msgs, EMSGSIZE);
                return READ_ERR;
            }

            fireClientGotAllowedFast(msgs, pieceIndex);
            break;
        }

    case BT_FEXT_HAVE_ALL:
        dbgmsg(msgs, "Got a BT_FEXT_HAVE_ALL");

        if (!fext)
        {
            fireError(msgs, EMSGSIZE);
            return READ_ERR;
        }

        tr_bitfieldSetHasAll(&msgs->peer.have);
        TR_ASSERT(tr_bitfieldHasAll(&msgs->peer.have));
        fireClientGotHaveAll(msgs);
        updatePeerProgress(msgs);
        break;

    case BT_FEXT_HAVE_NONE:
        dbgmsg(msgs, "Got a BT_FEXT_HAVE_NONE");

        if (!fext)
        {
            fireError(msgs, EMSGSIZE);
            return READ_ERR;
        }

        tr_bitfieldSetHasNone(&msgs->peer.have);
        fireClientGotHaveNone(msgs);
        updatePeerProgress(msgs);
        break;

    case BT_FEXT_REJECT:
        {
            struct peer_request rejected;
            dbgmsg(msgs, "Got a BT_FEXT_REJECT");
            readRequestTriple(msgs, inbuf, &rejected);

            if (!fext)
            {
                fireError(msgs, EMSGSIZE);
                return READ_ERR;
            }

            fireGotRej(msgs, &rejected);
            break;
        }

    case BT_LTEP:
        dbgmsg(msgs, "Got a BT_LTEP");
        parseLtep(msgs, payloadLen, inbuf);
        break;

    default:
        dbgmsg(msgs, "peer sent us an UNKNOWN: %d", (int)id);
        tr_peerIoDrain(msgs->io, inbuf, payloadLen);
        break;
    }

    /* every handler above must have consumed exactly the payload */
    TR_ASSERT(payloadLen + 1 == msgs->incoming.length);
    TR_ASSERT(evbuffer_get_length(inbuf) == startBufLen - payloadLen);

    msgs->state = AWAITING_BT_LENGTH;
    return READ_NOW;
}
1769
1770 /* returns 0 on success, or an errno on failure */
clientGotBlock(tr_peerMsgs * msgs,struct evbuffer * data,struct peer_request const * req)1771 static int clientGotBlock(tr_peerMsgs* msgs, struct evbuffer* data, struct peer_request const* req)
1772 {
1773 TR_ASSERT(msgs != NULL);
1774 TR_ASSERT(req != NULL);
1775
1776 int err;
1777 tr_torrent* tor = msgs->torrent;
1778 tr_block_index_t const block = _tr_block(tor, req->index, req->offset);
1779
1780 if (!requestIsValid(msgs, req))
1781 {
1782 dbgmsg(msgs, "dropping invalid block %u:%u->%u", req->index, req->offset, req->length);
1783 return EBADMSG;
1784 }
1785
1786 if (req->length != tr_torBlockCountBytes(msgs->torrent, block))
1787 {
1788 dbgmsg(msgs, "wrong block size -- expected %u, got %d", tr_torBlockCountBytes(msgs->torrent, block), req->length);
1789 return EMSGSIZE;
1790 }
1791
1792 dbgmsg(msgs, "got block %u:%u->%u", req->index, req->offset, req->length);
1793
1794 if (!tr_peerMgrDidPeerRequest(msgs->torrent, &msgs->peer, block))
1795 {
1796 dbgmsg(msgs, "we didn't ask for this message...");
1797 return 0;
1798 }
1799
1800 if (tr_torrentPieceIsComplete(msgs->torrent, req->index))
1801 {
1802 dbgmsg(msgs, "we did ask for this message, but the piece is already complete...");
1803 return 0;
1804 }
1805
1806 /**
1807 *** Save the block
1808 **/
1809
1810 if ((err = tr_cacheWriteBlock(getSession(msgs)->cache, tor, req->index, req->offset, req->length, data)) != 0)
1811 {
1812 return err;
1813 }
1814
1815 tr_bitfieldAdd(&msgs->peer.blame, req->index);
1816 fireGotBlock(msgs, req);
1817 return 0;
1818 }
1819
1820 static void peerPulse(void* vmsgs);
1821
/* Peer-io write callback: reports piece data that was written and, while the
 * io is still valid and has user data attached, pulses the peer again. */
static void didWrite(tr_peerIo* io, size_t bytesWritten, bool wasPieceData, void* vmsgs)
{
    tr_peerMsgs* const self = vmsgs;

    if (wasPieceData)
    {
        firePeerGotPieceData(self, bytesWritten);
    }

    if (!tr_isPeerIo(io) || io->userData == NULL)
    {
        return;
    }

    peerPulse(self);
}
1836
canRead(tr_peerIo * io,void * vmsgs,size_t * piece)1837 static ReadState canRead(tr_peerIo* io, void* vmsgs, size_t* piece)
1838 {
1839 ReadState ret;
1840 tr_peerMsgs* msgs = vmsgs;
1841 struct evbuffer* in = tr_peerIoGetReadBuffer(io);
1842 size_t const inlen = evbuffer_get_length(in);
1843
1844 dbgmsg(msgs, "canRead: inlen is %zu, msgs->state is %d", inlen, msgs->state);
1845
1846 if (inlen == 0)
1847 {
1848 ret = READ_LATER;
1849 }
1850 else if (msgs->state == AWAITING_BT_PIECE)
1851 {
1852 ret = readBtPiece(msgs, in, inlen, piece);
1853 }
1854 else
1855 {
1856 switch (msgs->state)
1857 {
1858 case AWAITING_BT_LENGTH:
1859 ret = readBtLength(msgs, in, inlen);
1860 break;
1861
1862 case AWAITING_BT_ID:
1863 ret = readBtId(msgs, in, inlen);
1864 break;
1865
1866 case AWAITING_BT_MESSAGE:
1867 ret = readBtMessage(msgs, in, inlen);
1868 break;
1869
1870 default:
1871 ret = READ_ERR;
1872 TR_ASSERT_MSG(false, "unhandled peer messages state %d", (int)msgs->state);
1873 }
1874 }
1875
1876 dbgmsg(msgs, "canRead: ret is %d", (int)ret);
1877
1878 return ret;
1879 }
1880
/* Returns true if the parser is in the AWAITING_BT_PIECE state and the
 * piece message being read maps to `block`. */
bool tr_peerMsgsIsReadingBlock(tr_peerMsgs const* msgs, tr_block_index_t block)
{
    if (msgs->state == AWAITING_BT_PIECE)
    {
        struct peer_request const* const incoming = &msgs->incoming.blockReq;

        return block == _tr_block(msgs->torrent, incoming->index, incoming->offset);
    }

    return false;
}
1890
1891 /**
1892 ***
1893 **/
1894
/**
 * Recompute msgs->desiredRequestCount, the number of block requests we'd like
 * to have outstanding with this peer.
 *
 * The count is zero when we can't or don't want to download from the peer.
 * Otherwise it is the number of blocks the peer's current download rate
 * (capped by torrent and session speed limits) would deliver in
 * REQUEST_BUF_SECS seconds, with a floor of 4 and capped at the peer's
 * advertised reqq when one was given.
 */
static void updateDesiredRequestCount(tr_peerMsgs* msgs)
{
    tr_torrent* const torrent = msgs->torrent;

    /* there are lots of reasons we might not want to request any blocks... */
    bool const wantBlocks = !tr_torrentIsSeed(torrent) && tr_torrentHasMetadata(torrent) && !msgs->client_is_choked &&
        msgs->client_is_interested;

    if (!wantBlocks)
    {
        msgs->desiredRequestCount = 0;
        return;
    }

    int const minimumRequests = 4;
    int const seconds = REQUEST_BUF_SECS;
    uint64_t const now = tr_time_msec();

    /* Get the rate limit we should use.
     * FIXME: this needs to consider all the other peers as well... */
    unsigned int rate_Bps = tr_peerGetPieceSpeed_Bps(&msgs->peer, now, TR_PEER_TO_CLIENT);

    if (tr_torrentUsesSpeedLimit(torrent, TR_PEER_TO_CLIENT))
    {
        rate_Bps = MIN(rate_Bps, tr_torrentGetSpeedLimit_Bps(torrent, TR_PEER_TO_CLIENT));
    }

    /* honor the session limits, if enabled */
    unsigned int sessionLimit_Bps;

    if (tr_torrentUsesSessionLimits(torrent) &&
        tr_sessionGetActiveSpeedLimit_Bps(torrent->session, TR_PEER_TO_CLIENT, &sessionLimit_Bps))
    {
        rate_Bps = MIN(rate_Bps, sessionLimit_Bps);
    }

    /* use this desired rate to figure out how
     * many requests we should send to this peer */
    int const estimatedBlocksInPeriod = (rate_Bps * seconds) / torrent->blockSize;
    msgs->desiredRequestCount = MAX(minimumRequests, estimatedBlocksInPeriod);

    /* honor the peer's maximum request count, if specified */
    if (msgs->reqq > 0 && msgs->desiredRequestCount > msgs->reqq)
    {
        msgs->desiredRequestCount = msgs->reqq;
    }
}
1944
/* If the peer supports metadata transfer and the torrent has a metadata
 * piece that should be requested now, queues a ut_metadata request message
 * for that piece in outMessages. */
static void updateMetadataRequests(tr_peerMsgs* msgs, time_t now)
{
    int wantedPiece;

    if (!msgs->peerSupportsMetadataXfer || !tr_torrentGetNextMetadataRequest(msgs->torrent, now, &wantedPiece))
    {
        return;
    }

    struct evbuffer* const out = msgs->outMessages;

    /* build the request message */
    tr_variant request;
    tr_variantInitDict(&request, 3);
    tr_variantDictAddInt(&request, TR_KEY_msg_type, METADATA_MSG_TYPE_REQUEST);
    tr_variantDictAddInt(&request, TR_KEY_piece, wantedPiece);
    struct evbuffer* const payload = tr_variantToBuf(&request, TR_VARIANT_FMT_BENC);

    dbgmsg(msgs, "requesting metadata piece #%d", wantedPiece);

    /* write it out as a LTEP message to our outMessages buffer */
    evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
    evbuffer_add_uint8(out, BT_LTEP);
    evbuffer_add_uint8(out, msgs->ut_metadata_id);
    evbuffer_add_buffer(out, payload);
    pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
    dbgOutMessageLen(msgs);

    /* cleanup */
    evbuffer_free(payload);
    tr_variantFree(&request);
}
1976
/* Tops up our outstanding block requests to this peer: once the number of
 * pending requests has fallen to 66% of the desired count or below, asks the
 * peer manager for enough blocks to reach the desired count and sends a
 * request for each block it returns. */
static void updateBlockRequests(tr_peerMsgs* msgs)
{
    if (!tr_torrentIsPieceTransferAllowed(msgs->torrent, TR_PEER_TO_CLIENT))
    {
        return;
    }

    if (msgs->desiredRequestCount <= 0)
    {
        return;
    }

    if (msgs->peer.pendingReqsToPeer > msgs->desiredRequestCount * 0.66)
    {
        return;
    }

    TR_ASSERT(tr_peerMsgsIsClientInterested(msgs));
    TR_ASSERT(!tr_peerMsgsIsClientChoked(msgs));

    int const numwant = msgs->desiredRequestCount - msgs->peer.pendingReqsToPeer;
    tr_block_index_t* const wanted = tr_new(tr_block_index_t, numwant);
    int gotCount;

    tr_peerMgrGetNextRequests(msgs->torrent, &msgs->peer, numwant, wanted, &gotCount, false);

    for (int i = 0; i < gotCount; ++i)
    {
        struct peer_request req;
        blockToReq(msgs->torrent, wanted[i], &req);
        protocolSendRequest(msgs, &req);
    }

    tr_free(wanted);
}
2002
/**
 * Move one round of pending output to the peer.
 *
 * In order, this:
 *  1. starts or flushes the batch of protocol messages in outMessages;
 *  2. answers at most one queued metadata request with a data or reject
 *     message;
 *  3. answers at most one queued block request with a piece message (or a
 *     reject when the fast extension is supported);
 *  4. queues a keepalive if nothing has been sent for
 *     KEEPALIVE_INTERVAL_SECS.
 *
 * @param msgs peer connection
 * @param now  current time, in seconds
 * @return the number of bytes handed to the peer-io in this round; 0 if
 *         nothing was written or reading a block failed
 */
static size_t fillOutputBuffer(tr_peerMsgs* msgs, time_t now)
{
    int piece;
    size_t bytesWritten = 0;
    struct peer_request req;
    bool const haveMessages = evbuffer_get_length(msgs->outMessages) != 0;
    bool const fext = tr_peerIoSupportsFEXT(msgs->io);

    /**
    ***  Protocol messages
    **/

    if (haveMessages && msgs->outMessagesBatchedAt == 0) /* fresh batch */
    {
        dbgmsg(msgs, "started an outMessages batch (length is %zu)", evbuffer_get_length(msgs->outMessages));
        msgs->outMessagesBatchedAt = now;
    }
    else if (haveMessages && now - msgs->outMessagesBatchedAt >= msgs->outMessagesBatchPeriod)
    {
        size_t const len = evbuffer_get_length(msgs->outMessages);
        /* flush the protocol messages */
        dbgmsg(msgs, "flushing outMessages... to %p (length is %zu)", (void*)msgs->io, len);
        tr_peerIoWriteBuf(msgs->io, msgs->outMessages, false);
        msgs->clientSentAnythingAt = now;
        msgs->outMessagesBatchedAt = 0;
        msgs->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
        bytesWritten += len;
    }

    /**
    ***  Metadata Pieces
    **/

    if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= METADATA_PIECE_SIZE && popNextMetadataRequest(msgs, &piece))
    {
        size_t dataLen;
        char* data = tr_torrentGetMetadataPiece(msgs->torrent, piece, &dataLen);
        tr_variant tmp;
        struct evbuffer* payload;
        struct evbuffer* out = msgs->outMessages;

        if (data != NULL)
        {
            /* build the data message */
            tr_variantInitDict(&tmp, 3);
            tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_DATA);
            tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
            tr_variantDictAddInt(&tmp, TR_KEY_total_size, msgs->torrent->infoDictLength);
            payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);

            /* write it out as a LTEP message to our outMessages buffer */
            evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload) + dataLen);
            evbuffer_add_uint8(out, BT_LTEP);
            evbuffer_add_uint8(out, msgs->ut_metadata_id);
            evbuffer_add_buffer(out, payload);
            evbuffer_add(out, data, dataLen);
            pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
            dbgOutMessageLen(msgs);

            evbuffer_free(payload);
            tr_variantFree(&tmp);
            tr_free(data);
        }
        else /* send a rejection message */
        {
            /* build the rejection message */
            tr_variantInitDict(&tmp, 2);
            tr_variantDictAddInt(&tmp, TR_KEY_msg_type, METADATA_MSG_TYPE_REJECT);
            tr_variantDictAddInt(&tmp, TR_KEY_piece, piece);
            payload = tr_variantToBuf(&tmp, TR_VARIANT_FMT_BENC);

            /* write it out as a LTEP message to our outMessages buffer */
            evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
            evbuffer_add_uint8(out, BT_LTEP);
            evbuffer_add_uint8(out, msgs->ut_metadata_id);
            evbuffer_add_buffer(out, payload);
            pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
            dbgOutMessageLen(msgs);

            evbuffer_free(payload);
            tr_variantFree(&tmp);
        }
    }

    /**
    ***  Data Blocks
    **/

    if (tr_peerIoGetWriteBufferSpace(msgs->io, now) >= msgs->torrent->blockSize && popNextRequest(msgs, &req))
    {
        /* The popped request may never have been prefetched (prefetching
         * disabled, or it was queued past the prefetch window), so only
         * decrement while the counter is positive. prefetchPieces() uses it
         * as an array index. */
        if (msgs->prefetchCount > 0)
        {
            --msgs->prefetchCount;
        }

        if (requestIsValid(msgs, &req) && tr_torrentPieceIsComplete(msgs->torrent, req.index))
        {
            bool err;
            uint32_t const msglen = 4 + 1 + 4 + 4 + req.length;
            struct evbuffer* out;
            struct evbuffer_iovec iovec[1];

            out = evbuffer_new();
            evbuffer_expand(out, msglen);

            evbuffer_add_uint32(out, sizeof(uint8_t) + 2 * sizeof(uint32_t) + req.length);
            evbuffer_add_uint8(out, BT_PIECE);
            evbuffer_add_uint32(out, req.index);
            evbuffer_add_uint32(out, req.offset);

            /* read the block straight into the evbuffer's reserved space */
            evbuffer_reserve_space(out, req.length, iovec, 1);
            err = tr_cacheReadBlock(getSession(msgs)->cache, msgs->torrent, req.index, req.offset, req.length,
                iovec[0].iov_base) != 0;
            iovec[0].iov_len = req.length;
            evbuffer_commit_space(out, iovec, 1);

            /* check the piece if it needs checking... */
            if (!err && tr_torrentPieceNeedsCheck(msgs->torrent, req.index))
            {
                err = !tr_torrentCheckPiece(msgs->torrent, req.index);

                if (err)
                {
                    tr_torrentSetLocalError(msgs->torrent, _("Please Verify Local Data! Piece #%zu is corrupt."),
                        (size_t)req.index);
                }
            }

            if (err)
            {
                if (fext)
                {
                    protocolSendReject(msgs, &req);
                }
            }
            else
            {
                size_t const n = evbuffer_get_length(out);
                dbgmsg(msgs, "sending block %u:%u->%u", req.index, req.offset, req.length);
                TR_ASSERT(n == msglen);
                tr_peerIoWriteBuf(msgs->io, out, true);
                bytesWritten += n;
                msgs->clientSentAnythingAt = now;
                tr_historyAdd(&msgs->peer.blocksSentToPeer, tr_time(), 1);
            }

            evbuffer_free(out);

            if (err)
            {
                /* report "nothing written" and skip the prefetch and
                 * keepalive steps for this round */
                return 0;
            }
        }
        else if (fext) /* peer needs a reject message */
        {
            protocolSendReject(msgs, &req);
        }

        prefetchPieces(msgs);
    }

    /**
    ***  Keepalive
    **/

    if (msgs->clientSentAnythingAt != 0 && now - msgs->clientSentAnythingAt > KEEPALIVE_INTERVAL_SECS)
    {
        dbgmsg(msgs, "sending a keepalive message");
        evbuffer_add_uint32(msgs->outMessages, 0);
        pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
    }

    return bytesWritten;
}
2189
peerPulse(void * vmsgs)2190 static void peerPulse(void* vmsgs)
2191 {
2192 tr_peerMsgs* msgs = vmsgs;
2193 time_t const now = tr_time();
2194
2195 if (tr_isPeerIo(msgs->io))
2196 {
2197 updateDesiredRequestCount(msgs);
2198 updateBlockRequests(msgs);
2199 updateMetadataRequests(msgs, now);
2200 }
2201
2202 for (;;)
2203 {
2204 if (fillOutputBuffer(msgs, now) < 1)
2205 {
2206 break;
2207 }
2208 }
2209 }
2210
/* Public entry point for pulsing a peer; a NULL `msgs` is ignored. */
void tr_peerMsgsPulse(tr_peerMsgs* msgs)
{
    if (msgs == NULL)
    {
        return;
    }

    peerPulse(msgs);
}
2218
/* Peer-io error callback: logs timeouts and EOF/error events reported by
 * libevent, then always fires an ENOTCONN error for the peer. */
static void gotError(tr_peerIo* io UNUSED, short what, void* vmsgs)
{
    bool const timedOut = (what & BEV_EVENT_TIMEOUT) != 0;
    bool const failed = (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) != 0;

    if (timedOut)
    {
        dbgmsg(vmsgs, "libevent got a timeout, what=%hd", what);
    }

    if (failed)
    {
        dbgmsg(vmsgs, "libevent got an error! what=%hd, errno=%d (%s)", what, errno, tr_strerror(errno));
    }

    fireError(vmsgs, ENOTCONN);
}
2233
/**
 * Queue a BT_BITFIELD message describing the pieces we have.
 *
 * The message is appended to msgs->outMessages and the batch period is
 * poked with the immediate priority. The torrent must already have its
 * metadata.
 */
static void sendBitfield(tr_peerMsgs* msgs)
{
    TR_ASSERT(tr_torrentHasMetadata(msgs->torrent));

    struct evbuffer* const outbuf = msgs->outMessages;
    size_t bitfieldLen = 0;
    void* const bitfield = tr_torrentCreatePieceBitfield(msgs->torrent, &bitfieldLen);

    /* layout: <uint32 length><uint8 id><bitfield>; the length counts the id byte plus the bitfield */
    evbuffer_add_uint32(outbuf, sizeof(uint8_t) + bitfieldLen);
    evbuffer_add_uint8(outbuf, BT_BITFIELD);
    evbuffer_add(outbuf, bitfield, bitfieldLen);

    /* evbuffer_add() copied the bytes, so our temporary can go now */
    tr_free(bitfield);

    dbgmsg(msgs, "sending bitfield... outMessage size is now %zu", evbuffer_get_length(outbuf));
    pokeBatchPeriod(msgs, IMMEDIATE_PRIORITY_INTERVAL_SECS);
}
2251
/**
 * Announce our piece availability to the peer, using the most compact
 * message that applies:
 *
 *  - peer supports FEXT and we have everything -> "have all"
 *  - peer supports FEXT and we have nothing    -> "have none"
 *  - we have at least one piece                -> a bitfield
 *  - otherwise (no FEXT, no pieces)            -> nothing is sent
 */
static void tellPeerWhatWeHave(tr_peerMsgs* msgs)
{
    bool const peerHasFext = tr_peerIoSupportsFEXT(msgs->io);

    if (peerHasFext && tr_torrentHasAll(msgs->torrent))
    {
        protocolSendHaveAll(msgs);
    }
    else if (tr_torrentHasNone(msgs->torrent))
    {
        /* without FEXT there is no message for "nothing", so stay silent */
        if (peerHasFext)
        {
            protocolSendHaveNone(msgs);
        }
    }
    else
    {
        sendBitfield(msgs);
    }
}
2269
2270 /**
2271 ***
2272 **/
2273
/* Upper bounds on how many peers one pex message may list as added or
   as dropped. Some peers give us error messages if we send more than
   this many peers in a single pex message; see
   http://wiki.theory.org/BitTorrentPeerExchangeConventions
   These caps are enforced in pexAddedCb() and pexDroppedCb(). */
#define MAX_PEX_ADDED 50
#define MAX_PEX_DROPPED 50
2279
/* Accumulator handed as userData to the pex*Cb() callbacks while
   tr_set_compare() walks the old and new peer lists in sendPex(). */
typedef struct
{
    /* peers present now but not in the previous list; filled by pexAddedCb() */
    tr_pex* added;
    /* peers in the previous list but not present now; filled by pexDroppedCb() */
    tr_pex* dropped;
    /* added peers plus peers found in both lists; sendPex() keeps this
       array as the peer's new pex list when there is something to send */
    tr_pex* elements;
    /* number of entries used in `added` */
    int addedCount;
    /* number of entries used in `dropped` */
    int droppedCount;
    /* number of entries used in `elements` */
    int elementCount;
}
PexDiffs;
2290
pexAddedCb(void * vpex,void * userData)2291 static void pexAddedCb(void* vpex, void* userData)
2292 {
2293 PexDiffs* diffs = userData;
2294 tr_pex* pex = vpex;
2295
2296 if (diffs->addedCount < MAX_PEX_ADDED)
2297 {
2298 diffs->added[diffs->addedCount++] = *pex;
2299 diffs->elements[diffs->elementCount++] = *pex;
2300 }
2301 }
2302
pexDroppedCb(void * vpex,void * userData)2303 static inline void pexDroppedCb(void* vpex, void* userData)
2304 {
2305 PexDiffs* diffs = userData;
2306 tr_pex* pex = vpex;
2307
2308 if (diffs->droppedCount < MAX_PEX_DROPPED)
2309 {
2310 diffs->dropped[diffs->droppedCount++] = *pex;
2311 }
2312 }
2313
pexElementCb(void * vpex,void * userData)2314 static inline void pexElementCb(void* vpex, void* userData)
2315 {
2316 PexDiffs* diffs = userData;
2317 tr_pex* pex = vpex;
2318
2319 diffs->elements[diffs->elementCount++] = *pex;
2320 }
2321
2322 typedef void (* tr_set_func)(void* element, void* userData);
2323
2324 /**
2325 * @brief find the differences and commonalities in two sorted sets
2326 * @param a the first set
2327 * @param aCount the number of elements in the set 'a'
2328 * @param b the second set
2329 * @param bCount the number of elements in the set 'b'
2330 * @param compare the sorting method for both sets
2331 * @param elementSize the sizeof the element in the two sorted sets
2332 * @param in_a called for items in set 'a' but not set 'b'
2333 * @param in_b called for items in set 'b' but not set 'a'
2334 * @param in_both called for items that are in both sets
2335 * @param userData user data passed along to in_a, in_b, and in_both
2336 */
tr_set_compare(void const * va,size_t aCount,void const * vb,size_t bCount,tr_voidptr_compare_func compare,size_t elementSize,tr_set_func in_a_cb,tr_set_func in_b_cb,tr_set_func in_both_cb,void * userData)2337 static void tr_set_compare(void const* va, size_t aCount, void const* vb, size_t bCount, tr_voidptr_compare_func compare,
2338 size_t elementSize, tr_set_func in_a_cb, tr_set_func in_b_cb, tr_set_func in_both_cb, void* userData)
2339 {
2340 uint8_t const* a = va;
2341 uint8_t const* b = vb;
2342 uint8_t const* aend = a + elementSize * aCount;
2343 uint8_t const* bend = b + elementSize * bCount;
2344
2345 while (a != aend || b != bend)
2346 {
2347 if (a == aend)
2348 {
2349 (*in_b_cb)((void*)b, userData);
2350 b += elementSize;
2351 }
2352 else if (b == bend)
2353 {
2354 (*in_a_cb)((void*)a, userData);
2355 a += elementSize;
2356 }
2357 else
2358 {
2359 int const val = (*compare)(a, b);
2360
2361 if (val == 0)
2362 {
2363 (*in_both_cb)((void*)a, userData);
2364 a += elementSize;
2365 b += elementSize;
2366 }
2367 else if (val < 0)
2368 {
2369 (*in_a_cb)((void*)a, userData);
2370 a += elementSize;
2371 }
2372 else if (val > 0)
2373 {
2374 (*in_b_cb)((void*)b, userData);
2375 b += elementSize;
2376 }
2377 }
2378 }
2379 }
2380
sendPex(tr_peerMsgs * msgs)2381 static void sendPex(tr_peerMsgs* msgs)
2382 {
2383 if (msgs->peerSupportsPex && tr_torrentAllowsPex(msgs->torrent))
2384 {
2385 PexDiffs diffs;
2386 PexDiffs diffs6;
2387 tr_pex* newPex = NULL;
2388 tr_pex* newPex6 = NULL;
2389 int const newCount = tr_peerMgrGetPeers(msgs->torrent, &newPex, TR_AF_INET, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2390 int const newCount6 = tr_peerMgrGetPeers(msgs->torrent, &newPex6, TR_AF_INET6, TR_PEERS_CONNECTED, MAX_PEX_PEER_COUNT);
2391
2392 /* build the diffs */
2393 diffs.added = tr_new(tr_pex, newCount);
2394 diffs.addedCount = 0;
2395 diffs.dropped = tr_new(tr_pex, msgs->pexCount);
2396 diffs.droppedCount = 0;
2397 diffs.elements = tr_new(tr_pex, newCount + msgs->pexCount);
2398 diffs.elementCount = 0;
2399 tr_set_compare(msgs->pex, msgs->pexCount, newPex, newCount, tr_pexCompare, sizeof(tr_pex), pexDroppedCb, pexAddedCb,
2400 pexElementCb, &diffs);
2401 diffs6.added = tr_new(tr_pex, newCount6);
2402 diffs6.addedCount = 0;
2403 diffs6.dropped = tr_new(tr_pex, msgs->pexCount6);
2404 diffs6.droppedCount = 0;
2405 diffs6.elements = tr_new(tr_pex, newCount6 + msgs->pexCount6);
2406 diffs6.elementCount = 0;
2407 tr_set_compare(msgs->pex6, msgs->pexCount6, newPex6, newCount6, tr_pexCompare, sizeof(tr_pex), pexDroppedCb, pexAddedCb,
2408 pexElementCb, &diffs6);
2409 dbgmsg(msgs, "pex: old peer count %d+%d, new peer count %d+%d, added %d+%d, removed %d+%d", msgs->pexCount,
2410 msgs->pexCount6, newCount, newCount6, diffs.addedCount, diffs6.addedCount, diffs.droppedCount, diffs6.droppedCount);
2411
2412 if (diffs.addedCount == 0 && diffs.droppedCount == 0 && diffs6.addedCount == 0 && diffs6.droppedCount == 0)
2413 {
2414 tr_free(diffs.elements);
2415 tr_free(diffs6.elements);
2416 }
2417 else
2418 {
2419 tr_variant val;
2420 uint8_t* tmp;
2421 uint8_t* walk;
2422 struct evbuffer* payload;
2423 struct evbuffer* out = msgs->outMessages;
2424
2425 /* update peer */
2426 tr_free(msgs->pex);
2427 msgs->pex = diffs.elements;
2428 msgs->pexCount = diffs.elementCount;
2429 tr_free(msgs->pex6);
2430 msgs->pex6 = diffs6.elements;
2431 msgs->pexCount6 = diffs6.elementCount;
2432
2433 /* build the pex payload */
2434 tr_variantInitDict(&val, 3); /* ipv6 support: left as 3: speed vs. likelihood? */
2435
2436 if (diffs.addedCount > 0)
2437 {
2438 /* "added" */
2439 tmp = walk = tr_new(uint8_t, diffs.addedCount * 6);
2440
2441 for (int i = 0; i < diffs.addedCount; ++i)
2442 {
2443 memcpy(walk, &diffs.added[i].addr.addr, 4);
2444 walk += 4;
2445 memcpy(walk, &diffs.added[i].port, 2);
2446 walk += 2;
2447 }
2448
2449 TR_ASSERT(walk - tmp == diffs.addedCount * 6);
2450 tr_variantDictAddRaw(&val, TR_KEY_added, tmp, walk - tmp);
2451 tr_free(tmp);
2452
2453 /* "added.f"
2454 * unset each holepunch flag because we don't support it. */
2455 tmp = walk = tr_new(uint8_t, diffs.addedCount);
2456
2457 for (int i = 0; i < diffs.addedCount; ++i)
2458 {
2459 *walk++ = diffs.added[i].flags & ~ADDED_F_HOLEPUNCH;
2460 }
2461
2462 TR_ASSERT(walk - tmp == diffs.addedCount);
2463 tr_variantDictAddRaw(&val, TR_KEY_added_f, tmp, walk - tmp);
2464 tr_free(tmp);
2465 }
2466
2467 if (diffs.droppedCount > 0)
2468 {
2469 /* "dropped" */
2470 tmp = walk = tr_new(uint8_t, diffs.droppedCount * 6);
2471
2472 for (int i = 0; i < diffs.droppedCount; ++i)
2473 {
2474 memcpy(walk, &diffs.dropped[i].addr.addr, 4);
2475 walk += 4;
2476 memcpy(walk, &diffs.dropped[i].port, 2);
2477 walk += 2;
2478 }
2479
2480 TR_ASSERT(walk - tmp == diffs.droppedCount * 6);
2481 tr_variantDictAddRaw(&val, TR_KEY_dropped, tmp, walk - tmp);
2482 tr_free(tmp);
2483 }
2484
2485 if (diffs6.addedCount > 0)
2486 {
2487 /* "added6" */
2488 tmp = walk = tr_new(uint8_t, diffs6.addedCount * 18);
2489
2490 for (int i = 0; i < diffs6.addedCount; ++i)
2491 {
2492 memcpy(walk, &diffs6.added[i].addr.addr.addr6.s6_addr, 16);
2493 walk += 16;
2494 memcpy(walk, &diffs6.added[i].port, 2);
2495 walk += 2;
2496 }
2497
2498 TR_ASSERT(walk - tmp == diffs6.addedCount * 18);
2499 tr_variantDictAddRaw(&val, TR_KEY_added6, tmp, walk - tmp);
2500 tr_free(tmp);
2501
2502 /* "added6.f"
2503 * unset each holepunch flag because we don't support it. */
2504 tmp = walk = tr_new(uint8_t, diffs6.addedCount);
2505
2506 for (int i = 0; i < diffs6.addedCount; ++i)
2507 {
2508 *walk++ = diffs6.added[i].flags & ~ADDED_F_HOLEPUNCH;
2509 }
2510
2511 TR_ASSERT(walk - tmp == diffs6.addedCount);
2512 tr_variantDictAddRaw(&val, TR_KEY_added6_f, tmp, walk - tmp);
2513 tr_free(tmp);
2514 }
2515
2516 if (diffs6.droppedCount > 0)
2517 {
2518 /* "dropped6" */
2519 tmp = walk = tr_new(uint8_t, diffs6.droppedCount * 18);
2520
2521 for (int i = 0; i < diffs6.droppedCount; ++i)
2522 {
2523 memcpy(walk, &diffs6.dropped[i].addr.addr.addr6.s6_addr, 16);
2524 walk += 16;
2525 memcpy(walk, &diffs6.dropped[i].port, 2);
2526 walk += 2;
2527 }
2528
2529 TR_ASSERT(walk - tmp == diffs6.droppedCount * 18);
2530 tr_variantDictAddRaw(&val, TR_KEY_dropped6, tmp, walk - tmp);
2531 tr_free(tmp);
2532 }
2533
2534 /* write the pex message */
2535 payload = tr_variantToBuf(&val, TR_VARIANT_FMT_BENC);
2536 evbuffer_add_uint32(out, 2 * sizeof(uint8_t) + evbuffer_get_length(payload));
2537 evbuffer_add_uint8(out, BT_LTEP);
2538 evbuffer_add_uint8(out, msgs->ut_pex_id);
2539 evbuffer_add_buffer(out, payload);
2540 pokeBatchPeriod(msgs, HIGH_PRIORITY_INTERVAL_SECS);
2541 dbgmsg(msgs, "sending a pex message; outMessage size is now %zu", evbuffer_get_length(out));
2542 dbgOutMessageLen(msgs);
2543
2544 evbuffer_free(payload);
2545 tr_variantFree(&val);
2546 }
2547
2548 /* cleanup */
2549 tr_free(diffs.added);
2550 tr_free(diffs.dropped);
2551 tr_free(newPex);
2552 tr_free(diffs6.added);
2553 tr_free(diffs6.dropped);
2554 tr_free(newPex6);
2555
2556 /* msgs->clientSentPexAt = tr_time(); */
2557 }
2558 }
2559
/**
 * Timer callback for msgs->pexTimer: send a pex update, then re-arm the
 * timer so this runs again in PEX_INTERVAL_SECS seconds.
 *
 * @param foo   unused
 * @param bar   unused
 * @param vmsgs the tr_peerMsgs that owns the timer
 */
static void pexPulse(evutil_socket_t foo UNUSED, short bar UNUSED, void* vmsgs)
{
    tr_peerMsgs* const peerMsgs = vmsgs;

    sendPex(peerMsgs);

    TR_ASSERT(peerMsgs->pexTimer != NULL);
    tr_timerAdd(peerMsgs->pexTimer, PEX_INTERVAL_SECS, 0);
}
2569
2570 /***
2571 **** tr_peer virtual functions
2572 ***/
2573
/**
 * tr_peer virtual function: report whether piece data is flowing.
 *
 * @param peer      the peer to query; anything that fails tr_isPeerMsgs() counts as 0 bytes/sec
 * @param now       timestamp forwarded to tr_peerIoGetPieceSpeed_Bps()
 * @param direction which transfer direction to measure
 * @param setme_Bps if non-NULL, receives the measured speed
 * @return true when the measured speed is greater than zero
 */
static bool peermsgs_is_transferring_pieces(struct tr_peer const* peer, uint64_t now, tr_direction direction,
    unsigned int* setme_Bps)
{
    unsigned int const speed = tr_isPeerMsgs(peer) ?
        tr_peerIoGetPieceSpeed_Bps(((tr_peerMsgs const*)peer)->io, now, direction) :
        0;

    if (setme_Bps != NULL)
    {
        *setme_Bps = speed;
    }

    return speed > 0;
}
2592
/**
 * tr_peer virtual function: release everything a tr_peerMsgs owns.
 *
 * The struct's own memory is not freed here. It is overwritten with 0xFF
 * bytes at the end, so a later tr_isPeerMsgs() check on it will fail.
 */
static void peermsgs_destruct(tr_peer* peer)
{
    tr_peerMsgs* const self = PEER_MSGS(peer);

    TR_ASSERT(self != NULL);

    /* mark both directions inactive before tearing anything down */
    tr_peerMsgsSetActive(self, TR_UP, false);
    tr_peerMsgsSetActive(self, TR_DOWN, false);

    /* optional members: only present in some configurations / states */
    if (self->pexTimer != NULL)
    {
        event_free(self->pexTimer);
    }

    if (self->incoming.block != NULL)
    {
        evbuffer_free(self->incoming.block);
    }

    if (self->io != NULL)
    {
        tr_peerIoClear(self->io);
        tr_peerIoUnref(self->io); /* balanced by the ref in handshakeDoneCB() */
    }

    /* members that always exist (tr_free() accepts NULL) */
    evbuffer_free(self->outMessages);
    tr_free(self->pex6);
    tr_free(self->pex);

    tr_peerDestruct(&self->peer);

    /* poison the struct, magic number included */
    memset(self, 0xFF, sizeof(*self));
}
2626
2627 static struct tr_peer_virtual_funcs const my_funcs =
2628 {
2629 .destruct = peermsgs_destruct,
2630 .is_transferring_pieces = peermsgs_is_transferring_pieces
2631 };
2632
2633 /***
2634 ****
2635 ***/
2636
/** @return the age of this peer's connection, as reported by tr_peerIoGetAge() */
time_t tr_peerMsgsGetConnectionAge(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    time_t const age = tr_peerIoGetAge(msgs->io);
    return age;
}
2643
/** @return the current value of the peer_is_choked flag */
bool tr_peerMsgsIsPeerChoked(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    bool const choked = msgs->peer_is_choked;
    return choked;
}
2650
/** @return the current value of the peer_is_interested flag */
bool tr_peerMsgsIsPeerInterested(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    bool const interested = msgs->peer_is_interested;
    return interested;
}
2657
/** @return the current value of the client_is_choked flag */
bool tr_peerMsgsIsClientChoked(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    bool const choked = msgs->client_is_choked;
    return choked;
}
2664
/** @return the current value of the client_is_interested flag */
bool tr_peerMsgsIsClientInterested(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    bool const interested = msgs->client_is_interested;
    return interested;
}
2671
/** @return true when the io's socket type is TR_PEER_SOCKET_TYPE_UTP */
bool tr_peerMsgsIsUtpConnection(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    bool const isUtp = (msgs->io->socket.type == TR_PEER_SOCKET_TYPE_UTP);
    return isUtp;
}
2678
/** @return whatever tr_peerIoIsEncrypted() reports for this peer's io */
bool tr_peerMsgsIsEncrypted(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    bool const encrypted = tr_peerIoIsEncrypted(msgs->io);
    return encrypted;
}
2685
/** @return whatever tr_peerIoIsIncoming() reports for this peer's io */
bool tr_peerMsgsIsIncomingConnection(tr_peerMsgs const* msgs)
{
    TR_ASSERT(tr_isPeerMsgs(msgs));

    bool const incoming = tr_peerIoIsIncoming(msgs->io);
    return incoming;
}
2692
2693 /***
2694 ****
2695 ***/
2696
tr_isPeerMsgs(void const * msgs)2697 bool tr_isPeerMsgs(void const* msgs)
2698 {
2699 /* FIXME: this is pretty crude */
2700 return msgs != NULL && ((struct tr_peerMsgs*)msgs)->magic_number == MAGIC_NUMBER;
2701 }
2702
tr_peerMsgsCast(void * vm)2703 tr_peerMsgs* tr_peerMsgsCast(void* vm)
2704 {
2705 return tr_isPeerMsgs(vm) ? vm : NULL;
2706 }
2707
tr_peerMsgsNew(struct tr_torrent * torrent,struct tr_peerIo * io,tr_peer_callback callback,void * callbackData)2708 tr_peerMsgs* tr_peerMsgsNew(struct tr_torrent* torrent, struct tr_peerIo* io, tr_peer_callback callback, void* callbackData)
2709 {
2710 TR_ASSERT(io != NULL);
2711
2712 tr_peerMsgs* m = tr_new0(tr_peerMsgs, 1);
2713
2714 tr_peerConstruct(&m->peer, torrent);
2715 m->peer.funcs = &my_funcs;
2716
2717 m->magic_number = MAGIC_NUMBER;
2718 m->client_is_choked = true;
2719 m->peer_is_choked = true;
2720 m->client_is_interested = false;
2721 m->peer_is_interested = false;
2722 m->is_active[TR_UP] = false;
2723 m->is_active[TR_DOWN] = false;
2724 m->callback = callback;
2725 m->callbackData = callbackData;
2726 m->io = io;
2727 m->torrent = torrent;
2728 m->state = AWAITING_BT_LENGTH;
2729 m->outMessages = evbuffer_new();
2730 m->outMessagesBatchedAt = 0;
2731 m->outMessagesBatchPeriod = LOW_PRIORITY_INTERVAL_SECS;
2732
2733 if (tr_torrentAllowsPex(torrent))
2734 {
2735 m->pexTimer = evtimer_new(torrent->session->event_base, pexPulse, m);
2736 tr_timerAdd(m->pexTimer, PEX_INTERVAL_SECS, 0);
2737 }
2738
2739 if (tr_peerIoSupportsUTP(m->io))
2740 {
2741 tr_address const* addr = tr_peerIoGetAddress(m->io, NULL);
2742 tr_peerMgrSetUtpSupported(torrent, addr);
2743 tr_peerMgrSetUtpFailed(torrent, addr, false);
2744 }
2745
2746 if (tr_peerIoSupportsLTEP(m->io))
2747 {
2748 sendLtepHandshake(m);
2749 }
2750
2751 tellPeerWhatWeHave(m);
2752
2753 if (tr_dhtEnabled(torrent->session) && tr_peerIoSupportsDHT(m->io))
2754 {
2755 /* Only send PORT over IPv6 when the IPv6 DHT is running (BEP-32). */
2756 struct tr_address const* addr = tr_peerIoGetAddress(m->io, NULL);
2757
2758 if (addr->type == TR_AF_INET || tr_globalIPv6() != NULL)
2759 {
2760 protocolSendPort(m, tr_dhtPort(torrent->session));
2761 }
2762 }
2763
2764 tr_peerIoSetIOFuncs(m->io, canRead, didWrite, gotError, m);
2765 updateDesiredRequestCount(m);
2766
2767 return m;
2768 }
2769