1 /*
2  * This file Copyright (C) 2007-2014 Mnemosyne LLC
3  *
4  * It may be used under the GNU GPL versions 2 or 3
5  * or any future license endorsed by Mnemosyne LLC.
6  *
7  */
8 
9 #include <errno.h>
10 #include <string.h>
11 
12 #include <event2/event.h>
13 #include <event2/buffer.h>
14 #include <event2/bufferevent.h>
15 
16 #include <libutp/utp.h>
17 
18 #include "transmission.h"
19 #include "session.h"
20 #include "bandwidth.h"
21 #include "log.h"
22 #include "net.h"
23 #include "peer-common.h" /* MAX_BLOCK_SIZE */
24 #include "peer-io.h"
25 #include "tr-assert.h"
26 #include "tr-utp.h"
27 #include "trevent.h" /* tr_runInEventThread() */
28 #include "utils.h"
29 
/* On Windows, redefine the errno names that this file compares against
 * EVUTIL_SOCKET_ERROR() so that they expand to Winsock (WSA*) codes.
 * Note that EPIPE is mapped to WSAECONNRESET rather than a same-named code. */
#ifdef _WIN32
#undef EAGAIN
#define EAGAIN WSAEWOULDBLOCK
#undef EINTR
#define EINTR WSAEINTR
#undef EINPROGRESS
#define EINPROGRESS WSAEINPROGRESS
#undef EPIPE
#define EPIPE WSAECONNRESET
#endif
40 
/* The amount of read buffering that we allow for uTP sockets, in bytes.
 * Passed to UTP_SetSockopt(SO_RCVBUF) and used as the upper bound in
 * utp_get_rb_size(). */

#define UTP_READ_BUFFER_SIZE (256 * 1024)
44 
/**
 * Estimate the number of non-payload bytes that travel on the wire
 * alongside `d` bytes of payload.
 *
 * Background figures (http://sd.wareonearth.com/~phil/net/overhead/),
 * TCP over Ethernet, no header compression (e.g. not PPP):
 *   - 20 bytes IPv4 header or 40 bytes IPv6 header (no options)
 *   - 20 bytes TCP header
 *   - 12 bytes optional TCP timestamps
 *
 * Resulting maximum TCP payload data rates over ethernet:
 *   (1500-40)/ (38+1500) = 94.9285 %  IPv4, minimal headers
 *   (1500-52)/ (38+1500) = 94.1482 %  IPv4, TCP timestamps
 *   (1500-52)/ (42+1500) = 93.9040 %  802.1q, IPv4, TCP timestamps
 *   (1500-60)/ (38+1500) = 93.6281 %  IPv6, minimal headers
 *   (1500-72)/ (38+1500) = 92.8479 %  IPv6, TCP timestamps
 *   (1500-72)/ (42+1500) = 92.6070 %  802.1q, IPv6, TCP timestamps
 *
 * @param d payload size in bytes
 * @return the estimated overhead in bytes, truncated toward zero
 */
static size_t guessPacketOverhead(size_t d)
{
    /* percentage of wire bytes assumed to be payload; see the table above */
    double const payload_percent = 94.0;
    /* wire bytes needed per payload byte */
    double const wire_per_payload = 100.0 / payload_percent;

    return (unsigned int)(d * wire_per_payload - d);
}
67 
68 /**
69 ***
70 **/
71 
/* Forward a message to tr_logAddDeepNamed(), using the io's address string
 * (from tr_peerIoGetAddrStr()) as the log entry's name. */
#define dbgmsg(io, ...) tr_logAddDeepNamed(tr_peerIoGetAddrStr(io), __VA_ARGS__)
73 
74 /**
75 ***
76 **/
77 
/* Node of the singly-linked list kept in tr_peerIo::outbuf_datatypes.
 * Each node describes one run of outgoing bytes; didWriteWrapper() consumes
 * the list from the head as bytes are reported written. */
struct tr_datatype
{
    struct tr_datatype* next; /* following node, or NULL at the tail */
    size_t length; /* bytes of this run not yet accounted for as written */
    bool isPieceData; /* forwarded to tr_bandwidthUsed() and the didWrite callback */
};
84 
/* Free list of recycled tr_datatype nodes, linked through `next`.
 * datatype_free() pushes onto it; datatype_new() pops from it. */
static struct tr_datatype* datatype_pool = NULL;

/* Value every node returned by datatype_new() is reset to. */
static struct tr_datatype const TR_DATATYPE_INIT =
{
    .next = NULL,
    .length = 0,
    .isPieceData = false
};
93 
datatype_new(void)94 static struct tr_datatype* datatype_new(void)
95 {
96     struct tr_datatype* ret;
97 
98     if (datatype_pool == NULL)
99     {
100         ret = tr_new(struct tr_datatype, 1);
101     }
102     else
103     {
104         ret = datatype_pool;
105         datatype_pool = datatype_pool->next;
106     }
107 
108     *ret = TR_DATATYPE_INIT;
109     return ret;
110 }
111 
datatype_free(struct tr_datatype * datatype)112 static void datatype_free(struct tr_datatype* datatype)
113 {
114     datatype->next = datatype_pool;
115     datatype_pool = datatype;
116 }
117 
/**
 * Drop the first node of io->outbuf_datatypes, if there is one, and hand
 * it back to the pool via datatype_free().
 */
static void peer_io_pull_datatype(tr_peerIo* io)
{
    struct tr_datatype* const head = io->outbuf_datatypes;

    if (head == NULL)
    {
        return;
    }

    io->outbuf_datatypes = head->next;
    datatype_free(head);
}
128 
/**
 * Append `datatype` to the end of io->outbuf_datatypes.
 * The list is walked from the head to find the tail on every call.
 */
static void peer_io_push_datatype(tr_peerIo* io, struct tr_datatype* datatype)
{
    /* `link` always points at the pointer that should receive the new node */
    struct tr_datatype** link = &io->outbuf_datatypes;

    while (*link != NULL)
    {
        link = &(*link)->next;
    }

    *link = datatype;
}
147 
148 /***
149 ****
150 ***/
151 
/**
 * Account for `bytes_transferred` bytes that have just left io->outbuf.
 *
 * The bytes are attributed, in order, to the runs recorded in
 * io->outbuf_datatypes. For each run (or part of one) this:
 *   - reports the payload to tr_bandwidthUsed() as upload,
 *   - for TCP sockets, also reports an estimated packet overhead,
 *   - invokes io->didWrite, if set,
 *   - shrinks the run and pops it once it reaches zero length.
 *
 * The loop re-checks tr_isPeerIo(io) after the callback and stops if that
 * check fails.
 *
 * If the bookkeeping list runs out before `bytes_transferred` does (for
 * example after the short write that utp_on_write() only logs), the
 * remaining bytes cannot be attributed to anything, so the loop stops
 * instead of dereferencing a NULL list head.
 */
static void didWriteWrapper(tr_peerIo* io, unsigned int bytes_transferred)
{
    while (bytes_transferred != 0 && tr_isPeerIo(io))
    {
        struct tr_datatype* next = io->outbuf_datatypes;

        if (next == NULL)
        {
            /* more bytes reported than we have records for */
            break;
        }

        unsigned int const payload = MIN(next->length, bytes_transferred);
        /* For uTP sockets, the overhead is computed in utp_on_overhead. */
        unsigned int const overhead = io->socket.type == TR_PEER_SOCKET_TYPE_TCP ? guessPacketOverhead(payload) : 0;
        uint64_t const now = tr_time_msec();

        tr_bandwidthUsed(&io->bandwidth, TR_UP, payload, next->isPieceData, now);

        if (overhead > 0)
        {
            tr_bandwidthUsed(&io->bandwidth, TR_UP, overhead, false, now);
        }

        if (io->didWrite != NULL)
        {
            io->didWrite(io, payload, next->isPieceData, io->userData);
        }

        /* only touch the bookkeeping if io still passes the validity check */
        if (tr_isPeerIo(io))
        {
            bytes_transferred -= payload;
            next->length -= payload;

            if (next->length == 0)
            {
                peer_io_pull_datatype(io);
            }
        }
    }
}
187 
/**
 * Let the io's canRead callback consume io->inbuf and account for what it
 * consumed.
 *
 * A reference on `io` is held for the duration of the call. If a canRead
 * callback is set, it is invoked repeatedly under the session lock:
 *   - READ_NOW   : call again if inbuf still holds data, else stop
 *   - READ_LATER : stop
 *   - READ_ERR   : stop
 * After every invocation the consumed bytes are reported to
 * tr_bandwidthUsed() as download, split into piece data (the count the
 * callback stored in `piece`) and everything else.
 *
 * NOTE(review): the estimated packet overhead is charged to TR_UP although
 * this is the read path, and it is charged for every socket type, whereas
 * didWriteWrapper() only estimates overhead for TCP. Confirm that is intended.
 */
