1 /* Copyright StrongLoop, Inc. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "defs.h"
23 #include <errno.h>
24 #include <stdlib.h>
25 #include <string.h>
26 
27 /* A connection is modeled as an abstraction on top of two simple state
28  * machines, one for reading and one for writing.  Either state machine
29  * is, when active, in one of three states: busy, done or stop; the fourth
30  * and final state, dead, is an end state and only relevant when shutting
31  * down the connection.  A short overview:
32  *
33  *                          busy                  done           stop
34  *  ----------|---------------------------|--------------------|------|
35  *  readable  | waiting for incoming data | have incoming data | idle |
36  *  writable  | busy writing out data     | completed write    | idle |
37  *
38  * We could remove the done state from the writable state machine. For our
39  * purposes, it's functionally equivalent to the stop state.
40  *
41  * When the connection with upstream has been established, the client_ctx
42  * moves into a state where incoming data from the client is sent upstream
43  * and vice versa, incoming data from upstream is sent to the client.  In
44  * other words, we're just piping data back and forth.  See conn_cycle()
45  * for details.
46  *
47  * An interesting deviation from libuv's I/O model is that reads are discrete
48  * rather than continuous events.  In layman's terms, when a read operation
49  * completes, the connection stops reading until further notice.
50  *
51  * The rationale for this approach is that we have to wait until the data
52  * has been sent out again before we can reuse the read buffer.
53  *
54  * It also pleasingly unifies with the request model that libuv uses for
55  * writes and everything else; libuv may switch to a request model for
56  * reads in the future.
57  */
/* State of one direction (read side or write side) of a connection. */
enum conn_state {
  /* Busy; waiting for incoming data or for a write to complete. */
  c_busy,
  /* Done; read incoming data or write finished. */
  c_done,
  /* Stopped. */
  c_stop,
  /* End state; conn_close() puts both directions into it. */
  c_dead
};
64 
/* Session states, in the order a session normally moves through them.
 * do_kill() compares against s_almost_dead_0 and do_almost_dead() advances
 * by adding one, so the relative order of the trailing members matters.
 */
enum sess_state {
  /* Wait for client handshake. */
  s_handshake,
  /* Wait for client authentication data. */
  s_handshake_auth,
  /* Start waiting for request data. */
  s_req_start,
  /* Wait for request data. */
  s_req_parse,
  /* Wait for upstream hostname DNS lookup to complete. */
  s_req_lookup,
  /* Wait for uv_tcp_connect() to complete. */
  s_req_connect,
  /* Connected. Start piping data. */
  s_proxy_start,
  /* Connected. Pipe data back and forth. */
  s_proxy,
  /* Tear down session. */
  s_kill,
  /* Waiting for finalizers to complete; one step per finished finalizer. */
  s_almost_dead_0,
  s_almost_dead_1,
  s_almost_dead_2,
  s_almost_dead_3,
  s_almost_dead_4,
  /* Dead. Safe to free now. */
  s_dead
};
83 
/* Session state machine: do_next() dispatches to one handler per state and
 * every handler returns the next session state.
 */
static void do_next(client_ctx *cx);
static int do_handshake(client_ctx *cx);
static int do_handshake_auth(client_ctx *cx);
static int do_req_start(client_ctx *cx);
static int do_req_parse(client_ctx *cx);
static int do_req_lookup(client_ctx *cx);
static int do_req_connect_start(client_ctx *cx);
static int do_req_connect(client_ctx *cx);
static int do_proxy_start(client_ctx *cx);
static int do_proxy(client_ctx *cx);
static int do_kill(client_ctx *cx);
static int do_almost_dead(client_ctx *cx);

/* Data pump used while proxying. */
static int conn_cycle(const char *who, conn *a, conn *b);

/* Idle timer. */
static void conn_timer_reset(conn *c);
static void conn_timer_expire(uv_timer_t *handle, int status);

/* Name resolution and connection establishment. */
static void conn_getaddrinfo(conn *c, const char *hostname);
static void conn_getaddrinfo_done(
    uv_getaddrinfo_t *req, int status, struct addrinfo *ai);
static int conn_connect(conn *c);
static void conn_connect_done(uv_connect_t *req, int status);

/* Reading, writing and closing. */
static void conn_read(conn *c);
static void conn_read_done(
    uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf);
