1 /*
2 * This file Copyright (C) 2007-2014 Mnemosyne LLC
3 *
4 * It may be used under the GNU GPL versions 2 or 3
5 * or any future license endorsed by Mnemosyne LLC.
6 *
7 */
8
9 #include <errno.h>
10 #include <string.h>
11
12 #include <event2/event.h>
13 #include <event2/buffer.h>
14 #include <event2/bufferevent.h>
15
16 #include <libutp/utp.h>
17
18 #include "transmission.h"
19 #include "session.h"
20 #include "bandwidth.h"
21 #include "log.h"
22 #include "net.h"
23 #include "peer-common.h" /* MAX_BLOCK_SIZE */
24 #include "peer-io.h"
25 #include "tr-assert.h"
26 #include "tr-utp.h"
27 #include "trevent.h" /* tr_runInEventThread() */
28 #include "utils.h"
29
30 #ifdef _WIN32
31 #undef EAGAIN
32 #define EAGAIN WSAEWOULDBLOCK
33 #undef EINTR
34 #define EINTR WSAEINTR
35 #undef EINPROGRESS
36 #define EINPROGRESS WSAEINPROGRESS
37 #undef EPIPE
38 #define EPIPE WSAECONNRESET
39 #endif
40
/* The amount of read buffering that we allow for uTP sockets. */
42
43 #define UTP_READ_BUFFER_SIZE (256 * 1024)
44
/* Estimate the TCP/IP header overhead (in bytes) incurred when sending
 * a payload of `d` bytes over an Ethernet-framed TCP connection.
 *
 * @param d payload size in bytes
 * @return estimated extra header/framing bytes for that payload
 */
static size_t guessPacketOverhead(size_t d)
{
    /**
     * http://sd.wareonearth.com/~phil/net/overhead/
     *
     * TCP over Ethernet:
     * Assuming no header compression (e.g. not PPP)
     * Add 20 IPv4 header or 40 IPv6 header (no options)
     * Add 20 TCP header
     * Add 12 bytes optional TCP timestamps
     * Max TCP Payload data rates over ethernet are thus:
     * (1500-40)/ (38+1500) = 94.9285 % IPv4, minimal headers
     * (1500-52)/ (38+1500) = 94.1482 % IPv4, TCP timestamps
     * (1500-52)/ (42+1500) = 93.9040 % 802.1q, IPv4, TCP timestamps
     * (1500-60)/ (38+1500) = 93.6281 % IPv6, minimal headers
     * (1500-72)/ (38+1500) = 92.8479 % IPv6, TCP timestamps
     * (1500-72)/ (42+1500) = 92.6070 % 802.1q, IPv6, TCP timestamps
     */
    double const assumed_payload_data_rate = 94.0;

    /* cast to size_t (not unsigned int) so the result matches the return
     * type and large payloads aren't truncated on LP64 platforms */
    return (size_t)(d * (100.0 / assumed_payload_data_rate) - d);
}
67
68 /**
69 ***
70 **/
71
72 #define dbgmsg(io, ...) tr_logAddDeepNamed(tr_peerIoGetAddrStr(io), __VA_ARGS__)
73
74 /**
75 ***
76 **/
77
/* Bookkeeping node for the outgoing buffer: each tr_datatype records how
 * many of the next bytes queued in io->outbuf are piece data (as opposed
 * to protocol chatter), so completed writes can be accounted correctly. */
struct tr_datatype
{
    struct tr_datatype* next;
    size_t length;
    bool isPieceData;
};

/* Singly-linked free list of recycled tr_datatype nodes.
 * NOTE(review): unsynchronized global -- presumably only touched from the
 * libevent thread; confirm before using elsewhere. */
static struct tr_datatype* datatype_pool = NULL;

/* Canonical zeroed value used to reset recycled nodes. */
static struct tr_datatype const TR_DATATYPE_INIT =
{
    .next = NULL,
    .length = 0,
    .isPieceData = false
};
93
datatype_new(void)94 static struct tr_datatype* datatype_new(void)
95 {
96 struct tr_datatype* ret;
97
98 if (datatype_pool == NULL)
99 {
100 ret = tr_new(struct tr_datatype, 1);
101 }
102 else
103 {
104 ret = datatype_pool;
105 datatype_pool = datatype_pool->next;
106 }
107
108 *ret = TR_DATATYPE_INIT;
109 return ret;
110 }
111
datatype_free(struct tr_datatype * datatype)112 static void datatype_free(struct tr_datatype* datatype)
113 {
114 datatype->next = datatype_pool;
115 datatype_pool = datatype;
116 }
117
/* Remove the head node from io's outgoing datatype list and recycle it.
 * No-op when the list is empty. */
static void peer_io_pull_datatype(tr_peerIo* io)
{
    struct tr_datatype* const head = io->outbuf_datatypes;

    if (head != NULL)
    {
        io->outbuf_datatypes = head->next;
        datatype_free(head);
    }
}
128
/* Append `datatype` to the tail of io's outgoing datatype list.
 * (List order must mirror the byte order of io->outbuf.) */
static void peer_io_push_datatype(tr_peerIo* io, struct tr_datatype* datatype)
{
    struct tr_datatype** tail = &io->outbuf_datatypes;

    while (*tail != NULL)
    {
        tail = &(*tail)->next;
    }

    *tail = datatype;
}
147
148 /***
149 ****
150 ***/
151
/* Account for `bytes_transferred` bytes having been written to the peer:
 * charge the bandwidth structures, fire the user's didWrite callback, and
 * consume the matching datatype nodes from the head of the list.
 * The tr_isPeerIo() re-checks guard against `io` being torn down from
 * inside the didWrite callback. */
static void didWriteWrapper(tr_peerIo* io, unsigned int bytes_transferred)
{
    while (bytes_transferred != 0 && tr_isPeerIo(io))
    {
        struct tr_datatype* next = io->outbuf_datatypes;

        unsigned int const payload = MIN(next->length, bytes_transferred);
        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
        unsigned int const overhead = io->socket.type == TR_PEER_SOCKET_TYPE_TCP ? guessPacketOverhead(payload) : 0;
        uint64_t const now = tr_time_msec();

        tr_bandwidthUsed(&io->bandwidth, TR_UP, payload, next->isPieceData, now);

        if (overhead > 0)
        {
            /* header bytes are never piece data */
            tr_bandwidthUsed(&io->bandwidth, TR_UP, overhead, false, now);
        }

        if (io->didWrite != NULL)
        {
            io->didWrite(io, payload, next->isPieceData, io->userData);
        }

        /* the callback may have destroyed io; only touch it if still valid */
        if (tr_isPeerIo(io))
        {
            bytes_transferred -= payload;
            next->length -= payload;

            if (next->length == 0)
            {
                peer_io_pull_datatype(io);
            }
        }
    }
}
187
/* Drain io->inbuf by repeatedly invoking the user's canRead callback,
 * charging consumed bytes (piece data vs. protocol) to the bandwidth
 * structures. Holds the session lock while the callbacks run, and takes a
 * reference on `io` so a callback can't destroy it mid-loop. */