static void canReadWrapper(tr_peerIo* io)
{
    dbgmsg(io, "canRead");

    tr_peerIoRef(io);

    tr_session* const session = io->session;

    /* try to consume the input buffer */
    if (io->canRead != NULL)
    {
        uint64_t const now = tr_time_msec();
        bool keep_going = true;

        tr_sessionLock(session);

        while (keep_going)
        {
            size_t piece = 0;
            size_t const len_before = evbuffer_get_length(io->inbuf);
            int const ret = io->canRead(io, io->userData, &piece);
            size_t const used = len_before - evbuffer_get_length(io->inbuf);
            unsigned int const overhead = guessPacketOverhead(used);

            if (piece != 0)
            {
                tr_bandwidthUsed(&io->bandwidth, TR_DOWN, piece, true, now);
            }

            if (used != piece)
            {
                tr_bandwidthUsed(&io->bandwidth, TR_DOWN, used - piece, false, now);
            }

            if (overhead > 0)
            {
                tr_bandwidthUsed(&io->bandwidth, TR_UP, overhead, false, now);
            }

            switch (ret)
            {
            case READ_NOW:
                if (evbuffer_get_length(io->inbuf) != 0)
                {
                    /* more input is waiting; go around again right away */
                    continue;
                }

                keep_going = false;
                break;

            case READ_LATER:
            case READ_ERR:
                keep_going = false;
                break;
            }

            TR_ASSERT(tr_isPeerIo(io));
        }

        tr_sessionUnlock(session);
    }

    tr_peerIoUnref(io);
}
261 
/**
 * libevent callback for the TCP read event created in tr_peerIoNew() /
 * tr_peerIoReconnect().
 *
 * Reads from `fd` into io->inbuf, limited both by a 256K cap on the input
 * buffer and by tr_bandwidthClamp(). Outcomes:
 *   - nothing may be read right now: read polling is disabled
 *   - data was read: polling is re-enabled and canReadWrapper() runs
 *   - EAGAIN / EINTR: polling is re-enabled and nothing else happens
 *   - EOF or any other error: io->gotError is invoked, if set
 *
 * @param fd  the socket that became readable
 * @param vio the tr_peerIo registered with the event
 */
static void event_read_cb(evutil_socket_t fd, short event UNUSED, void* vio)
{
    tr_peerIo* io = vio;

    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(io->socket.type == TR_PEER_SOCKET_TYPE_TCP);

    tr_direction const dir = TR_DOWN;
    /* Limit the input buffer to 256K, so it doesn't grow too large */
    unsigned int const max = 256 * 1024;

    io->pendingEvents &= ~EV_READ;

    unsigned int const curlen = evbuffer_get_length(io->inbuf);
    unsigned int room = 0;

    if (curlen < max)
    {
        room = max - curlen;
    }

    unsigned int const howmuch = tr_bandwidthClamp(&io->bandwidth, TR_DOWN, room);

    dbgmsg(io, "libevent says this peer is ready to read");

    /* if we don't have any bandwidth left, stop reading */
    if (howmuch < 1)
    {
        tr_peerIoSetEnabled(io, dir, false);
        return;
    }

    EVUTIL_SET_SOCKET_ERROR(0);
    int const res = evbuffer_read(io->inbuf, fd, (int)howmuch);
    int const e = EVUTIL_SOCKET_ERROR();

    if (res > 0)
    {
        tr_peerIoSetEnabled(io, dir, true);

        /* Invoke the user callback - must always be called last */
        canReadWrapper(io);
        return;
    }

    /* transient condition: just wait for the next readiness notification */
    if (res == -1 && (e == EAGAIN || e == EINTR))
    {
        tr_peerIoSetEnabled(io, dir, true);
        return;
    }

    short what = BEV_EVENT_READING;

    if (res == 0) /* EOF */
    {
        what |= BEV_EVENT_EOF;
    }
    else if (res == -1)
    {
        what |= BEV_EVENT_ERROR;
    }

    char errstr[512];

    dbgmsg(io, "event_read_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e,
        tr_net_strerror(errstr, sizeof(errstr), e));

    if (io->gotError != NULL)
    {
        io->gotError(io, what, io->userData);
    }
}
333 
/**
 * Write up to `howmuch` bytes from io->outbuf to `fd` and log the result.
 *
 * The socket error is reset before the write so that the value captured
 * for the log message belongs to this call.
 *
 * @return whatever evbuffer_write_atmost() returned
 */
static int tr_evbuffer_write(tr_peerIo* io, int fd, size_t howmuch)
{
    char errstr[256];

    EVUTIL_SET_SOCKET_ERROR(0);
    int const n = evbuffer_write_atmost(io->outbuf, fd, howmuch);
    int const e = EVUTIL_SOCKET_ERROR();

    /* the error text is only produced when the write reported -1 */
    dbgmsg(io, "wrote %d to peer (%s)", n, (n == -1 ? tr_net_strerror(errstr, sizeof(errstr), e) : ""));

    return n;
}
347 
/**
 * libevent callback for the TCP write event created in tr_peerIoNew() /
 * tr_peerIoReconnect().
 *
 * Writes as much of io->outbuf as tr_bandwidthClamp() allows. Outcomes:
 *   - nothing may be written right now: write polling is disabled
 *   - bytes were written: polling is re-enabled if data remains, then
 *     didWriteWrapper() accounts for the bytes
 *   - -1 with error 0 / EAGAIN / EINTR / EINPROGRESS: polling is re-enabled
 *     if data remains
 *   - anything else: io->gotError is invoked, if set
 *
 * @param fd  the socket that became writable
 * @param vio the tr_peerIo registered with the event
 */