static void conn_alloc(uv_handle_t *handle, size_t size, uv_buf_t *buf);
static void conn_write(conn *c, const void *data, unsigned int len);
static void conn_write_done(uv_write_t *req, int status);
static void conn_close(conn *c);
static void conn_close_done(uv_handle_t *handle);
114 
115 /* |incoming| has been initialized by server.c when this is called. */
client_finish_init(server_ctx * sx,client_ctx * cx)116 void client_finish_init(server_ctx *sx, client_ctx *cx) {
117   conn *incoming;
118   conn *outgoing;
119 
120   cx->sx = sx;
121   cx->state = s_handshake;
122   s5_init(&cx->parser);
123 
124   incoming = &cx->incoming;
125   incoming->client = cx;
126   incoming->result = 0;
127   incoming->rdstate = c_stop;
128   incoming->wrstate = c_stop;
129   incoming->idle_timeout = sx->idle_timeout;
130   CHECK(0 == uv_timer_init(sx->loop, &incoming->timer_handle));
131 
132   outgoing = &cx->outgoing;
133   outgoing->client = cx;
134   outgoing->result = 0;
135   outgoing->rdstate = c_stop;
136   outgoing->wrstate = c_stop;
137   outgoing->idle_timeout = sx->idle_timeout;
138   CHECK(0 == uv_tcp_init(cx->sx->loop, &outgoing->handle.tcp));
139   CHECK(0 == uv_timer_init(cx->sx->loop, &outgoing->timer_handle));
140 
141   /* Wait for the initial packet. */
142   conn_read(incoming);
143 }
144 
145 /* This is the core state machine that drives the client <-> upstream proxy.
146  * We move through the initial handshake and authentication steps first and
147  * end up (if all goes well) in the proxy state where we're just proxying
148  * data between the client and upstream.
149  */
do_next(client_ctx * cx)150 static void do_next(client_ctx *cx) {
151   int new_state;
152 
153   ASSERT(cx->state != s_dead);
154   switch (cx->state) {
155     case s_handshake:
156       new_state = do_handshake(cx);
157       break;
158     case s_handshake_auth:
159       new_state = do_handshake_auth(cx);
160       break;
161     case s_req_start:
162       new_state = do_req_start(cx);
163       break;
164     case s_req_parse:
165       new_state = do_req_parse(cx);
166       break;
167     case s_req_lookup:
168       new_state = do_req_lookup(cx);
169       break;
170     case s_req_connect:
171       new_state = do_req_connect(cx);
172       break;
173     case s_proxy_start:
174       new_state = do_proxy_start(cx);
175       break;
176     case s_proxy:
177       new_state = do_proxy(cx);
178       break;
179     case s_kill:
180       new_state = do_kill(cx);
181       break;
182     case s_almost_dead_0:
183     case s_almost_dead_1:
184     case s_almost_dead_2:
185     case s_almost_dead_3:
186     case s_almost_dead_4:
187       new_state = do_almost_dead(cx);
188       break;
189     default:
190       UNREACHABLE();
191   }
192   cx->state = new_state;
193 
194   if (cx->state == s_dead) {
195     if (DEBUG_CHECKS) {
196       memset(cx, -1, sizeof(*cx));
197     }
198     free(cx);
199   }
200 }
201 
/* Handle the result of a read on the client connection while waiting for
 * the handshake.  Feeds the bytes to the SOCKS5 parser and answers with the
 * selected authentication method.
 *
 * Returns the next session state: s_handshake when the parser needs more
 * data, s_req_start after "no authentication" has been selected, s_kill
 * after the "no acceptable methods" reply has been queued, or the result of
 * do_kill() on a read error, trailing bytes or a parse error.
 */