static void canReadWrapper(tr_peerIo* io)
{
    bool err = false;
    bool done = false;
    tr_session* session;

    dbgmsg(io, "canRead");

    tr_peerIoRef(io);

    session = io->session;

    /* try to consume the input buffer */
    if (io->canRead != NULL)
    {
        uint64_t const now = tr_time_msec();

        tr_sessionLock(session);

        while (!done && !err)
        {
            size_t piece = 0;
            size_t const oldLen = evbuffer_get_length(io->inbuf);
            int const ret = io->canRead(io, io->userData, &piece);
            /* `used` = total bytes the callback consumed; `piece` = how many
             * of those were piece data */
            size_t const used = oldLen - evbuffer_get_length(io->inbuf);
            unsigned int const overhead = guessPacketOverhead(used);

            /* false only when piece == 0 and used == 0, i.e. nothing consumed */
            if (piece != 0 || piece != used)
            {
                if (piece != 0)
                {
                    tr_bandwidthUsed(&io->bandwidth, TR_DOWN, piece, true, now);
                }

                if (used != piece)
                {
                    tr_bandwidthUsed(&io->bandwidth, TR_DOWN, used - piece, false, now);
                }
            }

            if (overhead > 0)
            {
                /* NOTE(review): overhead for *received* data is charged to
                 * TR_UP here -- looks inverted; confirm this is intentional
                 * (e.g. accounting for our outgoing ACKs) */
                tr_bandwidthUsed(&io->bandwidth, TR_UP, overhead, false, now);
            }

            switch (ret)
            {
            case READ_NOW:
                if (evbuffer_get_length(io->inbuf) != 0)
                {
                    continue; /* more data is ready -- run the callback again */
                }

                done = true;
                break;

            case READ_LATER:
                done = true;
                break;

            case READ_ERR:
                err = true;
                break;
            }

            TR_ASSERT(tr_isPeerIo(io));
        }

        tr_sessionUnlock(session);
    }

    tr_peerIoUnref(io);
}
261
/* libevent read callback for TCP sockets: pull up to `howmuch` bytes
 * (bounded by a 256K inbuf cap and the bandwidth allowance) into io->inbuf,
 * then hand the data to canReadWrapper(). */
static void event_read_cb(evutil_socket_t fd, short event UNUSED, void* vio)
{
    tr_peerIo* io = vio;

    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(io->socket.type == TR_PEER_SOCKET_TYPE_TCP);

    int res;
    int e;

    /* Limit the input buffer to 256K, so it doesn't grow too large */
    unsigned int howmuch;
    unsigned int curlen;
    tr_direction const dir = TR_DOWN;
    unsigned int const max = 256 * 1024;

    /* the event just fired, so it's no longer pending */
    io->pendingEvents &= ~EV_READ;

    curlen = evbuffer_get_length(io->inbuf);
    howmuch = curlen >= max ? 0 : max - curlen;
    howmuch = tr_bandwidthClamp(&io->bandwidth, TR_DOWN, howmuch);

    dbgmsg(io, "libevent says this peer is ready to read");

    /* if we don't have any bandwidth left, stop reading */
    if (howmuch < 1)
    {
        tr_peerIoSetEnabled(io, dir, false);
        return;
    }

    EVUTIL_SET_SOCKET_ERROR(0);
    res = evbuffer_read(io->inbuf, fd, (int)howmuch);
    e = EVUTIL_SOCKET_ERROR();

    if (res > 0)
    {
        /* re-arm the read event, then let the user callback consume the data */
        tr_peerIoSetEnabled(io, dir, true);

        /* Invoke the user callback - must always be called last */
        canReadWrapper(io);
    }
    else
    {
        char errstr[512];
        short what = BEV_EVENT_READING;

        if (res == 0) /* EOF */
        {
            what |= BEV_EVENT_EOF;
        }
        else if (res == -1)
        {
            if (e == EAGAIN || e == EINTR)
            {
                /* transient condition: keep polling and try again */
                tr_peerIoSetEnabled(io, dir, true);
                return;
            }

            what |= BEV_EVENT_ERROR;
        }

        dbgmsg(io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e,
            tr_net_strerror(errstr, sizeof(errstr), e));

        if (io->gotError != NULL)
        {
            io->gotError(io, what, io->userData);
        }
    }
}
333
/* Flush at most `howmuch` bytes of io->outbuf to the given socket.
 * Returns the evbuffer_write_atmost() result: bytes written, or -1 on
 * error (with the socket error captured for the debug log). */
static int tr_evbuffer_write(tr_peerIo* io, int fd, size_t howmuch)
{
    char errstr[256];

    EVUTIL_SET_SOCKET_ERROR(0);
    int const n = evbuffer_write_atmost(io->outbuf, fd, howmuch);
    int const e = EVUTIL_SOCKET_ERROR();

    dbgmsg(io, "wrote %d to peer (%s)", n, (n == -1 ? tr_net_strerror(errstr, sizeof(errstr), e) : ""));

    return n;
}
347
/* libevent write callback for TCP sockets: flush as much of io->outbuf as
 * the bandwidth allowance permits, then account for what was written.
 * Transient errors reschedule; hard errors/EOF invoke the gotError callback. */
static void event_write_cb(evutil_socket_t fd, short event UNUSED, void* vio)
{
    tr_peerIo* io = vio;

    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(io->socket.type == TR_PEER_SOCKET_TYPE_TCP);

    int res = 0;
    int e;
    short what = BEV_EVENT_WRITING;
    size_t howmuch;
    tr_direction const dir = TR_UP;
    char errstr[1024];

    /* the event just fired, so it's no longer pending */
    io->pendingEvents &= ~EV_WRITE;

    dbgmsg(io, "libevent says this peer is ready to write");

    /* Write as much as possible, since the socket is non-blocking, write() will
     * return if it can't write any more data without blocking */
    howmuch = tr_bandwidthClamp(&io->bandwidth, dir, evbuffer_get_length(io->outbuf));

    /* if we don't have any bandwidth left, stop writing */
    if (howmuch < 1)
    {
        tr_peerIoSetEnabled(io, dir, false);
        return;
    }

    EVUTIL_SET_SOCKET_ERROR(0);
    res = tr_evbuffer_write(io, fd, howmuch);
    e = EVUTIL_SOCKET_ERROR();

    if (res == -1)
    {
        if (e == 0 || e == EAGAIN || e == EINTR || e == EINPROGRESS)
        {
            /* transient condition: keep polling and try again later */
            goto reschedule;
        }

        /* error case */
        what |= BEV_EVENT_ERROR;
    }
    else if (res == 0)
    {
        /* eof case */
        what |= BEV_EVENT_EOF;
    }

    if (res <= 0)
    {
        goto error;
    }

    /* data remains; keep the write event armed */
    if (evbuffer_get_length(io->outbuf) != 0)
    {
        tr_peerIoSetEnabled(io, dir, true);
    }

    didWriteWrapper(io, res);
    return;

reschedule:
    if (evbuffer_get_length(io->outbuf) != 0)
    {
        tr_peerIoSetEnabled(io, dir, true);
    }

    return;

error:
    tr_net_strerror(errstr, sizeof(errstr), e);
    dbgmsg(io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr);

    if (io->gotError != NULL)
    {
        io->gotError(io, what, io->userData);
    }
}
427
428 /**
429 ***
430 **/
431
/* Apply the user-configured TCP congestion-control algorithm to `socket`,
 * if one was configured at all. */