static void event_write_cb(evutil_socket_t fd, short event UNUSED, void* vio)
{
    tr_peerIo* io = vio;

    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(io->socket.type == TR_PEER_SOCKET_TYPE_TCP);

    tr_direction const dir = TR_UP;

    io->pendingEvents &= ~EV_WRITE;

    dbgmsg(io, "libevent says this peer is ready to write");

    /* Write as much as possible, since the socket is non-blocking, write() will
     * return if it can't write any more data without blocking */
    size_t const howmuch = tr_bandwidthClamp(&io->bandwidth, dir, evbuffer_get_length(io->outbuf));

    /* if we don't have any bandwidth left, stop writing */
    if (howmuch < 1)
    {
        tr_peerIoSetEnabled(io, dir, false);
        return;
    }

    EVUTIL_SET_SOCKET_ERROR(0);
    int const res = tr_evbuffer_write(io, fd, howmuch);
    int const e = EVUTIL_SOCKET_ERROR();

    bool const wrote_something = res > 0;
    bool const try_again_later = res == -1 && (e == 0 || e == EAGAIN || e == EINTR || e == EINPROGRESS);

    if (wrote_something || try_again_later)
    {
        /* keep polling for writability while there is still queued output */
        if (evbuffer_get_length(io->outbuf) != 0)
        {
            tr_peerIoSetEnabled(io, dir, true);
        }

        if (wrote_something)
        {
            didWriteWrapper(io, res);
        }

        return;
    }

    short what = BEV_EVENT_WRITING;

    if (res == -1)
    {
        /* error case */
        what |= BEV_EVENT_ERROR;
    }
    else if (res == 0)
    {
        /* eof case */
        what |= BEV_EVENT_EOF;
    }

    char errstr[1024];

    tr_net_strerror(errstr, sizeof(errstr), e);
    dbgmsg(io, "event_write_cb got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e, errstr);

    if (io->gotError != NULL)
    {
        io->gotError(io, what, io->userData);
    }
}
427 
428 /**
429 ***
430 **/
431 
/**
 * Apply `algorithm` to `socket` via tr_netSetCongestionControl(), unless
 * tr_str_is_empty() reports the string as empty, in which case nothing is done.
 */
static void maybeSetCongestionAlgorithm(tr_socket_t socket, char const* algorithm)
{
    if (tr_str_is_empty(algorithm))
    {
        return;
    }

    tr_netSetCongestionControl(socket, algorithm);
}
439 
440 #ifdef WITH_UTP
441 /* UTP callbacks */
442 
utp_on_read(void * closure,unsigned char const * buf,size_t buflen)443 static void utp_on_read(void* closure, unsigned char const* buf, size_t buflen)
444 {
445     tr_peerIo* io = closure;
446 
447     TR_ASSERT(tr_isPeerIo(io));
448 
449     int rc = evbuffer_add(io->inbuf, buf, buflen);
450     dbgmsg(io, "utp_on_read got %zu bytes", buflen);
451 
452     if (rc < 0)
453     {
454         tr_logAddNamedError("UTP", "On read evbuffer_add");
455         return;
456     }
457 
458     tr_peerIoSetEnabled(io, TR_DOWN, true);
459     canReadWrapper(io);
460 }
461 
utp_on_write(void * closure,unsigned char * buf,size_t buflen)462 static void utp_on_write(void* closure, unsigned char* buf, size_t buflen)
463 {
464     tr_peerIo* io = closure;
465 
466     TR_ASSERT(tr_isPeerIo(io));
467 
468     int rc = evbuffer_remove(io->outbuf, buf, buflen);
469     dbgmsg(io, "utp_on_write sending %zu bytes... evbuffer_remove returned %d", buflen, rc);
470     TR_ASSERT(rc == (int)buflen); /* if this fails, we've corrupted our bookkeeping somewhere */
471 
472     if (rc < (long)buflen)
473     {
474         tr_logAddNamedError("UTP", "Short write: %d < %ld", rc, (long)buflen);
475     }
476 
477     didWriteWrapper(io, buflen);
478 }
479 
utp_get_rb_size(void * closure)480 static size_t utp_get_rb_size(void* closure)
481 {
482     tr_peerIo* io = closure;
483 
484     TR_ASSERT(tr_isPeerIo(io));
485 
486     size_t bytes = tr_bandwidthClamp(&io->bandwidth, TR_DOWN, UTP_READ_BUFFER_SIZE);
487 
488     dbgmsg(io, "utp_get_rb_size is saying it's ready to read %zu bytes", bytes);
489     return UTP_READ_BUFFER_SIZE - bytes;
490 }
491 
492 static int tr_peerIoTryWrite(tr_peerIo* io, size_t howmuch);
493 
utp_on_writable(tr_peerIo * io)494 static void utp_on_writable(tr_peerIo* io)
495 {
496     int n;
497 
498     dbgmsg(io, "libutp says this peer is ready to write");
499 
500     n = tr_peerIoTryWrite(io, SIZE_MAX);
501     tr_peerIoSetEnabled(io, TR_UP, n != 0 && evbuffer_get_length(io->outbuf) != 0);
502 }
503 
utp_on_state_change(void * closure,int state)504 static void utp_on_state_change(void* closure, int state)
505 {
506     tr_peerIo* io = closure;
507 
508     TR_ASSERT(tr_isPeerIo(io));
509 
510     if (state == UTP_STATE_CONNECT)
511     {
512         dbgmsg(io, "utp_on_state_change -- changed to connected");
513         io->utpSupported = true;
514     }
515     else if (state == UTP_STATE_WRITABLE)
516     {
517         dbgmsg(io, "utp_on_state_change -- changed to writable");
518 
519         if ((io->pendingEvents & EV_WRITE) != 0)
520         {
521             utp_on_writable(io);
522         }
523     }
524     else if (state == UTP_STATE_EOF)
525     {
526         if (io->gotError != NULL)
527         {
528             io->gotError(io, BEV_EVENT_EOF, io->userData);
529         }
530     }
531     else if (state == UTP_STATE_DESTROYING)
532     {
533         tr_logAddNamedError("UTP", "Impossible state UTP_STATE_DESTROYING");
534         return;
535     }
536     else
537     {
538         tr_logAddNamedError("UTP", "Unknown state %d", state);
539     }
540 }
541 
utp_on_error(void * closure,int errcode)542 static void utp_on_error(void* closure, int errcode)
543 {
544     tr_peerIo* io = closure;
545 
546     TR_ASSERT(tr_isPeerIo(io));
547 
548     dbgmsg(io, "utp_on_error -- errcode is %d", errcode);
549 
550     if (io->gotError != NULL)
551     {
552         errno = errcode;
553         io->gotError(io, BEV_EVENT_ERROR, io->userData);
554     }
555 }
556 
/**
 * libutp on_overhead callback: report `count` bytes of non-piece traffic
 * to tr_bandwidthUsed(), as upload when `send` is non-zero and as download
 * otherwise.
 *
 * @param closure the tr_peerIo registered with UTP_SetCallbacks()
 */
static void utp_on_overhead(void* closure, uint8_t /* bool */ send, size_t count, int type UNUSED)
{
    tr_peerIo* io = closure;

    TR_ASSERT(tr_isPeerIo(io));

    dbgmsg(io, "utp_on_overhead -- count is %zu", count);

    tr_direction const dir = send ? TR_UP : TR_DOWN;

    tr_bandwidthUsed(&io->bandwidth, dir, count, false, tr_time_msec());
}
567 
/* Callbacks installed on a live uTP socket by tr_peerIoNew(); the closure
 * passed alongside this table is the owning tr_peerIo. */
static struct UTPFunctionTable utp_function_table =
{
    .on_read = utp_on_read,
    .on_write = utp_on_write,
    .get_rb_size = utp_get_rb_size,
    .on_state = utp_on_state_change,
    .on_error = utp_on_error,
    .on_overhead = utp_on_overhead
};
577 
578 /* Dummy UTP callbacks. */
579 /* We switch a UTP socket to use these after the associated peerIo has been
580    destroyed -- see io_dtor. */
581 
/* Placeholder on_read callback: ignores the data and logs an error. */
static void dummy_read(void* closure UNUSED, unsigned char const* buf UNUSED, size_t buflen UNUSED)
{
    /* This cannot happen, as far as I'm aware. */
    tr_logAddNamedError("UTP", "On_read called on closed socket");
}
587 
/* Placeholder on_write callback: logs at debug level and fills the whole
 * of `buf` (buflen bytes) with zeroes. */
static void dummy_write(void* closure UNUSED, unsigned char* buf, size_t buflen)
{
    /* This can very well happen if we've shut down a peer connection that
       had unflushed buffers.  Complain and send zeroes. */
    tr_logAddNamedDbg("UTP", "On_write called on closed socket");
    memset(buf, 0, buflen);
}
595 
/* Placeholder get_rb_size callback: always returns 0. */
static size_t dummy_get_rb_size(void* closure UNUSED)
{
    return 0;
}
600 
/* Placeholder on_state callback: intentionally does nothing. */
static void dummy_on_state_change(void* closure UNUSED, int state UNUSED)
{
}
604 
/* Placeholder on_error callback: intentionally does nothing. */
static void dummy_on_error(void* closure UNUSED, int errcode UNUSED)
{
}
608 
/* Placeholder on_overhead callback: intentionally does nothing. */
static void dummy_on_overhead(void* closure UNUSED, uint8_t /* bool */ send UNUSED, size_t count UNUSED, int type UNUSED)
{
}
612 
/* Callback table that io_close_socket() installs (with a NULL closure) on a
 * uTP socket just before calling UTP_Close(), so that later libutp callbacks
 * no longer reference the tr_peerIo. */
static struct UTPFunctionTable dummy_utp_function_table =
{
    .on_read = dummy_read,
    .on_write = dummy_write,
    .get_rb_size = dummy_get_rb_size,
    .on_state = dummy_on_state_change,
    .on_error = dummy_on_error,
    .on_overhead = dummy_on_overhead
};
622 
623 #endif /* #ifdef WITH_UTP */
624 
/**
 * Allocate and initialize a tr_peerIo around an already-opened peer socket.
 *
 * Must be called from the session's event thread (asserted). For a TCP
 * socket, the TOS and (if configured) congestion algorithm are applied and
 * a read event and a write event are created on the session's event base.
 * For a uTP socket (only when built WITH_UTP), the receive buffer size and
 * the libutp callbacks are set, and UTP_Connect() is called for outgoing
 * connections.
 *
 * NOTE(review): the results of tr_new0(), evbuffer_new() and event_new()
 * are used without being checked here.
 *
 * @param session     owning session
 * @param parent      passed to tr_bandwidthConstruct() as the parent bandwidth
 * @param addr        peer address; copied into the io
 * @param port        peer port; stored as given
 * @param torrentHash passed to tr_cryptoConstruct(); NULL for incoming peers
 * @param isIncoming  whether the peer connected to us
 * @param isSeed      stored in the io
 * @param socket      the socket to wrap; copied into the io
 * @return the new io, with a reference count of 1
 */
static tr_peerIo* tr_peerIoNew(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port,
    uint8_t const* torrentHash, bool isIncoming, bool isSeed, struct tr_peer_socket const socket)
{
    TR_ASSERT(session != NULL);
    TR_ASSERT(session->events != NULL);
    TR_ASSERT(tr_amInEventThread(session));

#ifdef WITH_UTP
    TR_ASSERT(socket.type == TR_PEER_SOCKET_TYPE_TCP || socket.type == TR_PEER_SOCKET_TYPE_UTP);
#else
    TR_ASSERT(socket.type == TR_PEER_SOCKET_TYPE_TCP);
#endif

    /* socket options only apply to real TCP sockets */
    if (socket.type == TR_PEER_SOCKET_TYPE_TCP)
    {
        tr_netSetTOS(socket.handle.tcp, session->peerSocketTOS, addr->type);
        maybeSetCongestionAlgorithm(socket.handle.tcp, session->peer_congestion_algorithm);
    }

    tr_peerIo* io = tr_new0(tr_peerIo, 1);
    io->magicNumber = PEER_IO_MAGIC_NUMBER;
    io->refCount = 1;
    tr_cryptoConstruct(&io->crypto, torrentHash, isIncoming);
    io->session = session;
    io->addr = *addr;
    io->isSeed = isSeed;
    io->port = port;
    io->socket = socket;
    io->isIncoming = isIncoming;
    io->timeCreated = tr_time();
    io->inbuf = evbuffer_new();
    io->outbuf = evbuffer_new();
    tr_bandwidthConstruct(&io->bandwidth, session, parent);
    tr_bandwidthSetPeer(&io->bandwidth, io);
    dbgmsg(io, "bandwidth is %p; its parent is %p", (void*)&io->bandwidth, (void*)parent);

    switch (socket.type)
    {
    case TR_PEER_SOCKET_TYPE_TCP:
        dbgmsg(io, "socket (tcp) is %" PRIdMAX, (intmax_t)socket.handle.tcp);
        /* the events are created here but only added by event_enable() */
        io->event_read = event_new(session->event_base, socket.handle.tcp, EV_READ, event_read_cb, io);
        io->event_write = event_new(session->event_base, socket.handle.tcp, EV_WRITE, event_write_cb, io);
        break;

#ifdef WITH_UTP

    case TR_PEER_SOCKET_TYPE_UTP:
        dbgmsg(io, "socket (utp) is %p", (void*)socket.handle.utp);
        UTP_SetSockopt(socket.handle.utp, SO_RCVBUF, UTP_READ_BUFFER_SIZE);
        dbgmsg(io, "%s", "calling UTP_SetCallbacks &utp_function_table");
        UTP_SetCallbacks(socket.handle.utp, &utp_function_table, io);

        /* incoming sockets are already connected */
        if (!isIncoming)
        {
            dbgmsg(io, "%s", "calling UTP_Connect");
            UTP_Connect(socket.handle.utp);
        }

        break;

#endif

    default:
        TR_ASSERT_MSG(false, "unsupported peer socket type %d", socket.type);
    }

    return io;
}
693 
/**
 * Create a tr_peerIo for a peer that connected to us.
 *
 * Delegates to tr_peerIoNew() with no torrent hash, isIncoming = true and
 * isSeed = false.
 *
 * @param socket the already-accepted peer socket
 * @return the new io
 */
tr_peerIo* tr_peerIoNewIncoming(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port,
    struct tr_peer_socket const socket)
{
    TR_ASSERT(session != NULL);
    TR_ASSERT(tr_address_is_valid(addr));

    return tr_peerIoNew(session, parent, addr, port, NULL, true, false, socket);
}
702 
tr_peerIoNewOutgoing(tr_session * session,tr_bandwidth * parent,tr_address const * addr,tr_port port,uint8_t const * torrentHash,bool isSeed,bool utp)703 tr_peerIo* tr_peerIoNewOutgoing(tr_session* session, tr_bandwidth* parent, tr_address const* addr, tr_port port,
704     uint8_t const* torrentHash, bool isSeed, bool utp)
705 {
706     TR_ASSERT(session != NULL);
707     TR_ASSERT(tr_address_is_valid(addr));
708     TR_ASSERT(torrentHash != NULL);
709 
710     struct tr_peer_socket socket = TR_PEER_SOCKET_INIT;
711 
712     if (utp)
713     {
714         socket = tr_netOpenPeerUTPSocket(session, addr, port, isSeed);
715     }
716 
717     if (socket.type == TR_PEER_SOCKET_TYPE_NONE)
718     {
719         socket = tr_netOpenPeerSocket(session, addr, port, isSeed);
720         dbgmsg(NULL, "tr_netOpenPeerSocket returned fd %" PRIdMAX, (intmax_t)(socket.type != TR_PEER_SOCKET_TYPE_NONE ?
721             socket.handle.tcp : TR_BAD_SOCKET));
722     }
723 
724     if (socket.type == TR_PEER_SOCKET_TYPE_NONE)
725     {
726         return NULL;
727     }
728 
729     return tr_peerIoNew(session, parent, addr, port, torrentHash, false, isSeed, socket);
730 }
731 
732 /***
733 ****
734 ***/
735 
/**
 * Start polling for the directions in `event` (EV_READ and/or EV_WRITE)
 * that are not already pending.
 *
 * For TCP sockets the matching libevent event is added; for other socket
 * types only io->pendingEvents is updated.
 */
static void event_enable(tr_peerIo* io, short event)
{
    TR_ASSERT(tr_amInEventThread(io->session));
    TR_ASSERT(io->session != NULL);
    TR_ASSERT(io->session->events != NULL);

    bool const is_tcp = io->socket.type == TR_PEER_SOCKET_TYPE_TCP;

    if (is_tcp)
    {
        TR_ASSERT(event_initialized(io->event_read));
        TR_ASSERT(event_initialized(io->event_write));
    }

    /* requested directions that are not pending yet */
    short const to_enable = event & ~io->pendingEvents & (EV_READ | EV_WRITE);

    if ((to_enable & EV_READ) != 0)
    {
        dbgmsg(io, "enabling ready-to-read polling");

        if (is_tcp)
        {
            event_add(io->event_read, NULL);
        }

        io->pendingEvents |= EV_READ;
    }

    if ((to_enable & EV_WRITE) != 0)
    {
        dbgmsg(io, "enabling ready-to-write polling");

        if (is_tcp)
        {
            event_add(io->event_write, NULL);
        }

        io->pendingEvents |= EV_WRITE;
    }
}
774 
event_disable(struct tr_peerIo * io,short event)775 static void event_disable(struct tr_peerIo* io, short event)
776 {
777     TR_ASSERT(tr_amInEventThread(io->session));
778     TR_ASSERT(io->session != NULL);
779     TR_ASSERT(io->session->events != NULL);
780 
781     bool const need_events = io->socket.type == TR_PEER_SOCKET_TYPE_TCP;
782 
783     if (need_events)
784     {
785         TR_ASSERT(event_initialized(io->event_read));
786         TR_ASSERT(event_initialized(io->event_write));
787     }
788 
789     if ((event & EV_READ) != 0 && (io->pendingEvents & EV_READ) != 0)
790     {
791         dbgmsg(io, "disabling ready-to-read polling");
792 
793         if (need_events)
794         {
795             event_del(io->event_read);
796         }
797 
798         io->pendingEvents &= ~EV_READ;
799     }
800 
801     if ((event & EV_WRITE) != 0 && (io->pendingEvents & EV_WRITE) != 0)
802     {
803         dbgmsg(io, "disabling ready-to-write polling");
804 
805         if (need_events)
806         {
807             event_del(io->event_write);
808         }
809 
810         io->pendingEvents &= ~EV_WRITE;
811     }
812 }
813 
/**
 * Turn polling on or off for one direction of the io.
 *
 * TR_UP maps to EV_WRITE and any other direction to EV_READ. Must be called
 * from the session's event thread (asserted).
 */
void tr_peerIoSetEnabled(tr_peerIo* io, tr_direction dir, bool isEnabled)
{
    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(tr_isDirection(dir));
    TR_ASSERT(tr_amInEventThread(io->session));
    TR_ASSERT(io->session->events != NULL);

    short const event = dir == TR_UP ? EV_WRITE : EV_READ;

    if (!isEnabled)
    {
        event_disable(io, event);
        return;
    }

    event_enable(io, event);
}
832 
833 /***
834 ****
835 ***/
836 
/**
 * Close whatever socket the io currently holds and release its libevent
 * events.
 *
 * A TCP socket is closed with tr_netClose(). A uTP socket (WITH_UTP builds)
 * has its callbacks replaced by the dummy table before UTP_Close(). The io
 * is left with socket == TR_PEER_SOCKET_INIT and NULL event pointers.
 */
static void io_close_socket(tr_peerIo* io)
{
    if (io->socket.type == TR_PEER_SOCKET_TYPE_TCP)
    {
        tr_netClose(io->session, io->socket.handle.tcp);
    }

#ifdef WITH_UTP

    else if (io->socket.type == TR_PEER_SOCKET_TYPE_UTP)
    {
        /* detach the io from libutp before closing */
        UTP_SetCallbacks(io->socket.handle.utp, &dummy_utp_function_table, NULL);
        UTP_Close(io->socket.handle.utp);
    }

#endif

    else if (io->socket.type != TR_PEER_SOCKET_TYPE_NONE)
    {
        TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type);
    }

    io->socket = TR_PEER_SOCKET_INIT;

    if (io->event_read != NULL)
    {
        event_free(io->event_read);
        io->event_read = NULL;
    }

    if (io->event_write != NULL)
    {
        event_free(io->event_write);
        io->event_write = NULL;
    }
}
875 
/**
 * Destroy a tr_peerIo. Scheduled on the event thread by tr_peerIoFree().
 *
 * Stops polling, tears down the bandwidth object, frees both buffers,
 * closes the socket, destroys the crypto state and recycles any remaining
 * outbuf datatype nodes. The struct is then overwritten with ~0 bytes
 * before being freed.
 *
 * @param vio the tr_peerIo to destroy
 */