static int do_handshake(client_ctx *cx) {
  conn *incoming;
  s5_ctx *parser;
  uint8_t *cursor;
  size_t remaining;
  unsigned int offered;
  int rc;

  incoming = &cx->incoming;
  parser = &cx->parser;
  ASSERT(incoming->rdstate == c_done);
  ASSERT(incoming->wrstate == c_stop);
  incoming->rdstate = c_stop;

  if (incoming->result < 0) {
    pr_err("read error: %s", uv_strerror(incoming->result));
    return do_kill(cx);
  }

  cursor = (uint8_t *) incoming->t.buf;
  remaining = (size_t) incoming->result;
  rc = s5_parse(parser, &cursor, &remaining);

  if (rc == s5_ok) {
    /* Need more data. */
    conn_read(incoming);
    return s_handshake;
  }

  if (remaining != 0) {
    /* Could allow a round-trip saving shortcut here if the requested auth
     * method is S5_AUTH_NONE (provided unauthenticated traffic is allowed.)
     * Requires client support however.
     */
    pr_err("junk in handshake");
    return do_kill(cx);
  }

  if (rc != s5_auth_select) {
    pr_err("handshake error: %s", s5_strerror(rc));
    return do_kill(cx);
  }

  offered = s5_auth_methods(parser);

  if ((offered & S5_AUTH_NONE) && can_auth_none(cx->sx, cx)) {
    s5_select_auth(parser, S5_AUTH_NONE);
    conn_write(incoming, "\5\0", 2);  /* No auth required. */
    return s_req_start;
  }

  if ((offered & S5_AUTH_PASSWD) && can_auth_passwd(cx->sx, cx)) {
    /* TODO(bnoordhuis) Implement username/password auth. */
  }

  /* No acceptable auth. */
  conn_write(incoming, "\5\377", 2);
  return s_kill;
}
257 
258 /* TODO(bnoordhuis) Implement username/password auth. */
do_handshake_auth(client_ctx * cx)259 static int do_handshake_auth(client_ctx *cx) {
260   UNREACHABLE();
261   return do_kill(cx);
262 }
263 
/* Handle completion of the write that answered the handshake.  On success,
 * start reading the request and return s_req_parse; on a write error return
 * the result of do_kill().
 */
static int do_req_start(client_ctx *cx) {
  conn *incoming;

  incoming = &cx->incoming;
  ASSERT(incoming->rdstate == c_stop);
  ASSERT(incoming->wrstate == c_done);
  incoming->wrstate = c_stop;

  if (incoming->result >= 0) {
    conn_read(incoming);
    return s_req_parse;
  }

  pr_err("write error: %s", uv_strerror(incoming->result));
  return do_kill(cx);
}
280 
/* Handle the result of a read on the client connection while waiting for
 * the request.  Only TCP CONNECT requests are served.
 *
 * Returns s_req_parse when the parser needs more data, s_req_lookup when a
 * hostname lookup has been started, the result of do_req_connect_start()
 * for literal IPv4/IPv6 destinations, or the result of do_kill() on read
 * errors, trailing bytes, parse errors and unsupported commands.
 */
static int do_req_parse(client_ctx *cx) {
  struct sockaddr_in6 *v6;
  struct sockaddr_in *v4;
  conn *incoming;
  conn *outgoing;
  s5_ctx *parser;
  uint8_t *cursor;
  size_t remaining;
  int rc;

  incoming = &cx->incoming;
  outgoing = &cx->outgoing;
  parser = &cx->parser;
  ASSERT(incoming->rdstate == c_done);
  ASSERT(incoming->wrstate == c_stop);
  ASSERT(outgoing->rdstate == c_stop);
  ASSERT(outgoing->wrstate == c_stop);
  incoming->rdstate = c_stop;

  if (incoming->result < 0) {
    pr_err("read error: %s", uv_strerror(incoming->result));
    return do_kill(cx);
  }

  cursor = (uint8_t *) incoming->t.buf;
  remaining = (size_t) incoming->result;
  rc = s5_parse(parser, &cursor, &remaining);

  if (rc == s5_ok) {
    /* Need more data. */
    conn_read(incoming);
    return s_req_parse;
  }

  if (remaining != 0) {
    pr_err("junk in request %u", (unsigned) remaining);
    return do_kill(cx);
  }

  if (rc != s5_exec_cmd) {
    pr_err("request error: %s", s5_strerror(rc));
    return do_kill(cx);
  }

  if (parser->cmd == s5_cmd_tcp_bind) {
    /* Not supported but relatively straightforward to implement. */
    pr_warn("BIND requests are not supported.");
    return do_kill(cx);
  }

  if (parser->cmd == s5_cmd_udp_assoc) {
    /* Not supported.  Might be hard to implement because libuv has no
     * functionality for detecting the MTU size which the RFC mandates.
     */
    pr_warn("UDP ASSOC requests are not supported.");
    return do_kill(cx);
  }
  ASSERT(parser->cmd == s5_cmd_tcp_connect);

  if (parser->atyp == s5_atyp_host) {
    /* Destination is a hostname; resolve it first. */
    conn_getaddrinfo(outgoing, (const char *) parser->daddr);
    return s_req_lookup;
  }

  /* Destination is a literal address; build the socket address in place. */
  if (parser->atyp == s5_atyp_ipv4) {
    v4 = &outgoing->t.addr4;
    memset(v4, 0, sizeof(*v4));
    v4->sin_family = AF_INET;
    v4->sin_port = htons(parser->dport);
    memcpy(&v4->sin_addr, parser->daddr, sizeof(v4->sin_addr));
  } else if (parser->atyp == s5_atyp_ipv6) {
    v6 = &outgoing->t.addr6;
    memset(v6, 0, sizeof(*v6));
    v6->sin6_family = AF_INET6;
    v6->sin6_port = htons(parser->dport);
    memcpy(&v6->sin6_addr, parser->daddr, sizeof(v6->sin6_addr));
  } else {
    UNREACHABLE();
  }

  return do_req_connect_start(cx);
}
361 
/* Handle completion of the hostname lookup for the upstream address.
 *
 * On failure (negative outgoing->result) a 'Host unreachable' reply is
 * queued for the client and s_kill is returned.  On success the destination
 * port is stored in the resolved address and the result of
 * do_req_connect_start() is returned.
 */