static void maybeSetCongestionAlgorithm(tr_socket_t socket, char const* algorithm)
{
    if (tr_str_is_empty(algorithm))
    {
        return;
    }

    tr_netSetCongestionControl(socket, algorithm);
}
439
440 #ifdef WITH_UTP
441 /* UTP callbacks */
442
/* libutp callback: payload bytes have arrived. Append them to the peer's
 * input buffer and let the read machinery consume them. */
static void utp_on_read(void* closure, unsigned char const* buf, size_t buflen)
{
    tr_peerIo* io = closure;

    TR_ASSERT(tr_isPeerIo(io));

    int const rc = evbuffer_add(io->inbuf, buf, buflen);
    dbgmsg(io, "utp_on_read got %zu bytes", buflen);

    if (rc < 0)
    {
        tr_logAddNamedError("UTP", "On read evbuffer_add");
    }
    else
    {
        tr_peerIoSetEnabled(io, TR_DOWN, true);
        canReadWrapper(io);
    }
}
461
/* libutp callback: libutp wants `buflen` bytes to transmit. Move them from
 * io->outbuf into libutp's buffer and account for the write. */
static void utp_on_write(void* closure, unsigned char* buf, size_t buflen)
{
    tr_peerIo* io = closure;

    TR_ASSERT(tr_isPeerIo(io));

    int rc = evbuffer_remove(io->outbuf, buf, buflen);
    dbgmsg(io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc);
    TR_ASSERT(rc == (int)buflen); /* if this fails, we've corrupted our bookkeeping somewhere */

    if (rc < (long)buflen)
    {
        /* shouldn't happen (see the assert above); log it in release builds */
        tr_logAddNamedError("UTP", "Short write: %d < %ld", rc, (long)buflen);
    }

    didWriteWrapper(io, buflen);
}
479
/* libutp callback asking about our receive-buffer state. Reporting
 * UTP_READ_BUFFER_SIZE minus the current bandwidth allowance makes the
 * buffer look fuller as download bandwidth runs out, throttling the sender.
 * NOTE(review): assumes libutp interprets the return value as "bytes
 * currently buffered" -- confirm against libutp's API documentation. */
static size_t utp_get_rb_size(void* closure)
{
    tr_peerIo* io = closure;

    TR_ASSERT(tr_isPeerIo(io));

    size_t bytes = tr_bandwidthClamp(&io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE);

    dbgmsg(io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes);
    return UTP_READ_BUFFER_SIZE - bytes;
}
491
492 static int tr_peerIoTryWrite(tr_peerIo* io, size_t howmuch);
493
utp_on_writable(tr_peerIo * io)494 static void utp_on_writable(tr_peerIo* io)
495 {
496 int n;
497
498 dbgmsg(io, "libutp says this peer is ready to write");
499
500 n = tr_peerIoTryWrite(io, SIZE_MAX);
501 tr_peerIoSetEnabled(io, TR_UP, n != 0 && evbuffer_get_length(io->outbuf) != 0);
502 }
503
utp_on_state_change(void * closure,int state)504 static void utp_on_state_change(void* closure, int state)
505 {
506 tr_peerIo* io = closure;
507
508 TR_ASSERT(tr_isPeerIo(io));
509
510 if (state == UTP_STATE_CONNECT)
511 {
512 dbgmsg(io, "utp_on_state_change -- changed to connected");
513 io->utpSupported = true;
514 }
515 else if (state == UTP_STATE_WRITABLE)
516 {
517 dbgmsg(io, "utp_on_state_change -- changed to writable");
518
519 if ((io->pendingEvents & EV_WRITE) != 0)
520 {
521 utp_on_writable(io);
522 }
523 }
524 else if (state == UTP_STATE_EOF)
525 {
526 if (io->gotError != NULL)
527 {
528 io->gotError(io, BEV_EVENT_EOF, io->userData);
529 }
530 }
531 else if (state == UTP_STATE_DESTROYING)
532 {
533 tr_logAddNamedError("UTP", "Impossible state UTP_STATE_DESTROYING");
534 return;
535 }
536 else
537 {
538 tr_logAddNamedError("UTP", "Unknown state %d", state);
539 }
540 }
541
utp_on_error(void * closure,int errcode)542 static void utp_on_error(void* closure, int errcode)
543 {
544 tr_peerIo* io = closure;
545
546 TR_ASSERT(tr_isPeerIo(io));
547
548 dbgmsg(io, "utp_on_error -- errcode is %d", errcode);
549
550 if (io->gotError != NULL)
551 {
552 errno = errcode;
553 io->gotError(io, BEV_EVENT_ERROR, io->userData);
554 }
555 }
556
/* libutp callback: `count` bytes of protocol overhead were sent (send != 0)
 * or received. Charge them to the bandwidth ledger as non-piece data. */
static void utp_on_overhead(void* closure, uint8_t /* bool */ send, size_t count, int type UNUSED)
{
    tr_peerIo* io = closure;

    TR_ASSERT(tr_isPeerIo(io));

    dbgmsg(io, "utp_on_overhead -- count is %zu", count);

    tr_direction const dir = send != 0 ? TR_UP : TR_DOWN;
    tr_bandwidthUsed(&io->bandwidth, dir, count, false, tr_time_msec());
}
567
/* Callback table handed to libutp for live peer connections
 * (see UTP_SetCallbacks in tr_peerIoNew). */
static struct UTPFunctionTable utp_function_table =
{
    .on_read = utp_on_read,
    .on_write = utp_on_write,
    .get_rb_size = utp_get_rb_size,
    .on_state = utp_on_state_change,
    .on_error = utp_on_error,
    .on_overhead = utp_on_overhead
};
577
/* Dummy UTP callbacks. */
/* We switch a UTP socket to use these after the associated peerIo has been
   destroyed -- see io_dtor. */