static void io_dtor(void* vio)
{
    tr_peerIo* io = vio;

    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(tr_amInEventThread(io->session));
    TR_ASSERT(io->session->events != NULL);

    dbgmsg(io, "in tr_peerIo destructor");
    event_disable(io, EV_READ | EV_WRITE);
    tr_bandwidthDestruct(&io->bandwidth);
    evbuffer_free(io->outbuf);
    evbuffer_free(io->inbuf);
    io_close_socket(io);
    tr_cryptoDestruct(&io->crypto);

    /* hand every leftover bookkeeping node back to the pool */
    while (io->outbuf_datatypes != NULL)
    {
        peer_io_pull_datatype(io);
    }

    /* overwrite the struct (including magicNumber) before releasing it */
    memset(io, ~0, sizeof(tr_peerIo));
    tr_free(io);
}
900 
/**
 * Detach the io's callbacks and schedule its destruction (io_dtor) on the
 * session's event thread. A NULL `io` is ignored.
 */
static void tr_peerIoFree(tr_peerIo* io)
{
    if (io == NULL)
    {
        return;
    }

    dbgmsg(io, "in tr_peerIoFree");

    /* make sure no user callback fires between now and destruction */
    io->canRead = NULL;
    io->didWrite = NULL;
    io->gotError = NULL;

    tr_runInEventThread(io->session, io_dtor, io);
}
912 
/**
 * Increment the io's reference count.
 *
 * @param file source file of the caller, used only in the debug message
 * @param line source line of the caller, used only in the debug message
 */