static int do_req_lookup(client_ctx *cx) {
  s5_ctx *parser;
  conn *incoming;
  conn *outgoing;
  uint16_t port;

  incoming = &cx->incoming;
  outgoing = &cx->outgoing;
  parser = &cx->parser;
  ASSERT(incoming->rdstate == c_stop);
  ASSERT(incoming->wrstate == c_stop);
  ASSERT(outgoing->rdstate == c_stop);
  ASSERT(outgoing->wrstate == c_stop);

  if (outgoing->result < 0) {
    /* TODO(bnoordhuis) Escape control characters in parser->daddr. */
    pr_err("lookup error for \"%s\": %s",
           parser->daddr,
           uv_strerror(outgoing->result));
    /* Send back a 'Host unreachable' reply. */
    conn_write(incoming, "\5\4\0\1\0\0\0\0\0\0", 10);
    return s_kill;
  }

  /* Don't make assumptions about the offset of sin_port/sin6_port; go
   * through the member that matches the address family.
   */
  port = htons(parser->dport);
  if (outgoing->t.addr.sa_family == AF_INET) {
    outgoing->t.addr4.sin_port = port;
  } else if (outgoing->t.addr.sa_family == AF_INET6) {
    outgoing->t.addr6.sin6_port = port;
  } else {
    UNREACHABLE();
  }

  return do_req_connect_start(cx);
}
399 
400 /* Assumes that cx->outgoing.t.sa contains a valid AF_INET/AF_INET6 address. */
do_req_connect_start(client_ctx * cx)401 static int do_req_connect_start(client_ctx *cx) {
402   conn *incoming;
403   conn *outgoing;
404   int err;
405 
406   incoming = &cx->incoming;
407   outgoing = &cx->outgoing;
408   ASSERT(incoming->rdstate == c_stop);
409   ASSERT(incoming->wrstate == c_stop);
410   ASSERT(outgoing->rdstate == c_stop);
411   ASSERT(outgoing->wrstate == c_stop);
412 
413   if (!can_access(cx->sx, cx, &outgoing->t.addr)) {
414     pr_warn("connection not allowed by ruleset");
415     /* Send a 'Connection not allowed by ruleset' reply. */
416     conn_write(incoming, "\5\2\0\1\0\0\0\0\0\0", 10);
417     return s_kill;
418   }
419 
420   err = conn_connect(outgoing);
421   if (err != 0) {
422     pr_err("connect error: %s\n", uv_strerror(err));
423     return do_kill(cx);
424   }
425 
426   return s_req_connect;
427 }
428 
/* Handle completion of the upstream connection attempt and send the SOCKS5
 * reply to the client.
 *
 * When outgoing->result is zero, the reply carries the local address and
 * port of the upstream socket and s_proxy_start is returned.  Otherwise a
 * 'Connection refused' reply is queued and s_kill is returned.
 *
 * The reply is assembled in the client connection's own buffer
 * (incoming->t.buf) and handed to conn_write().
 */