/* Payload arriving after close: unexpected; just log it. */
static void dummy_read(void* closure UNUSED, unsigned char const* buf UNUSED, size_t buflen UNUSED)
{
    /* This cannot happen, as far as I'm aware. */
    tr_logAddNamedError("UTP", "On_read called on closed socket");
}

/* libutp asking for data after close: feed it zeroes. */
static void dummy_write(void* closure UNUSED, unsigned char* buf, size_t buflen)
{
    /* This can very well happen if we've shut down a peer connection that
       had unflushed buffers. Complain and send zeroes. */
    tr_logAddNamedDbg("UTP", "On_write called on closed socket");
    memset(buf, 0, buflen);
}

/* Report an empty receive buffer for a closed socket. */
static size_t dummy_get_rb_size(void* closure UNUSED)
{
    return 0;
}

/* Ignore state changes on a closed socket. */
static void dummy_on_state_change(void* closure UNUSED, int state UNUSED)
{
}

/* Ignore errors on a closed socket. */
static void dummy_on_error(void* closure UNUSED, int errcode UNUSED)
{
}

/* Ignore overhead accounting on a closed socket. */
static void dummy_on_overhead(void* closure UNUSED, uint8_t /* bool */ send UNUSED, size_t count UNUSED, int type UNUSED)
{
}

/* No-op callback table installed by io_close_socket(). */
static struct UTPFunctionTable dummy_utp_function_table =
{
    .on_read = dummy_read,
    .on_write = dummy_write,
    .get_rb_size = dummy_get_rb_size,
    .on_state = dummy_on_state_change,
    .on_error = dummy_on_error,
    .on_overhead = dummy_on_overhead
};
622
623 #endif /* #ifdef WITH_UTP */
624
/* Common constructor for incoming and outgoing peer connections.
 * Takes ownership of `socket`, sets up crypto and bandwidth state, and
 * wires the socket into libevent (TCP) or libutp (uTP).
 * Must be called from the libevent thread.
 * `torrentHash` may be NULL for incoming peers whose torrent isn't known yet. */
static tr_peerIo* tr_peerIoNew(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port,
    uint8_t const* torrentHash, bool isIncoming, bool isSeed, struct tr_peer_socket const socket)
{
    TR_ASSERT(session != NULL);
    TR_ASSERT(session->events != NULL);
    TR_ASSERT(tr_amInEventThread(session));

#ifdef WITH_UTP
    TR_ASSERT(socket.type == TR_PEER_SOCKET_TYPE_TCP || socket.type == TR_PEER_SOCKET_TYPE_UTP);
#else
    TR_ASSERT(socket.type == TR_PEER_SOCKET_TYPE_TCP);
#endif

    if (socket.type == TR_PEER_SOCKET_TYPE_TCP)
    {
        /* these per-socket options only apply to TCP */
        tr_netSetTOS(socket.handle.tcp, session->peerSocketTOS, addr->type);
        maybeSetCongestionAlgorithm(socket.handle.tcp, session->peer_congestion_algorithm);
    }

    tr_peerIo* io = tr_new0(tr_peerIo, 1);
    io->magicNumber = PEER_IO_MAGIC_NUMBER;
    io->refCount = 1; /* the caller owns this initial reference */
    tr_cryptoConstruct(&io->crypto, torrentHash, isIncoming);
    io->session = session;
    io->addr = *addr;
    io->isSeed = isSeed;
    io->port = port;
    io->socket = socket;
    io->isIncoming = isIncoming;
    io->timeCreated = tr_time();
    io->inbuf = evbuffer_new();
    io->outbuf = evbuffer_new();
    tr_bandwidthConstruct(&io->bandwidth, session, parent);
    tr_bandwidthSetPeer(&io->bandwidth, io);
    dbgmsg(io, "bandwidth is %p; its parent is %p", (void*)&io->bandwidth, (void*)parent);

    switch (socket.type)
    {
    case TR_PEER_SOCKET_TYPE_TCP:
        dbgmsg(io, "socket (tcp) is %" PRIdMAX, (intmax_t)socket.handle.tcp);
        /* the events are created here but not armed until event_enable() */
        io->event_read = event_new(session->event_base, socket.handle.tcp, EV_READ, event_read_cb, io);
        io->event_write = event_new(session->event_base, socket.handle.tcp, EV_WRITE, event_write_cb, io);
        break;

#ifdef WITH_UTP

    case TR_PEER_SOCKET_TYPE_UTP:
        dbgmsg(io, "socket (utp) is %p", (void*)socket.handle.utp);
        UTP_SetSockopt(socket.handle.utp, SO_RCVBUF, UTP_READ_BUFFER_SIZE);
        dbgmsg(io, "%s", "calling UTP_SetCallbacks &utp_function_table");
        UTP_SetCallbacks(socket.handle.utp, &utp_function_table, io);

        if (!isIncoming)
        {
            dbgmsg(io, "%s", "calling UTP_Connect");
            UTP_Connect(socket.handle.utp);
        }

        break;

#endif

    default:
        TR_ASSERT_MSG(false, "unsupported peer socket type %d", socket.type);
    }

    return io;
}
693
tr_peerIoNewIncoming(tr_session * session,tr_bandwidth * parent,tr_address const * addr,tr_port port,struct tr_peer_socket const socket)694 tr_peerIo* tr_peerIoNewIncoming(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port,
695 struct tr_peer_socket const socket)
696 {
697 TR_ASSERT(session != NULL);
698 TR_ASSERT(tr_address_is_valid(addr));
699
700 return tr_peerIoNew(session, parent, addr, port, NULL, true, false, socket);
701 }
702
/* Open a new outgoing connection to addr:port, trying uTP first when
 * requested and falling back to TCP.
 * Returns NULL if no socket could be opened. */
tr_peerIo* tr_peerIoNewOutgoing(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port,
    uint8_t const* torrentHash, bool isSeed, bool utp)
{
    TR_ASSERT(session != NULL);
    TR_ASSERT(tr_address_is_valid(addr));
    TR_ASSERT(torrentHash != NULL);

    struct tr_peer_socket socket = TR_PEER_SOCKET_INIT;

    if (utp)
    {
        socket = tr_netOpenPeerUTPSocket(session, addr, port, isSeed);
    }

    /* fall back to TCP if uTP wasn't requested or couldn't be opened */
    if (socket.type == TR_PEER_SOCKET_TYPE_NONE)
    {
        socket = tr_netOpenPeerSocket(session, addr, port, isSeed);
        dbgmsg(NULL, "tr_netOpenPeerSocket returned fd %" PRIdMAX, (intmax_t)(socket.type != TR_PEER_SOCKET_TYPE_NONE ?
            socket.handle.tcp : TR_BAD_SOCKET));
    }

    if (socket.type == TR_PEER_SOCKET_TYPE_NONE)
    {
        return NULL;
    }

    return tr_peerIoNew(session, parent, addr, port, torrentHash, false, isSeed, socket);
}
731
732 /***
733 ****
734 ***/
735
/* Begin polling for `event` (EV_READ and/or EV_WRITE) on this peer.
 * For TCP sockets this arms the libevent events; for uTP only the
 * pendingEvents bookkeeping is updated, since libutp drives its own
 * callbacks. Idempotent: already-pending events are left alone. */