void tr_peerIoRefImpl(char const* file, int line, tr_peerIo* io)
{
    TR_ASSERT(tr_isPeerIo(io));

    dbgmsg(io, "%s:%d is incrementing the IO's refcount from %d to %d", file, line, io->refCount, io->refCount + 1);

    ++io->refCount;
}
921 
/**
 * Decrement the io's reference count and free the io via tr_peerIoFree()
 * once the count reaches zero.
 *
 * @param file source file of the caller, used only in the debug message
 * @param line source line of the caller, used only in the debug message
 */
void tr_peerIoUnrefImpl(char const* file, int line, tr_peerIo* io)
{
    TR_ASSERT(tr_isPeerIo(io));

    dbgmsg(io, "%s:%d is decrementing the IO's refcount from %d to %d", file, line, io->refCount, io->refCount - 1);

    io->refCount -= 1;

    if (io->refCount == 0)
    {
        tr_peerIoFree(io);
    }
}
933 
/**
 * Get the peer's address and, optionally, its port.
 *
 * @param port if not NULL, receives io->port
 * @return pointer to the address stored inside `io`
 */
tr_address const* tr_peerIoGetAddress(tr_peerIo const* io, tr_port* port)
{
    TR_ASSERT(tr_isPeerIo(io));

    if (port != NULL)
    {
        *port = io->port;
    }

    return &io->addr;
}
945 
/**
 * Format an address and port as "[address]:port".
 *
 * `port` is passed through ntohs() before printing. The result lives in a
 * function-local static buffer, so each call overwrites the previous
 * result and the returned pointer must not be freed.
 */
char const* tr_peerIoAddrStr(tr_address const* addr, tr_port port)
{
    static char buf[512];
    tr_snprintf(buf, sizeof(buf), "[%s]:%u", tr_address_to_string(addr), ntohs(port));
    return buf;
}
952 
/**
 * Get the io's "[address]:port" string via tr_peerIoAddrStr(), or the
 * literal "error" when `io` fails the tr_isPeerIo() check.
 */