static int do_req_connect(client_ctx *cx) {
  /* A union gives storage that is correctly aligned for every address type
   * we read back, which a plain char array does not guarantee.
   */
  union {
    struct sockaddr sa;
    struct sockaddr_in in4;
    struct sockaddr_in6 in6;
  } local;
  conn *incoming;
  conn *outgoing;
  uint8_t *buf;
  int addrlen;

  incoming = &cx->incoming;
  outgoing = &cx->outgoing;
  ASSERT(incoming->rdstate == c_stop);
  ASSERT(incoming->wrstate == c_stop);
  ASSERT(outgoing->rdstate == c_stop);
  ASSERT(outgoing->wrstate == c_stop);

  if (outgoing->result != 0) {
    pr_err("upstream connection error: %s", uv_strerror(outgoing->result));
    /* Send a 'Connection refused' reply. */
    conn_write(incoming, "\5\5\0\1\0\0\0\0\0\0", 10);
    return s_kill;
  }

  /* The RFC mandates that the SOCKS server must include the local port
   * and address in the reply.  So that's what we do.
   */
  addrlen = sizeof(local);
  CHECK(0 == uv_tcp_getsockname(&outgoing->handle.tcp, &local.sa, &addrlen));

  /* Build and send the reply. */
  buf = (uint8_t *) incoming->t.buf;
  buf[0] = 5;  /* Version. */
  buf[1] = 0;  /* Success. */
  buf[2] = 0;  /* Reserved. */

  /* Pick the layout from the address family rather than from the returned
   * length.  sin_port/sin6_port are copied as stored, without conversion.
   */
  switch (local.sa.sa_family) {
    case AF_INET:
      buf[3] = 1;  /* IPv4. */
      memcpy(buf + 4, &local.in4.sin_addr, 4);
      memcpy(buf + 8, &local.in4.sin_port, 2);
      conn_write(incoming, buf, 10);
      break;
    case AF_INET6:
      buf[3] = 4;  /* IPv6. */
      memcpy(buf + 4, &local.in6.sin6_addr, 16);
      memcpy(buf + 20, &local.in6.sin6_port, 2);
      conn_write(incoming, buf, 22);
      break;
    default:
      UNREACHABLE();
  }

  return s_proxy_start;
}
484 
/* Handle completion of the write that sent the connect reply to the client.
 * On success, start reading from both the client and upstream and return
 * s_proxy; on a write error return the result of do_kill().
 */
static int do_proxy_start(client_ctx *cx) {
  conn *incoming;
  conn *outgoing;

  incoming = &cx->incoming;
  outgoing = &cx->outgoing;
  ASSERT(incoming->rdstate == c_stop);
  ASSERT(incoming->wrstate == c_done);
  ASSERT(outgoing->rdstate == c_stop);
  ASSERT(outgoing->wrstate == c_stop);
  incoming->wrstate = c_stop;

  if (incoming->result >= 0) {
    conn_read(incoming);
    conn_read(outgoing);
    return s_proxy;
  }

  pr_err("write error: %s", uv_strerror(incoming->result));
  return do_kill(cx);
}
506 
507 /* Proxy incoming data back and forth. */
do_proxy(client_ctx * cx)508 static int do_proxy(client_ctx *cx) {
509   if (conn_cycle("client", &cx->incoming, &cx->outgoing)) {
510     return do_kill(cx);
511   }
512 
513   if (conn_cycle("upstream", &cx->outgoing, &cx->incoming)) {
514     return do_kill(cx);
515   }
516 
517   return s_proxy;
518 }
519 
/* Begin tearing down the session: close both connections and return the
 * first "almost dead" state.  If teardown is already in progress the current
 * state is returned unchanged and nothing is closed again.
 *
 * Each conn_close() closes two handles, so four close callbacks follow and
 * each advances the state by one.  Starting at s_almost_dead_1 those four
 * steps end in s_dead.  When a DNS lookup is pending, its callback is
 * expected as well, so counting starts one step earlier, at s_almost_dead_0.
 */
static int do_kill(client_ctx *cx) {
  int next;

  if (cx->state >= s_almost_dead_0) {
    return cx->state;
  }

  if (cx->state == s_req_lookup) {
    /* Try to cancel the request. The callback still runs but if the
     * cancellation succeeded, it gets called with status=UV_ECANCELED.
     */
    uv_cancel(&cx->outgoing.t.req);
    next = s_almost_dead_0;
  } else {
    next = s_almost_dead_1;
  }

  conn_close(&cx->incoming);
  conn_close(&cx->outgoing);
  return next;
}
540 
/* Record that another finalizer has completed by returning the state that
 * follows the current one; the state after s_almost_dead_4 is s_dead.
 */