static void event_enable(tr_peerIo* io, short event)
{
    TR_ASSERT(tr_amInEventThread(io->session));
    TR_ASSERT(io->session != NULL);
    TR_ASSERT(io->session->events != NULL);

    bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP;

    if (need_events)
    {
        TR_ASSERT(event_initialized(io->event_read));
        TR_ASSERT(event_initialized(io->event_write));
    }

    if ((event & EV_READ) != 0 && (io->pendingEvents & EV_READ) == 0)
    {
        dbgmsg(io, "enabling ready-to-read polling");

        if (need_events)
        {
            event_add(io->event_read, NULL);
        }

        io->pendingEvents |= EV_READ;
    }

    if ((event & EV_WRITE) != 0 && (io->pendingEvents & EV_WRITE) == 0)
    {
        dbgmsg(io, "enabling ready-to-write polling");

        if (need_events)
        {
            event_add(io->event_write, NULL);
        }

        io->pendingEvents |= EV_WRITE;
    }
}
774
/* Stop polling for `event` (EV_READ and/or EV_WRITE) on this peer.
 * Mirror image of event_enable(); idempotent for already-disabled events. */
static void event_disable(struct tr_peerIo* io, short event)
{
    TR_ASSERT(tr_amInEventThread(io->session));
    TR_ASSERT(io->session != NULL);
    TR_ASSERT(io->session->events != NULL);

    bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP;

    if (need_events)
    {
        TR_ASSERT(event_initialized(io->event_read));
        TR_ASSERT(event_initialized(io->event_write));
    }

    if ((event & EV_READ) != 0 && (io->pendingEvents & EV_READ) != 0)
    {
        dbgmsg(io, "disabling ready-to-read polling");

        if (need_events)
        {
            event_del(io->event_read);
        }

        io->pendingEvents &= ~EV_READ;
    }

    if ((event & EV_WRITE) != 0 && (io->pendingEvents & EV_WRITE) != 0)
    {
        dbgmsg(io, "disabling ready-to-write polling");

        if (need_events)
        {
            event_del(io->event_write);
        }

        io->pendingEvents &= ~EV_WRITE;
    }
}
813
/* Enable or disable polling in one direction: TR_UP maps to the write
 * event, TR_DOWN to the read event. Must run on the libevent thread. */
void tr_peerIoSetEnabled(tr_peerIo* io, tr_direction dir, bool isEnabled)
{
    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(tr_isDirection(dir));
    TR_ASSERT(tr_amInEventThread(io->session));
    TR_ASSERT(io->session->events != NULL);

    short const event = dir == TR_UP ? EV_WRITE : EV_READ;

    if (!isEnabled)
    {
        event_disable(io, event);
    }
    else
    {
        event_enable(io, event);
    }
}
832
833 /***
834 ****
835 ***/
836
/* Close and forget whatever socket this io holds, and free its libevent
 * events. Safe to call when the socket is already TR_PEER_SOCKET_TYPE_NONE. */
static void io_close_socket(tr_peerIo* io)
{
    switch (io->socket.type)
    {
    case TR_PEER_SOCKET_TYPE_NONE:
        break;

    case TR_PEER_SOCKET_TYPE_TCP:
        tr_netClose(io->session, io->socket.handle.tcp);
        break;

#ifdef WITH_UTP

    case TR_PEER_SOCKET_TYPE_UTP:
        /* swap in the no-op callbacks first: libutp can still fire events
         * while the connection drains after UTP_Close() */
        UTP_SetCallbacks(io->socket.handle.utp, &dummy_utp_function_table, NULL);
        UTP_Close(io->socket.handle.utp);
        break;

#endif

    default:
        TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type);
    }

    io->socket = TR_PEER_SOCKET_INIT;

    if (io->event_read != NULL)
    {
        event_free(io->event_read);
        io->event_read = NULL;
    }

    if (io->event_write != NULL)
    {
        event_free(io->event_write);
        io->event_write = NULL;
    }
}
875
/* Final teardown of a tr_peerIo; always runs on the libevent thread
 * (scheduled by tr_peerIoFree). Releases events, buffers, the socket,
 * crypto state, and any leftover datatype nodes. */
static void io_dtor(void* vio)
{
    tr_peerIo* io = vio;

    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(tr_amInEventThread(io->session));
    TR_ASSERT(io->session->events != NULL);

    dbgmsg(io, "in tr_peerIo destructor");
    event_disable(io, EV_READ | EV_WRITE);
    tr_bandwidthDestruct(&io->bandwidth);
    evbuffer_free(io->outbuf);
    evbuffer_free(io->inbuf);
    io_close_socket(io);
    tr_cryptoDestruct(&io->crypto);

    while (io->outbuf_datatypes != NULL)
    {
        peer_io_pull_datatype(io);
    }

    /* poison the memory so a use-after-free fails the tr_isPeerIo()
     * magic-number check instead of silently misbehaving */
    memset(io, ~0, sizeof(tr_peerIo));
    tr_free(io);
}
900
/* Schedule destruction of `io` on the libevent thread.
 * The callbacks are detached immediately so nothing fires while the
 * teardown is pending. NULL is tolerated. */
static void tr_peerIoFree(tr_peerIo* io)
{
    if (io == NULL)
    {
        return;
    }

    dbgmsg(io, "in tr_peerIoFree");

    io->canRead = NULL;
    io->didWrite = NULL;
    io->gotError = NULL;

    tr_runInEventThread(io->session, io_dtor, io);
}
912
/* Increment io's refcount; `file`/`line` identify the caller for logging. */
void tr_peerIoRefImpl(char const* file, int line, tr_peerIo* io)
{
    TR_ASSERT(tr_isPeerIo(io));

    int const old_count = io->refCount;

    dbgmsg(io, "%s:%d is incrementing the IO's refcount from %d to %d", file, line, old_count, old_count + 1);

    io->refCount = old_count + 1;
}
921
/* Decrement io's refcount, freeing the io when it reaches zero.
 * `file`/`line` identify the caller for logging. */