char const* tr_peerIoGetAddrStr(tr_peerIo const* io)
{
    return tr_isPeerIo(io) ? tr_peerIoAddrStr(&io->addr, io->port) : "error";
}
957 
/**
 * Install the io's callbacks and the user data handed to them.
 *
 * @param readcb   stored as io->canRead; may be NULL
 * @param writecb  stored as io->didWrite; may be NULL
 * @param errcb    stored as io->gotError; may be NULL
 * @param userData passed back to each callback
 */
void tr_peerIoSetIOFuncs(tr_peerIo* io, tr_can_read_cb readcb, tr_did_write_cb writecb, tr_net_error_cb errcb, void* userData)
{
    io->canRead = readcb;
    io->didWrite = writecb;
    io->gotError = errcb;
    io->userData = userData;
}
965 
/**
 * Remove all callbacks and user data from the io and stop polling in both
 * directions.
 */
void tr_peerIoClear(tr_peerIo* io)
{
    tr_peerIoSetIOFuncs(io, NULL, NULL, NULL, NULL);
    tr_peerIoSetEnabled(io, TR_UP, false);
    tr_peerIoSetEnabled(io, TR_DOWN, false);
}
972 
/**
 * Drop the io's current socket and open a fresh TCP connection to the same
 * peer. Only valid for outgoing connections (asserted).
 *
 * The polling state that was active before the call is restored on the new
 * socket. On failure the io is left without a usable socket and with
 * polling disabled.
 *
 * @return 0 on success, -1 if a TCP socket could not be opened
 */
int tr_peerIoReconnect(tr_peerIo* io)
{
    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(!tr_peerIoIsIncoming(io));

    tr_session* session = tr_peerIoGetSession(io);

    /* remember what was being polled so it can be re-enabled below */
    short int pendingEvents = io->pendingEvents;
    event_disable(io, EV_READ | EV_WRITE);

    io_close_socket(io);

    io->socket = tr_netOpenPeerSocket(session, &io->addr, io->port, io->isSeed);

    if (io->socket.type != TR_PEER_SOCKET_TYPE_TCP)
    {
        return -1;
    }

    /* io_close_socket() freed the old events; create new ones for the new fd */
    io->event_read = event_new(session->event_base, io->socket.handle.tcp, EV_READ, event_read_cb, io);
    io->event_write = event_new(session->event_base, io->socket.handle.tcp, EV_WRITE, event_write_cb, io);

    event_enable(io, pendingEvents);
    tr_netSetTOS(io->socket.handle.tcp, session->peerSocketTOS, io->addr.type);
    maybeSetCongestionAlgorithm(io->socket.handle.tcp, session->peer_congestion_algorithm);

    return 0;
}
1001 
1002 /**
1003 ***
1004 **/
1005 
/** Store `hash` in the io's crypto state via tr_cryptoSetTorrentHash(). */
void tr_peerIoSetTorrentHash(tr_peerIo* io, uint8_t const* hash)
{
    TR_ASSERT(tr_isPeerIo(io));

    tr_cryptoSetTorrentHash(&io->crypto, hash);
}
1012 
/** Return the torrent hash held by the io's crypto state, as reported by tr_cryptoGetTorrentHash(). */
uint8_t const* tr_peerIoGetTorrentHash(tr_peerIo* io)
{
    TR_ASSERT(tr_isPeerIo(io));

    return tr_cryptoGetTorrentHash(&io->crypto);
}
1019 
/** Report whether the io's crypto state has a torrent hash, as reported by tr_cryptoHasTorrentHash(). */
bool tr_peerIoHasTorrentHash(tr_peerIo const* io)
{
    TR_ASSERT(tr_isPeerIo(io));

    return tr_cryptoHasTorrentHash(&io->crypto);
}
1026 
1027 /**
1028 ***
1029 **/
1030 
/**
 * Record the peer's 20-byte id, or clear it.
 *
 * @param peer_id 20 bytes to copy into io->peerId; NULL zeroes the stored id
 *                and marks it as not set
 */
void tr_peerIoSetPeersId(tr_peerIo* io, uint8_t const* peer_id)
{
    TR_ASSERT(tr_isPeerIo(io));

    bool const have_id = peer_id != NULL;

    io->peerIdIsSet = have_id;

    if (have_id)
    {
        memcpy(io->peerId, peer_id, 20);
    }
    else
    {
        memset(io->peerId, '\0', 20);
    }
}
1044 
1045 /**
1046 ***
1047 **/
1048 
/**
 * Suggests how many bytes the outgoing buffer should be able to hold.
 *
 * This is all kind of arbitrary, but what seems to work well is being
 * large enough to hold the next `period` seconds' worth of output at the
 * current upload piece speed, or a few blocks, whichever is bigger.
 * It's okay to tweak this as needed.
 *
 * @param io  the connection whose upload piece speed is sampled
 * @param now timestamp passed through to tr_bandwidthGetPieceSpeed_Bps()
 * @return the desired size in bytes: never less than 3.5 blocks, and
 *         saturated at the largest `unsigned int` value rather than
 *         wrapping around.
 */
static unsigned int getDesiredOutputBufferSize(tr_peerIo const* io, uint64_t now)
{
    unsigned int const period = 15U; /* seconds; arbitrary */
    /* the 3 is arbitrary; the .5 is to leave room for messages */
    static unsigned int const minimum_size = (unsigned int)(MAX_BLOCK_SIZE * 3.5);
    unsigned int const largest_size = ~0U;

    unsigned int const currentSpeed_Bps = tr_bandwidthGetPieceSpeed_Bps(&io->bandwidth, now, TR_UP);

    /* multiply in 64 bits: in `unsigned int` the product wraps for very
     * fast peers and would yield a much smaller buffer than intended */
    uint64_t const wanted = (uint64_t)currentSpeed_Bps * period;

    if (wanted >= largest_size)
    {
        return largest_size;
    }

    return MAX(minimum_size, (unsigned int)wanted);
}
1061 
/**
 * @return how many more bytes can be queued before the outgoing buffer
 *         reaches its desired size; 0 if it is already at or past it.
 */
size_t tr_peerIoGetWriteBufferSpace(tr_peerIo const* io, uint64_t now)
{
    size_t const wanted = getDesiredOutputBufferSize(io, now);
    size_t const queued = evbuffer_get_length(io->outbuf);

    return wanted > queued ? wanted - queued : 0;
}
1075 
1076 /**
1077 ***
1078 **/
1079 
/**
 * Sets the encryption mode stored on this connection.
 *
 * @param encryption_type must be PEER_ENCRYPTION_NONE or
 *                        PEER_ENCRYPTION_RC4 (asserted).
 */
void tr_peerIoSetEncryption(tr_peerIo* io, tr_encryption_type encryption_type)
{
    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(encryption_type == PEER_ENCRYPTION_NONE || encryption_type == PEER_ENCRYPTION_RC4);

    io->encryption_type = encryption_type;
}
1087 
1088 /**
1089 ***
1090 **/
1091 
/**
 * Applies `callback` in place to a byte range of an evbuffer.
 *
 * The range starts `offset` bytes into `buffer` and is `size` bytes long.
 * It is visited one extent at a time; for each extent the callback is given
 * the same pointer as both its source and its destination argument.
 *
 * NOTE(review): the closing assertion expects the visited extents to add up
 * to exactly `size`. The callers visible nearby appear to pass ranges that
 * run to the end of the buffer -- confirm before using this on a range that
 * stops in the middle of a buffer.
 */