static int do_almost_dead(client_ctx *cx) {
  int next;

  ASSERT(cx->state >= s_almost_dead_0);
  next = cx->state + 1;
  return next;
}
545 
/* Move data one way: whatever has been read from |b| is written to |a|.
 *
 * Returns -1 when either connection carries a negative result and 0
 * otherwise.  An error on |a| other than UV_EOF is logged, labelled with
 * |who|; an error on |b| is not logged here.
 */
static int conn_cycle(const char *who, conn *a, conn *b) {
  if (a->result < 0 && a->result != UV_EOF) {
    pr_err("%s error: %s", who, uv_strerror(a->result));
  }

  if (a->result < 0 || b->result < 0) {
    return -1;
  }

  /* A finished write leaves |a| free for the next one. */
  if (a->wrstate == c_done) {
    a->wrstate = c_stop;
  }

  /* The logic is as follows: read when we don't write and write when we don't
   * read.  That gives us back-pressure handling for free because if the peer
   * sends data faster than we consume it, TCP congestion control kicks in.
   */
  if (a->wrstate != c_stop) {
    return 0;  /* Still writing; leave |b| alone. */
  }

  switch (b->rdstate) {
    case c_stop:
      conn_read(b);
      break;
    case c_done:
      conn_write(a, b->t.buf, b->result);
      /* Makes a later call take the c_stop branch and read again. */
      b->rdstate = c_stop;
      break;
    default:
      break;
  }

  return 0;
}
577 
/* (Re)start the timer of |c| with a timeout of c->idle_timeout and a repeat
 * value of 0; conn_timer_expire() is the callback.
 */
static void conn_timer_reset(conn *c) {
  CHECK(0 == uv_timer_start(&c->timer_handle, conn_timer_expire,
                            c->idle_timeout, 0));
}
584 
/* Timer callback.  Marks the owning connection as timed out by storing
 * UV_ETIMEDOUT in its result and runs the session state machine.
 */
static void conn_timer_expire(uv_timer_t *handle, int status) {
  conn *c;

  CHECK(0 == status);

  /* The timer is embedded in the conn; recover the enclosing object. */
  c = CONTAINER_OF(handle, conn, timer_handle);
  c->result = UV_ETIMEDOUT;

  do_next(c->client);
}
593 
/* Start an asynchronous lookup of |hostname| on behalf of |c| and restart
 * the connection's timer.  The lookup asks for TCP stream sockets of any
 * address family; conn_getaddrinfo_done() receives the result.
 */
static void conn_getaddrinfo(conn *c, const char *hostname) {
  struct addrinfo hints;

  /* Everything not set below stays zeroed. */
  memset(&hints, 0, sizeof(hints));
  hints.ai_protocol = IPPROTO_TCP;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_family = AF_UNSPEC;

  CHECK(0 == uv_getaddrinfo(c->client->sx->loop,
                            &c->t.addrinfo_req,
                            conn_getaddrinfo_done,
                            hostname,
                            NULL,
                            &hints));

  conn_timer_reset(c);
}
609 
/* Lookup callback.  Stores |status| in the connection's result, copies the
 * first returned address into the connection when the lookup succeeded,
 * releases |ai| and runs the session state machine.
 */
static void conn_getaddrinfo_done(uv_getaddrinfo_t *req,
                                  int status,
                                  struct addrinfo *ai) {
  conn *c;

  c = CONTAINER_OF(req, conn, t.addrinfo_req);
  c->result = status;

  if (status == 0) {
    /* FIXME(bnoordhuis) Should try all addresses. */
    switch (ai->ai_family) {
      case AF_INET:
        c->t.addr4 = *(const struct sockaddr_in *) ai->ai_addr;
        break;
      case AF_INET6:
        c->t.addr6 = *(const struct sockaddr_in6 *) ai->ai_addr;
        break;
      default:
        UNREACHABLE();
    }
  }

  uv_freeaddrinfo(ai);
  do_next(c->client);
}
632 
633 /* Assumes that c->t.sa contains a valid AF_INET or AF_INET6 address. */
conn_connect(conn * c)634 static int conn_connect(conn *c) {
635   ASSERT(c->t.addr.sa_family == AF_INET ||
636          c->t.addr.sa_family == AF_INET6);
637   conn_timer_reset(c);
638   return uv_tcp_connect(&c->t.connect_req,
639                         &c->handle.tcp,
640                         &c->t.addr,
641                         conn_connect_done);
642 }
643 
/* Connect callback.  Unless the request was canceled, stores |status| in the
 * connection's result and runs the session state machine.
 */