void tr_peerIoUnrefImpl(char const* file, int line, tr_peerIo* io)
{
    TR_ASSERT(tr_isPeerIo(io));

    int const new_count = io->refCount - 1;

    dbgmsg(io, "%s:%d is decrementing the IO's refcount from %d to %d", file, line, new_count + 1, new_count);

    io->refCount = new_count;

    if (new_count == 0)
    {
        tr_peerIoFree(io);
    }
}
933
tr_peerIoGetAddress(tr_peerIo const * io,tr_port * port)934 tr_address const* tr_peerIoGetAddress(tr_peerIo const* io, tr_port* port)
935 {
936 TR_ASSERT(tr_isPeerIo(io));
937
938 if (port != NULL)
939 {
940 *port = io->port;
941 }
942
943 return &io->addr;
944 }
945
/* Format "[address]:port" for logging.
 * NOTE: returns a pointer to static storage -- not reentrant or
 * thread-safe; each call overwrites the previous result. */
char const* tr_peerIoAddrStr(tr_address const* addr, tr_port port)
{
    static char buf[512];
    tr_snprintf(buf, sizeof(buf), "[%s]:%u", tr_address_to_string(addr), ntohs(port));
    return buf;
}
952
/* Format this io's peer address for logging, or "error" if `io` is not a
 * live tr_peerIo (e.g. NULL or already destroyed). */
char const* tr_peerIoGetAddrStr(tr_peerIo const* io)
{
    if (!tr_isPeerIo(io))
    {
        return "error";
    }

    return tr_peerIoAddrStr(&io->addr, io->port);
}
957
/* Install the user callbacks fired on readable data (readcb), completed
 * writes (writecb), and errors (errcb). `userData` is passed back to each.
 * Pass NULLs to detach. */
void tr_peerIoSetIOFuncs(tr_peerIo* io, tr_can_read_cb readcb, tr_did_write_cb writecb, tr_net_error_cb errcb, void* userData)
{
    io->canRead = readcb;
    io->didWrite = writecb;
    io->gotError = errcb;
    io->userData = userData;
}
965
/* Quiesce this io: detach all user callbacks first (so disabling the
 * events can't trigger them), then stop polling in both directions. */
void tr_peerIoClear(tr_peerIo* io)
{
    tr_peerIoSetIOFuncs(io, NULL, NULL, NULL, NULL);
    tr_peerIoSetEnabled(io, TR_UP, false);
    tr_peerIoSetEnabled(io, TR_DOWN, false);
}
972
/* Tear down the current socket and open a fresh outgoing TCP connection
 * to the same peer, preserving which events were being polled.
 * Only valid for outgoing connections.
 * @return 0 on success, -1 if a new TCP socket could not be opened */
int tr_peerIoReconnect(tr_peerIo* io)
{
    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(!tr_peerIoIsIncoming(io));

    tr_session* session = tr_peerIoGetSession(io);

    /* remember what was pending so it can be re-armed after reconnect */
    short int pendingEvents = io->pendingEvents;
    event_disable(io, EV_READ | EV_WRITE);

    io_close_socket(io);

    io->socket = tr_netOpenPeerSocket(session, &io->addr, io->port, io->isSeed);

    if (io->socket.type != TR_PEER_SOCKET_TYPE_TCP)
    {
        return -1;
    }

    io->event_read = event_new(session->event_base, io->socket.handle.tcp, EV_READ, event_read_cb, io);
    io->event_write = event_new(session->event_base, io->socket.handle.tcp, EV_WRITE, event_write_cb, io);

    event_enable(io, pendingEvents);
    tr_netSetTOS(io->socket.handle.tcp, session->peerSocketTOS, io->addr.type);
    maybeSetCongestionAlgorithm(io->socket.handle.tcp, session->peer_congestion_algorithm);

    return 0;
}
1001
1002 /**
1003 ***
1004 **/
1005
/* Bind a torrent's info hash to this connection's crypto state. */
void tr_peerIoSetTorrentHash(tr_peerIo* io, uint8_t const* hash)
{
    TR_ASSERT(tr_isPeerIo(io));

    tr_cryptoSetTorrentHash(&io->crypto, hash);
}

/* Fetch the info hash previously bound to this connection, if any. */
uint8_t const* tr_peerIoGetTorrentHash(tr_peerIo* io)
{
    TR_ASSERT(tr_isPeerIo(io));

    return tr_cryptoGetTorrentHash(&io->crypto);
}

/* True if an info hash has been bound to this connection. */
bool tr_peerIoHasTorrentHash(tr_peerIo const* io)
{
    TR_ASSERT(tr_isPeerIo(io));

    return tr_cryptoHasTorrentHash(&io->crypto);
}
1026
1027 /**
1028 ***
1029 **/
1030
/* Record the remote peer's 20-byte peer id, or clear it when `peer_id`
 * is NULL. (20 is the standard BitTorrent peer-id length.) */
void tr_peerIoSetPeersId(tr_peerIo* io, uint8_t const* peer_id)
{
    TR_ASSERT(tr_isPeerIo(io));

    /* assignment intended: remember whether an id was supplied */
    if ((io->peerIdIsSet = peer_id != NULL))
    {
        memcpy(io->peerId, peer_id, 20);
    }
    else
    {
        memset(io->peerId, '\0', 20);
    }
}
1044
1045 /**
1046 ***
1047 **/
1048
/* How large we'd like io->outbuf to be allowed to grow, in bytes. */
static unsigned int getDesiredOutputBufferSize(tr_peerIo const* io, uint64_t now)
{
    /* this is all kind of arbitrary, but what seems to work well is
     * being large enough to hold the next `period` (15) seconds' worth of
     * output at the current upload speed, or a few blocks, whichever is
     * bigger. It's okay to tweak this as needed */
    unsigned int const currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps(&io->bandwidth, now, TR_UP);
    unsigned int const period = 15U; /* arbitrary */
    /* the 3 is arbitrary; the .5 is to leave room for messages */
    static unsigned int const ceiling = (unsigned int)(MAX_BLOCK_SIZE * 3.5);
    return MAX(ceiling, currentSpeed_Bps * period);
}
1061
tr_peerIoGetWriteBufferSpace(tr_peerIo const * io,uint64_t now)1062 size_t tr_peerIoGetWriteBufferSpace(tr_peerIo const* io, uint64_t now)
1063 {
1064 size_t const desiredLen = getDesiredOutputBufferSize(io, now);
1065 size_t const currentLen = evbuffer_get_length(io->outbuf);
1066 size_t freeSpace = 0;
1067
1068 if (desiredLen > currentLen)
1069 {
1070 freeSpace = desiredLen - currentLen;
1071 }
1072
1073 return freeSpace;
1074 }
1075
1076 /**
1077 ***
1078 **/
1079
/* Select plaintext vs. RC4 for subsequent reads and writes on this
 * connection (set after the encryption handshake completes). */
void tr_peerIoSetEncryption(tr_peerIo* io, tr_encryption_type encryption_type)
{
    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(encryption_type == PEER_ENCRYPTION_NONE || encryption_type == PEER_ENCRYPTION_RC4);

    io->encryption_type = encryption_type;
}
1087
1088 /**
1089 ***
1090 **/
1091
/* Apply `callback` (encrypt or decrypt) in place to `size` bytes of
 * `buffer` starting at `offset`, walking the evbuffer's internal chunks
 * directly so no copying is needed. */