static inline void processBuffer(tr_crypto* crypto, struct evbuffer* buffer, size_t offset, size_t size, void (* callback)(
    tr_crypto*, size_t, void const*, void*))
{
    struct evbuffer_ptr pos;
    struct evbuffer_iovec iovec;

    /* place the cursor at the first byte to process */
    evbuffer_ptr_set(buffer, &pos, offset, EVBUFFER_PTR_SET);

    do
    {
        /* look at (without removing) the extent under the cursor;
         * stop if there is nothing to look at */
        if (evbuffer_peek(buffer, size, &pos, &iovec, 1) <= 0)
        {
            break;
        }

        /* same pointer for input and output: transform the bytes in place */
        callback(crypto, iovec.iov_len, iovec.iov_base, iovec.iov_base);

        TR_ASSERT(size >= iovec.iov_len);
        size -= iovec.iov_len;
    }
    /* step the cursor past the extent just handled; a nonzero result ends the loop */
    while (evbuffer_ptr_set(buffer, &pos, iovec.iov_len, EVBUFFER_PTR_ADD) == 0);

    TR_ASSERT(size == 0);
}
1116 
/**
 * Creates a datatype record describing `byteCount` bytes of either piece
 * data or non-piece data and pushes it onto the io's datatype queue.
 */
static void addDatatype(tr_peerIo* io, size_t byteCount, bool isPieceData)
{
    struct tr_datatype* const entry = datatype_new();

    entry->isPieceData = isPieceData;
    entry->length = byteCount;

    peer_io_push_datatype(io, entry);
}
1125 
/**
 * Encrypts `size` bytes of `buf`, starting at `offset`, in place --
 * but only when the connection's encryption type is RC4.
 */
static inline void maybeEncryptBuffer(tr_peerIo* io, struct evbuffer* buf, size_t offset, size_t size)
{
    if (io->encryption_type != PEER_ENCRYPTION_RC4)
    {
        return;
    }

    processBuffer(&io->crypto, buf, offset, size, &tr_cryptoEncrypt);
}
1133 
/**
 * Queues the whole contents of `buf` for sending: encrypts it in place if
 * needed, appends it to the io's outgoing buffer, and records its length
 * and piece/non-piece flag in the datatype queue.
 */
void tr_peerIoWriteBuf(tr_peerIo* io, struct evbuffer* buf, bool isPieceData)
{
    /* take the length up front; it is needed again after `buf` has been
     * handed over to the outgoing buffer */
    size_t const n_bytes = evbuffer_get_length(buf);

    maybeEncryptBuffer(io, buf, 0, n_bytes);
    evbuffer_add_buffer(io->outbuf, buf);

    addDatatype(io, n_bytes, isPieceData);
}
1141 
/**
 * Appends `byteCount` bytes to the io's outgoing buffer and records their
 * length and piece/non-piece flag in the datatype queue.
 *
 * The bytes are written straight into space reserved in the outgoing
 * buffer: RC4-encrypted on the way in when the connection uses RC4,
 * copied verbatim otherwise.
 *
 * If the space cannot be reserved or committed, the failure is logged and
 * nothing is recorded in the datatype queue, so the queue never describes
 * bytes that are not in the outgoing buffer.
 */
void tr_peerIoWriteBytes(tr_peerIo* io, void const* bytes, size_t byteCount, bool isPieceData)
{
    struct evbuffer_iovec iovec;

    /* on failure `iovec` is not filled in, so it must not be written through */
    if (evbuffer_reserve_space(io->outbuf, byteCount, &iovec, 1) <= 0)
    {
        dbgmsg(io, "tr_peerIoWriteBytes couldn't reserve %zu bytes in the output buffer", byteCount);
        return;
    }

    /* the reserved extent may be larger than requested; commit only what we fill */
    iovec.iov_len = byteCount;

    if (io->encryption_type == PEER_ENCRYPTION_RC4)
    {
        tr_cryptoEncrypt(&io->crypto, iovec.iov_len, bytes, iovec.iov_base);
    }
    else
    {
        memcpy(iovec.iov_base, bytes, iovec.iov_len);
    }

    if (evbuffer_commit_space(io->outbuf, &iovec, 1) != 0)
    {
        dbgmsg(io, "tr_peerIoWriteBytes couldn't commit %zu bytes to the output buffer", byteCount);
        return;
    }

    addDatatype(io, byteCount, isPieceData);
}
1162 
1163 /***
1164 ****
1165 ***/
1166 
/** Appends a single byte to `outbuf`. */
void evbuffer_add_uint8(struct evbuffer* outbuf, uint8_t byte)
{
    evbuffer_add(outbuf, &byte, sizeof(byte));
}
1171 
/** Appends a 16-bit value to `outbuf` after converting it with htons(). */
void evbuffer_add_uint16(struct evbuffer* outbuf, uint16_t addme_hs)
{
    uint16_t const network_order = htons(addme_hs);

    evbuffer_add(outbuf, &network_order, sizeof(network_order));
}
1177 
/** Appends a 32-bit value to `outbuf` after converting it with htonl(). */
void evbuffer_add_uint32(struct evbuffer* outbuf, uint32_t addme_hl)
{
    uint32_t const network_order = htonl(addme_hl);

    evbuffer_add(outbuf, &network_order, sizeof(network_order));
}
1183 
/** Appends a 64-bit value to `outbuf` after converting it with tr_htonll(). */
void evbuffer_add_uint64(struct evbuffer* outbuf, uint64_t addme_hll)
{
    uint64_t const network_order = tr_htonll(addme_hll);

    evbuffer_add(outbuf, &network_order, sizeof(network_order));
}
1189 
1190 /***
1191 ****
1192 ***/
1193 
/**
 * Decrypts `size` bytes of `buf`, starting at `offset`, in place --
 * but only when the connection's encryption type is RC4.
 */
static inline void maybeDecryptBuffer(tr_peerIo* io, struct evbuffer* buf, size_t offset, size_t size)
{
    if (io->encryption_type != PEER_ENCRYPTION_RC4)
    {
        return;
    }

    processBuffer(&io->crypto, buf, offset, size, &tr_cryptoDecrypt);
}
1201 
/**
 * Moves `byteCount` bytes from the front of `inbuf` to the end of `outbuf`,
 * then decrypts just those newly appended bytes if the connection uses RC4.
 *
 * `inbuf` must hold at least `byteCount` bytes (asserted).
 */
void tr_peerIoReadBytesToBuf(tr_peerIo* io, struct evbuffer* inbuf, struct evbuffer* outbuf, size_t byteCount)
{
    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(evbuffer_get_length(inbuf) >= byteCount);

    /* the appended bytes will begin at outbuf's current end */
    size_t const append_offset = evbuffer_get_length(outbuf);

    /* move the bytes through a temporary buffer onto the end of outbuf */
    struct evbuffer* const staging = evbuffer_new();
    evbuffer_remove_buffer(inbuf, staging, byteCount);
    evbuffer_add_buffer(outbuf, staging);
    evbuffer_free(staging);

    maybeDecryptBuffer(io, outbuf, append_offset, byteCount);
}
1217 
/**
 * Removes `byteCount` bytes from the front of `inbuf` into `bytes`,
 * decrypting them in place when the connection uses RC4.
 *
 * `inbuf` must hold at least `byteCount` bytes (asserted). An encryption
 * type other than NONE or RC4 trips an assertion and removes nothing.
 */
void tr_peerIoReadBytes(tr_peerIo* io, struct evbuffer* inbuf, void* bytes, size_t byteCount)
{
    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(evbuffer_get_length(inbuf) >= byteCount);

    if (io->encryption_type == PEER_ENCRYPTION_NONE)
    {
        evbuffer_remove(inbuf, bytes, byteCount);
    }
    else if (io->encryption_type == PEER_ENCRYPTION_RC4)
    {
        evbuffer_remove(inbuf, bytes, byteCount);
        tr_cryptoDecrypt(&io->crypto, byteCount, bytes, bytes);
    }
    else
    {
        TR_ASSERT_MSG(false, "unhandled encryption type %d", (int)io->encryption_type);
    }
}
1238 
/**
 * Reads a 16-bit value from `inbuf` via tr_peerIoReadBytes() and stores it
 * in `*setme` after converting it with ntohs().
 */