static void conn_connect_done(uv_connect_t *req, int status) {
  conn *c;

  /* UV_ECANCELED means the handle has been closed; nothing to do then. */
  if (status != UV_ECANCELED) {
    c = CONTAINER_OF(req, conn, t.connect_req);
    c->result = status;
    do_next(c->client);
  }
}
655 
/* Start reading on |c|, mark its read side busy and restart its timer.
 * The read side must be stopped when this is called.  Buffers come from
 * conn_alloc() and results are delivered to conn_read_done().
 */
static void conn_read(conn *c) {
  ASSERT(c->rdstate == c_stop);

  CHECK(0 == uv_read_start(&c->handle.stream, conn_alloc, conn_read_done));
  c->rdstate = c_busy;

  conn_timer_reset(c);
}
662 
/* Read callback.  Marks the read side done, stores |nread| in the
 * connection's result, stops reading and runs the session state machine.
 * Reading is stopped before the state machine runs, so every read has to be
 * started again explicitly with conn_read().
 */
static void conn_read_done(uv_stream_t *handle,
                           ssize_t nread,
                           const uv_buf_t *buf) {
  conn *c;

  c = CONTAINER_OF(handle, conn, handle);
  ASSERT(c->t.buf == buf->base);
  ASSERT(c->rdstate == c_busy);

  uv_read_stop(&c->handle.stream);
  c->result = nread;
  c->rdstate = c_done;

  do_next(c->client);
}
677 
/* Allocation callback for reads.  Hands out the connection's own buffer
 * (c->t.buf) at its full size; the |size| argument is not used.
 */
static void conn_alloc(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
  conn *c;

  c = CONTAINER_OF(handle, conn, handle);
  ASSERT(c->rdstate == c_busy);

  buf->len = sizeof(c->t.buf);
  buf->base = c->t.buf;
}
686 
/* Queue a write of |len| bytes starting at |data| on |c|, mark its write
 * side busy and restart its timer.  The write side must be stopped or done
 * when this is called.  Completion is reported to conn_write_done().
 */
static void conn_write(conn *c, const void *data, unsigned int len) {
  uv_buf_t buf;

  ASSERT(c->wrstate == c_stop || c->wrstate == c_done);

  /* It's okay to cast away constness here, uv_write() won't modify the
   * memory.
   */
  buf.len = len;
  buf.base = (char *) data;

  c->wrstate = c_busy;
  CHECK(0 == uv_write(&c->write_req,
                      &c->handle.stream,
                      &buf,
                      1,
                      conn_write_done));

  conn_timer_reset(c);
}
706 
/* Write callback.  Unless the write was canceled, marks the write side
 * done, stores |status| in the connection's result and runs the session
 * state machine.
 */
static void conn_write_done(uv_write_t *req, int status) {
  conn *c;

  /* UV_ECANCELED means the handle has been closed; nothing to do then. */
  if (status != UV_ECANCELED) {
    c = CONTAINER_OF(req, conn, write_req);
    ASSERT(c->wrstate == c_busy);
    c->result = status;
    c->wrstate = c_done;
    do_next(c->client);
  }
}
720 
/* Mark both directions of |c| dead and close its stream handle and its
 * timer handle.  conn_close_done() is the close callback for both, so it is
 * registered twice per connection.
 */
static void conn_close(conn *c) {
  ASSERT(c->rdstate != c_dead);
  ASSERT(c->wrstate != c_dead);
  c->wrstate = c_dead;
  c->rdstate = c_dead;

  /* conn_close_done() finds the connection through the data pointer. */
  c->handle.handle.data = c;
  c->timer_handle.data = c;

  uv_close(&c->handle.handle, conn_close_done);
  uv_close((uv_handle_t *) &c->timer_handle, conn_close_done);
}
731 
/* Close callback.  Looks up the connection stored in the handle's data
 * pointer by conn_close() and runs the session state machine.
 */
static void conn_close_done(uv_handle_t *handle) {
  conn *owner;

  owner = handle->data;
  do_next(owner->client);
}
738