static inline void processBuffer(tr_crypto* crypto, struct evbuffer* buffer, size_t offset, size_t size, void (* callback)(
    tr_crypto*, size_t, void const*, void*))
{
    struct evbuffer_ptr pos;
    struct evbuffer_iovec iovec;

    evbuffer_ptr_set(buffer, &pos, offset, EVBUFFER_PTR_SET);

    do
    {
        /* peek at the next contiguous chunk; stop when nothing's left */
        if (evbuffer_peek(buffer, size, &pos, &iovec, 1) <= 0)
        {
            break;
        }

        /* in-place transform: source and destination are the same memory */
        callback(crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base);

        TR_ASSERT(size >= iovec.iov_len);
        size -= iovec.iov_len;
    }
    while (evbuffer_ptr_set(buffer, &pos, iovec.iov_len, EVBUFFER_PTR_ADD) == 0);

    /* every requested byte should have been visited */
    TR_ASSERT(size == 0);
}
1116
/* Record that the next `byteCount` bytes queued on io->outbuf are
 * (or are not) piece data, for later bandwidth accounting. */
static void addDatatype(tr_peerIo* io, size_t byteCount, bool isPieceData)
{
    struct tr_datatype* const d = datatype_new();

    d->length = byteCount;
    d->isPieceData = isPieceData;

    peer_io_push_datatype(io, d);
}
1125
/* RC4-encrypt `size` bytes of `buf` in place, starting at `offset`, if
 * this connection negotiated encryption; otherwise leave the buffer alone. */
static inline void maybeEncryptBuffer(tr_peerIo* io, struct evbuffer* buf, size_t offset, size_t size)
{
    if (io->encryption_type == PEER_ENCRYPTION_RC4)
    {
        processBuffer(&io->crypto, buf, offset, size, &tr_cryptoEncrypt);
    }
}
1133
/* Move the entire contents of `buf` onto the outgoing queue (encrypting in
 * place first if needed) and record whether those bytes are piece data.
 * `buf` is drained by evbuffer_add_buffer(). */
void tr_peerIoWriteBuf(tr_peerIo* io, struct evbuffer* buf, bool isPieceData)
{
    size_t const byteCount = evbuffer_get_length(buf);
    maybeEncryptBuffer(io, buf, 0, byteCount);
    evbuffer_add_buffer(io->outbuf, buf);
    addDatatype(io, byteCount, isPieceData);
}
1141
/* Append `byteCount` bytes to io's outbound queue, encrypting them on
 * the way in when the io is in RC4 mode, and tag their datatype. */
void tr_peerIoWriteBytes(tr_peerIo* io, void const* bytes, size_t byteCount, bool isPieceData)
{
    struct evbuffer_iovec iovec;

    /* reserve contiguous space in the outbuf, then fill exactly byteCount bytes */
    evbuffer_reserve_space(io->outbuf, byteCount, &iovec, 1);
    iovec.iov_len = byteCount;

    if (io->encryption_type != PEER_ENCRYPTION_RC4)
    {
        memcpy(iovec.iov_base, bytes, iovec.iov_len);
    }
    else
    {
        tr_cryptoEncrypt(&io->crypto, iovec.iov_len, bytes, iovec.iov_base);
    }

    evbuffer_commit_space(io->outbuf, &iovec, 1);

    addDatatype(io, byteCount, isPieceData);
}
1162
1163 /***
1164 ****
1165 ***/
1166
/* Append a single byte to the buffer. */
void evbuffer_add_uint8(struct evbuffer* outbuf, uint8_t byte)
{
    evbuffer_add(outbuf, &byte, sizeof(byte));
}
1171
/* Append a 16-bit value in network (big-endian) byte order. */
void evbuffer_add_uint16(struct evbuffer* outbuf, uint16_t addme_hs)
{
    uint16_t const addme_ns = htons(addme_hs);

    evbuffer_add(outbuf, &addme_ns, sizeof(addme_ns));
}
1177
/* Append a 32-bit value in network (big-endian) byte order. */
void evbuffer_add_uint32(struct evbuffer* outbuf, uint32_t addme_hl)
{
    uint32_t const addme_nl = htonl(addme_hl);

    evbuffer_add(outbuf, &addme_nl, sizeof(addme_nl));
}
1183
/* Append a 64-bit value in network (big-endian) byte order. */
void evbuffer_add_uint64(struct evbuffer* outbuf, uint64_t addme_hll)
{
    uint64_t const addme_nll = tr_htonll(addme_hll);

    evbuffer_add(outbuf, &addme_nll, sizeof(addme_nll));
}
1189
1190 /***
1191 ****
1192 ***/
1193
/* Decrypt `size` bytes of `buf` in place, starting at `offset`,
 * but only when this io is in RC4 encryption mode. */
static inline void maybeDecryptBuffer(tr_peerIo* io, struct evbuffer* buf, size_t offset, size_t size)
{
    if (io->encryption_type != PEER_ENCRYPTION_RC4)
    {
        return;
    }

    processBuffer(&io->crypto, buf, offset, size, &tr_cryptoDecrypt);
}
1201
/* Move `byteCount` bytes from `inbuf` to the tail of `outbuf`,
 * decrypting the appended bytes in place when the io is in RC4 mode. */
void tr_peerIoReadBytesToBuf(tr_peerIo* io, struct evbuffer* inbuf, struct evbuffer* outbuf, size_t byteCount)
{
    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(evbuffer_get_length(inbuf) >= byteCount);

    size_t const old_length = evbuffer_get_length(outbuf);

    /* stage the bytes through a scratch buffer on their way to outbuf */
    struct evbuffer* scratch = evbuffer_new();
    evbuffer_remove_buffer(inbuf, scratch, byteCount);
    evbuffer_add_buffer(outbuf, scratch);
    evbuffer_free(scratch);

    /* decrypt only the region we just appended */
    maybeDecryptBuffer(io, outbuf, old_length, byteCount);
}
1217
/* Copy `byteCount` bytes out of `inbuf` into `bytes`,
 * decrypting them when the io is in RC4 mode. */