void tr_peerIoReadUint16(tr_peerIo* io, struct evbuffer* inbuf, uint16_t* setme)
{
    uint16_t network_order;

    tr_peerIoReadBytes(io, inbuf, &network_order, sizeof(network_order));
    *setme = ntohs(network_order);
}
1245 
/**
 * Reads a 32-bit value from `inbuf` via tr_peerIoReadBytes() and stores it
 * in `*setme` after converting it with ntohl().
 */
void tr_peerIoReadUint32(tr_peerIo* io, struct evbuffer* inbuf, uint32_t* setme)
{
    uint32_t network_order;

    tr_peerIoReadBytes(io, inbuf, &network_order, sizeof(network_order));
    *setme = ntohl(network_order);
}
1252 
/**
 * Reads and discards `byteCount` bytes from `inbuf`.
 *
 * The bytes go through tr_peerIoReadBytes() in chunks of at most 4096
 * bytes into a scratch buffer that is then thrown away.
 */
void tr_peerIoDrain(tr_peerIo* io, struct evbuffer* inbuf, size_t byteCount)
{
    char scratch[4096];
    size_t remaining = byteCount;

    while (remaining != 0)
    {
        size_t const chunk = remaining < sizeof(scratch) ? remaining : sizeof(scratch);

        tr_peerIoReadBytes(io, inbuf, scratch, chunk);
        remaining -= chunk;
    }
}
1265 
1266 /***
1267 ****
1268 ***/
1269 
/**
 * Tries to read from the peer, after clamping `howmuch` to what the
 * download bandwidth allows.
 *
 * For uTP sockets no bytes are read here; libutp is only told when the
 * input buffer is empty. For TCP sockets the data is read into the io's
 * input buffer, the read wrapper is invoked if that buffer is non-empty,
 * and the error callback is invoked on EOF or on a non-transient error.
 *
 * @return the result of evbuffer_read() for TCP sockets; 0 for uTP sockets
 *         or when the bandwidth clamp leaves nothing to read.
 */
static int tr_peerIoTryRead(tr_peerIo* io, size_t howmuch)
{
    int res = 0;

    howmuch = tr_bandwidthClamp(&io->bandwidth, TR_DOWN, howmuch);

    if (howmuch == 0)
    {
        return res;
    }

    switch (io->socket.type)
    {
    case TR_PEER_SOCKET_TYPE_UTP:
        /* UTP_RBDrained notifies libutp that your read buffer is empty.
         * It opens up the congestion window by sending an ACK (soonish)
         * if one was not going to be sent. */
        if (evbuffer_get_length(io->inbuf) == 0)
        {
            UTP_RBDrained(io->socket.handle.utp);
        }

        break;

    case TR_PEER_SOCKET_TYPE_TCP:
        {
            char err_buf[512];

            /* reset the socket error, read, then capture the error right away */
            EVUTIL_SET_SOCKET_ERROR(0);
            res = evbuffer_read(io->inbuf, io->socket.handle.tcp, (int)howmuch);
            int const e = EVUTIL_SOCKET_ERROR();

            dbgmsg(io, "read %d from peer (%s)", res, res == -1 ? tr_net_strerror(err_buf, sizeof(err_buf), e) : "");

            if (evbuffer_get_length(io->inbuf) != 0)
            {
                canReadWrapper(io);
            }

            /* these errors are not reported to the error callback */
            bool const is_transient = e == EAGAIN || e == EINTR || e == EINPROGRESS;

            if (res <= 0 && io->gotError != NULL && !is_transient)
            {
                short what = BEV_EVENT_READING | BEV_EVENT_ERROR;

                if (res == 0)
                {
                    what |= BEV_EVENT_EOF;
                }

                dbgmsg(io, "tr_peerIoTryRead got an error. res is %d, what is %hd, errno is %d (%s)", res, what, e,
                    tr_net_strerror(err_buf, sizeof(err_buf), e));

                io->gotError(io, what, io->userData);
            }

            break;
        }

    default:
        TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type);
    }

    return res;
}
1330 
/**
 * Tries to send queued output to the peer.
 *
 * `howmuch` is capped at the amount currently queued and then clamped to
 * what the upload bandwidth allows. For uTP sockets libutp is asked to
 * write; for TCP sockets the data is written directly, the write wrapper
 * is invoked on success, and the error callback is invoked for errors
 * other than the ones excluded below.
 *
 * @return for TCP, the result of tr_evbuffer_write(); for uTP, how much
 *         the outgoing buffer shrank; 0 if there was nothing to send.
 */
static int tr_peerIoTryWrite(tr_peerIo* io, size_t howmuch)
{
    int n = 0;
    size_t const old_len = evbuffer_get_length(io->outbuf);
    dbgmsg(io, "in tr_peerIoTryWrite %zu", howmuch);

    /* never ask for more than is actually queued */
    howmuch = tr_bandwidthClamp(&io->bandwidth, TR_UP, MIN(howmuch, old_len));

    if (howmuch == 0)
    {
        return n;
    }

    switch (io->socket.type)
    {
    case TR_PEER_SOCKET_TYPE_UTP:
        UTP_Write(io->socket.handle.utp, howmuch);
        /* measure what was sent by how much the outgoing buffer shrank */
        n = old_len - evbuffer_get_length(io->outbuf);
        break;

    case TR_PEER_SOCKET_TYPE_TCP:
        {
            /* reset the socket error, write, then capture the error right away */
            EVUTIL_SET_SOCKET_ERROR(0);
            n = tr_evbuffer_write(io, io->socket.handle.tcp, howmuch);
            int const e = EVUTIL_SOCKET_ERROR();

            if (n > 0)
            {
                didWriteWrapper(io, n);
            }

            /* these error values are not reported to the error callback */
            bool const is_ignored = e == 0 || e == EPIPE || e == EAGAIN || e == EINTR || e == EINPROGRESS;

            if (n < 0 && io->gotError != NULL && !is_ignored)
            {
                char errstr[512];
                short const what = BEV_EVENT_WRITING | BEV_EVENT_ERROR;

                dbgmsg(io, "tr_peerIoTryWrite got an error. res is %d, what is %hd, errno is %d (%s)", n, what, e,
                    tr_net_strerror(errstr, sizeof(errstr), e));

                io->gotError(io, what, io->userData);
            }

            break;
        }

    default:
        TR_ASSERT_MSG(false, "unsupported peer socket type %d", io->socket.type);
    }

    return n;
}
1385 
/**
 * Attempts one read (TR_DOWN) or one write (any other direction) of at
 * most `limit` bytes on the connection.
 *
 * @return whatever tr_peerIoTryRead() or tr_peerIoTryWrite() returned.
 */
int tr_peerIoFlush(tr_peerIo* io, tr_direction dir, size_t limit)
{
    TR_ASSERT(tr_isPeerIo(io));
    TR_ASSERT(tr_isDirection(dir));

    int const bytesUsed = dir == TR_DOWN ? tr_peerIoTryRead(io, limit) : tr_peerIoTryWrite(io, limit);

    dbgmsg(io, "flushing peer-io, direction %d, limit %zu, bytesUsed %d", (int)dir, limit, bytesUsed);
    return bytesUsed;
}
1405 
/**
 * Flushes the non-piece-data messages sitting at the front of the
 * outbound queue.
 *
 * The lengths of the leading datatype records are summed up to (but not
 * including) the first piece-data record, and that many bytes are passed
 * to tr_peerIoFlush() in the upload direction.
 *
 * @return the result of tr_peerIoFlush().
 */
int tr_peerIoFlushOutgoingProtocolMsgs(tr_peerIo* io)
{
    size_t n_protocol_bytes = 0;
    struct tr_datatype const* it = io->outbuf_datatypes;

    while (it != NULL && !it->isPieceData)
    {
        n_protocol_bytes += it->length;
        it = it->next;
    }

    return tr_peerIoFlush(io, TR_UP, n_protocol_bytes);
}
1426