void tr_peerIoReadBytes(tr_peerIo* io, struct evbuffer* inbuf, void* bytes, size_t byteCount)
{
    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(evbuffer_get_length(inbuf) >= byteCount);

    if (io->encryption_type == PEER_ENCRYPTION_NONE)
    {
        evbuffer_remove(inbuf, bytes, byteCount);
    }
    else if (io->encryption_type == PEER_ENCRYPTION_RC4)
    {
        evbuffer_remove(inbuf, bytes, byteCount);
        tr_cryptoDecrypt(&io->crypto, byteCount, bytes, bytes);
    }
    else
    {
        TR_ASSERT_MSG(false, "unhandled encryption type %d", (int)io->encryption_type);
    }
}
1238
/* Read a big-endian 16-bit value from `inbuf` into host byte order. */
void tr_peerIoReadUint16(tr_peerIo* io, struct evbuffer* inbuf, uint16_t* setme)
{
    uint16_t ns;

    tr_peerIoReadBytes(io, inbuf, &ns, sizeof(uint16_t));
    *setme = ntohs(ns);
}
1245
/* Read a big-endian 32-bit value from `inbuf` into host byte order. */
void tr_peerIoReadUint32(tr_peerIo* io, struct evbuffer* inbuf, uint32_t* setme)
{
    uint32_t nl;

    tr_peerIoReadBytes(io, inbuf, &nl, sizeof(uint32_t));
    *setme = ntohl(nl);
}
1252
/* Read and discard `byteCount` bytes from `inbuf`
 * (still passing them through decryption so the RC4 stream stays in sync). */
void tr_peerIoDrain(tr_peerIo* io, struct evbuffer* inbuf, size_t byteCount)
{
    char scratch[4096];

    for (size_t remaining = byteCount; remaining > 0;)
    {
        size_t const len = MIN(remaining, sizeof(scratch));
        tr_peerIoReadBytes(io, inbuf, scratch, len);
        remaining -= len;
    }
}
1265
1266 /***
1267 ****
1268 ***/
1269
/* Pull up to `howmuch` bytes from the peer socket into io->inbuf.
 * `howmuch` is first clamped against the download bandwidth allotment.
 * Returns evbuffer_read()'s result for TCP sockets (bytes read, 0 at EOF,
 * -1 on error); returns 0 for uTP sockets, whose incoming data is
 * delivered through libutp callbacks rather than pulled here. */
static int tr_peerIoTryRead(tr_peerIo* io, size_t howmuch)
{
    int res = 0;

    if ((howmuch = tr_bandwidthClamp(&io->bandwidth, TR_DOWN, howmuch)) != 0)
    {
        switch (io->socket.type)
        {
        case TR_PEER_SOCKET_TYPE_UTP:
            /* UTP_RBDrained notifies libutp that your read buffer is empty.
             * It opens up the congestion window by sending an ACK (soonish)
             * if one was not going to be sent. */
            if (evbuffer_get_length(io->inbuf) == 0)
            {
                UTP_RBDrained(io->socket.handle.utp);
            }

            break;

        case TR_PEER_SOCKET_TYPE_TCP:
            {
                int e;
                char err_buf[512];

                EVUTIL_SET_SOCKET_ERROR(0);
                res = evbuffer_read(io->inbuf, io->socket.handle.tcp, (int)howmuch);
                e = EVUTIL_SOCKET_ERROR();

                /* err_buf is only filled (and only meaningful) when res == -1 */
                dbgmsg(io, "read %d from peer (%s)", res, res == -1 ? tr_net_strerror(err_buf, sizeof(err_buf), e) : "");

                /* let the consumer process whatever is buffered, even on error */
                if (evbuffer_get_length(io->inbuf) != 0)
                {
                    canReadWrapper(io);
                }

                /* report hard failures (and EOF, when res == 0) to the owner;
                 * transient conditions (EAGAIN/EINTR/EINPROGRESS) are skipped */
                if (res <= 0 && io->gotError != NULL && e != EAGAIN && e != EINTR && e != EINPROGRESS)
                {
                    short what = BEV_EVENT_READING | BEV_EVENT_ERROR;

                    if (res == 0)
                    {
                        what |= BEV_EVENT_EOF;
                    }

                    dbgmsg(io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e,
                        tr_net_strerror(err_buf, sizeof(err_buf), e));

                    io->gotError(io, what, io->userData);
                }

                break;
            }

        default:
            TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type);
        }
    }

    return res;
}
1330
/* Try to flush up to `howmuch` bytes from io->outbuf to the peer.
 * The amount is clamped to both the buffered length and the upload
 * bandwidth allotment.  Returns the number of bytes actually written
 * (tr_evbuffer_write()'s result for TCP, which may be negative on error). */
static int tr_peerIoTryWrite(tr_peerIo* io, size_t howmuch)
{
    int n = 0;
    size_t const old_len = evbuffer_get_length(io->outbuf);
    dbgmsg(io, "in tr_peerIoTryWrite %zu", howmuch);

    /* never try to send more than we have buffered */
    if (howmuch > old_len)
    {
        howmuch = old_len;
    }

    if ((howmuch = tr_bandwidthClamp(&io->bandwidth, TR_UP, howmuch)) != 0)
    {
        switch (io->socket.type)
        {
        case TR_PEER_SOCKET_TYPE_UTP:
            /* libutp consumes directly from outbuf; measure what it took */
            UTP_Write(io->socket.handle.utp, howmuch);
            n = old_len - evbuffer_get_length(io->outbuf);
            break;

        case TR_PEER_SOCKET_TYPE_TCP:
            {
                int e;

                EVUTIL_SET_SOCKET_ERROR(0);
                n = tr_evbuffer_write(io, io->socket.handle.tcp, howmuch);
                e = EVUTIL_SOCKET_ERROR();

                if (n > 0)
                {
                    didWriteWrapper(io, n);
                }

                /* report hard failures to the owner; skip transient conditions
                 * (on Windows, EPIPE is remapped to WSAECONNRESET above) */
                if (n < 0 && io->gotError != NULL && e != 0 && e != EPIPE && e != EAGAIN && e != EINTR && e != EINPROGRESS)
                {
                    char errstr[512];
                    short const what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;

                    dbgmsg(io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e,
                        tr_net_strerror(errstr, sizeof(errstr), e));

                    io->gotError(io, what, io->userData);
                }

                break;
            }

        default:
            TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type);
        }
    }

    return n;
}
1385
/* Flush up to `limit` bytes in the given direction
 * and return how many bytes were actually moved. */
int tr_peerIoFlush(tr_peerIo* io, tr_direction dir, size_t limit)
{
    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(tr_isDirection(dir));

    int const bytesUsed = dir == TR_DOWN ? tr_peerIoTryRead(io, limit) : tr_peerIoTryWrite(io, limit);

    dbgmsg(io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed);
    return bytesUsed;
}
1405
/* Flush only the protocol (non-piece-data) messages sitting at the
 * front of the outbound queue, leaving piece data for later. */
int tr_peerIoFlushOutgoingProtocolMsgs(tr_peerIo* io)
{
    size_t byteCount = 0;

    /* tally the leading run of non-piece-data bytes in the outbound queue */
    for (struct tr_datatype const* it = io->outbuf_datatypes; it != NULL && !it->isPieceData; it = it->next)
    {
        byteCount += it->length;
    }

    return tr_peerIoFlush(io, TR_UP, byteCount);
}
1426