1 /* Copyright StrongLoop, Inc. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include "defs.h"
23 #include <errno.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 /* A connection is modeled as an abstraction on top of two simple state
28 * machines, one for reading and one for writing. Either state machine
29 * is, when active, in one of three states: busy, done or stop; the fourth
30 * and final state, dead, is an end state and only relevant when shutting
31 * down the connection. A short overview:
32 *
33 * busy done stop
34 * ----------|---------------------------|--------------------|------|
35 * readable | waiting for incoming data | have incoming data | idle |
36 * writable | busy writing out data | completed write | idle |
37 *
38 * We could remove the done state from the writable state machine. For our
39 * purposes, it's functionally equivalent to the stop state.
40 *
41 * When the connection with upstream has been established, the client_ctx
42 * moves into a state where incoming data from the client is sent upstream
43 * and vice versa, incoming data from upstream is sent to the client. In
44 * other words, we're just piping data back and forth. See conn_cycle()
45 * for details.
46 *
47 * An interesting deviation from libuv's I/O model is that reads are discrete
48 * rather than continuous events. In layman's terms, when a read operation
49 * completes, the connection stops reading until further notice.
50 *
51 * The rationale for this approach is that we have to wait until the data
52 * has been sent out again before we can reuse the read buffer.
53 *
54 * It also pleasingly unifies with the request model that libuv uses for
55 * writes and everything else; libuv may switch to a request model for
56 * reads in the future.
57 */
/* Per-direction I/O state; see the state machine overview above. */
enum conn_state {
  c_busy,  /* Busy; waiting for incoming data or for a write to complete. */
  c_done,  /* Done; read incoming data or write finished. */
  c_stop,  /* Stopped. */
  c_dead   /* Closing or closed; terminal state, set by conn_close(). */
};
64
/* Session states. */
enum sess_state {
  s_handshake,        /* Wait for client handshake. */
  s_handshake_auth,   /* Wait for client authentication data. */
  s_req_start,        /* Start waiting for request data. */
  s_req_parse,        /* Wait for request data. */
  s_req_lookup,       /* Wait for upstream hostname DNS lookup to complete. */
  s_req_connect,      /* Wait for uv_tcp_connect() to complete. */
  s_proxy_start,      /* Connected. Start piping data. */
  s_proxy,            /* Connected. Pipe data back and forth. */
  s_kill,             /* Tear down session. */
  /* Each completed close (or cancelled lookup) callback advances one
   * s_almost_dead_* step; see do_kill() and do_almost_dead().
   */
  s_almost_dead_0,    /* Waiting for finalizers to complete. */
  s_almost_dead_1,    /* Waiting for finalizers to complete. */
  s_almost_dead_2,    /* Waiting for finalizers to complete. */
  s_almost_dead_3,    /* Waiting for finalizers to complete. */
  s_almost_dead_4,    /* Waiting for finalizers to complete. */
  s_dead              /* Dead. Safe to free now. */
};
83
/* Forward declarations for the per-state handlers (do_*) and the
 * connection-level helpers (conn_*) defined below.
 */
static void do_next(client_ctx *cx);
static int do_handshake(client_ctx *cx);
static int do_handshake_auth(client_ctx *cx);
static int do_req_start(client_ctx *cx);
static int do_req_parse(client_ctx *cx);
static int do_req_lookup(client_ctx *cx);
static int do_req_connect_start(client_ctx *cx);
static int do_req_connect(client_ctx *cx);
static int do_proxy_start(client_ctx *cx);
static int do_proxy(client_ctx *cx);
static int do_kill(client_ctx *cx);
static int do_almost_dead(client_ctx *cx);
static int conn_cycle(const char *who, conn *a, conn *b);
static void conn_timer_reset(conn *c);
static void conn_timer_expire(uv_timer_t *handle, int status);
static void conn_getaddrinfo(conn *c, const char *hostname);
static void conn_getaddrinfo_done(uv_getaddrinfo_t *req,
                                  int status,
                                  struct addrinfo *ai);
static int conn_connect(conn *c);
static void conn_connect_done(uv_connect_t *req, int status);
static void conn_read(conn *c);
static void conn_read_done(uv_stream_t *handle,
                           ssize_t nread,
                           const uv_buf_t *buf);
static void conn_alloc(uv_handle_t *handle, size_t size, uv_buf_t *buf);
static void conn_write(conn *c, const void *data, unsigned int len);
static void conn_write_done(uv_write_t *req, int status);
static void conn_close(conn *c);
static void conn_close_done(uv_handle_t *handle);
114
115 /* |incoming| has been initialized by server.c when this is called. */
/* Second-stage session setup: wire both connection halves to the session,
 * put them in their initial idle state and wait for the client's first
 * packet. The incoming TCP handle was already initialized by the accept
 * path; only the outgoing one is created here.
 */
void client_finish_init(server_ctx *sx, client_ctx *cx) {
  conn *in;
  conn *out;

  cx->sx = sx;
  cx->state = s_handshake;
  s5_init(&cx->parser);

  in = &cx->incoming;
  out = &cx->outgoing;

  /* Both halves start out idle with no pending I/O result. */
  in->client = cx;
  out->client = cx;
  in->result = 0;
  out->result = 0;
  in->rdstate = c_stop;
  out->rdstate = c_stop;
  in->wrstate = c_stop;
  out->wrstate = c_stop;
  in->idle_timeout = sx->idle_timeout;
  out->idle_timeout = sx->idle_timeout;

  CHECK(0 == uv_timer_init(sx->loop, &in->timer_handle));
  CHECK(0 == uv_tcp_init(sx->loop, &out->handle.tcp));
  CHECK(0 == uv_timer_init(sx->loop, &out->timer_handle));

  /* Wait for the initial packet. */
  conn_read(in);
}
144
/* This is the core state machine that drives the client <-> upstream proxy.
 * We move through the initial handshake and authentication steps first and
 * end up (if all goes well) in the proxy state where we're just proxying
 * data between the client and upstream.
 *
 * Called from every libuv callback; each handler returns the next session
 * state. When the state reaches s_dead, |cx| is freed here — callers must
 * not touch it afterwards.
 */
static void do_next(client_ctx *cx) {
  int new_state;

  ASSERT(cx->state != s_dead);
  switch (cx->state) {
  case s_handshake:
    new_state = do_handshake(cx);
    break;
  case s_handshake_auth:
    new_state = do_handshake_auth(cx);
    break;
  case s_req_start:
    new_state = do_req_start(cx);
    break;
  case s_req_parse:
    new_state = do_req_parse(cx);
    break;
  case s_req_lookup:
    new_state = do_req_lookup(cx);
    break;
  case s_req_connect:
    new_state = do_req_connect(cx);
    break;
  case s_proxy_start:
    new_state = do_proxy_start(cx);
    break;
  case s_proxy:
    new_state = do_proxy(cx);
    break;
  case s_kill:
    new_state = do_kill(cx);
    break;
  case s_almost_dead_0:
  case s_almost_dead_1:
  case s_almost_dead_2:
  case s_almost_dead_3:
  case s_almost_dead_4:
    new_state = do_almost_dead(cx);
    break;
  default:
    UNREACHABLE();
  }
  cx->state = new_state;

  if (cx->state == s_dead) {
    if (DEBUG_CHECKS) {
      memset(cx, -1, sizeof(*cx));  /* Poison memory to catch use-after-free. */
    }
    free(cx);
  }
}
201
/* Parse the SOCKS5 greeting (version + offered auth methods) and reply with
 * the selected method. Returns the next session state.
 */
static int do_handshake(client_ctx *cx) {
  unsigned int methods;
  conn *incoming;
  s5_ctx *parser;
  uint8_t *data;
  size_t size;
  int err;

  parser = &cx->parser;
  incoming = &cx->incoming;
  /* A read on the client connection just completed; no write in flight. */
  ASSERT(incoming->rdstate == c_done);
  ASSERT(incoming->wrstate == c_stop);
  incoming->rdstate = c_stop;

  if (incoming->result < 0) {
    pr_err("read error: %s", uv_strerror(incoming->result));
    return do_kill(cx);
  }

  /* Feed the bytes we just received into the SOCKS5 parser. */
  data = (uint8_t *) incoming->t.buf;
  size = (size_t) incoming->result;
  err = s5_parse(parser, &data, &size);
  if (err == s5_ok) {
    conn_read(incoming);
    return s_handshake;  /* Need more data. */
  }

  if (size != 0) {
    /* Could allow a round-trip saving shortcut here if the requested auth
     * method is S5_AUTH_NONE (provided unauthenticated traffic is allowed.)
     * Requires client support however.
     */
    pr_err("junk in handshake");
    return do_kill(cx);
  }

  if (err != s5_auth_select) {
    pr_err("handshake error: %s", s5_strerror(err));
    return do_kill(cx);
  }

  methods = s5_auth_methods(parser);
  if ((methods & S5_AUTH_NONE) && can_auth_none(cx->sx, cx)) {
    s5_select_auth(parser, S5_AUTH_NONE);
    conn_write(incoming, "\5\0", 2);  /* No auth required. */
    return s_req_start;
  }

  if ((methods & S5_AUTH_PASSWD) && can_auth_passwd(cx->sx, cx)) {
    /* TODO(bnoordhuis) Implement username/password auth. */
  }

  conn_write(incoming, "\5\377", 2);  /* No acceptable auth. */
  return s_kill;
}
257
/* TODO(bnoordhuis) Implement username/password auth. */
static int do_handshake_auth(client_ctx *cx) {
  /* Unreached: do_handshake() never transitions to s_handshake_auth. */
  UNREACHABLE();
  return do_kill(cx);
}
263
/* The auth-method reply written by do_handshake() finished; start reading
 * the SOCKS request proper. Returns the next session state.
 */
static int do_req_start(client_ctx *cx) {
  conn *client_conn = &cx->incoming;

  ASSERT(client_conn->rdstate == c_stop);
  ASSERT(client_conn->wrstate == c_done);
  client_conn->wrstate = c_stop;

  if (client_conn->result >= 0) {
    conn_read(client_conn);
    return s_req_parse;
  }

  pr_err("write error: %s", uv_strerror(client_conn->result));
  return do_kill(cx);
}
280
/* Parse the SOCKS5 request (command + destination). For hostname
 * destinations, kick off a DNS lookup; for literal addresses, fill in the
 * upstream sockaddr and connect immediately. Returns the next session state.
 */
static int do_req_parse(client_ctx *cx) {
  conn *incoming;
  conn *outgoing;
  s5_ctx *parser;
  uint8_t *data;
  size_t size;
  int err;

  parser = &cx->parser;
  incoming = &cx->incoming;
  outgoing = &cx->outgoing;
  /* A client-side read just completed; everything else is idle. */
  ASSERT(incoming->rdstate == c_done);
  ASSERT(incoming->wrstate == c_stop);
  ASSERT(outgoing->rdstate == c_stop);
  ASSERT(outgoing->wrstate == c_stop);
  incoming->rdstate = c_stop;

  if (incoming->result < 0) {
    pr_err("read error: %s", uv_strerror(incoming->result));
    return do_kill(cx);
  }

  /* Run the received bytes through the SOCKS5 request parser. */
  data = (uint8_t *) incoming->t.buf;
  size = (size_t) incoming->result;
  err = s5_parse(parser, &data, &size);
  if (err == s5_ok) {
    conn_read(incoming);
    return s_req_parse;  /* Need more data. */
  }

  if (size != 0) {
    /* Trailing bytes after a complete request: protocol violation. */
    pr_err("junk in request %u", (unsigned) size);
    return do_kill(cx);
  }

  if (err != s5_exec_cmd) {
    pr_err("request error: %s", s5_strerror(err));
    return do_kill(cx);
  }

  if (parser->cmd == s5_cmd_tcp_bind) {
    /* Not supported but relatively straightforward to implement. */
    pr_warn("BIND requests are not supported.");
    return do_kill(cx);
  }

  if (parser->cmd == s5_cmd_udp_assoc) {
    /* Not supported. Might be hard to implement because libuv has no
     * functionality for detecting the MTU size which the RFC mandates.
     */
    pr_warn("UDP ASSOC requests are not supported.");
    return do_kill(cx);
  }
  ASSERT(parser->cmd == s5_cmd_tcp_connect);

  /* Hostname destination: resolve first, connect from the callback. */
  if (parser->atyp == s5_atyp_host) {
    conn_getaddrinfo(outgoing, (const char *) parser->daddr);
    return s_req_lookup;
  }

  /* Literal IPv4/IPv6 destination: build the sockaddr and connect now. */
  if (parser->atyp == s5_atyp_ipv4) {
    memset(&outgoing->t.addr4, 0, sizeof(outgoing->t.addr4));
    outgoing->t.addr4.sin_family = AF_INET;
    outgoing->t.addr4.sin_port = htons(parser->dport);
    memcpy(&outgoing->t.addr4.sin_addr,
           parser->daddr,
           sizeof(outgoing->t.addr4.sin_addr));
  } else if (parser->atyp == s5_atyp_ipv6) {
    memset(&outgoing->t.addr6, 0, sizeof(outgoing->t.addr6));
    outgoing->t.addr6.sin6_family = AF_INET6;
    outgoing->t.addr6.sin6_port = htons(parser->dport);
    memcpy(&outgoing->t.addr6.sin6_addr,
           parser->daddr,
           sizeof(outgoing->t.addr6.sin6_addr));
  } else {
    UNREACHABLE();
  }

  return do_req_connect_start(cx);
}
361
/* The DNS lookup started in do_req_parse() finished. On success, fill in
 * the destination port and connect; on failure, report to the client.
 * Returns the next session state.
 */
static int do_req_lookup(client_ctx *cx) {
  s5_ctx *parser;
  conn *incoming;
  conn *outgoing;

  parser = &cx->parser;
  incoming = &cx->incoming;
  outgoing = &cx->outgoing;
  ASSERT(incoming->rdstate == c_stop);
  ASSERT(incoming->wrstate == c_stop);
  ASSERT(outgoing->rdstate == c_stop);
  ASSERT(outgoing->wrstate == c_stop);

  /* outgoing->result holds the uv_getaddrinfo() status. */
  if (outgoing->result < 0) {
    /* TODO(bnoordhuis) Escape control characters in parser->daddr. */
    pr_err("lookup error for \"%s\": %s",
           parser->daddr,
           uv_strerror(outgoing->result));
    /* Send back a 'Host unreachable' reply. */
    conn_write(incoming, "\5\4\0\1\0\0\0\0\0\0", 10);
    return s_kill;
  }

  /* conn_getaddrinfo_done() copied the resolved address but not the port.
   * Don't make assumptions about the offset of sin_port/sin6_port.
   */
  switch (outgoing->t.addr.sa_family) {
  case AF_INET:
    outgoing->t.addr4.sin_port = htons(parser->dport);
    break;
  case AF_INET6:
    outgoing->t.addr6.sin6_port = htons(parser->dport);
    break;
  default:
    UNREACHABLE();
  }

  return do_req_connect_start(cx);
}
399
400 /* Assumes that cx->outgoing.t.sa contains a valid AF_INET/AF_INET6 address. */
do_req_connect_start(client_ctx * cx)401 static int do_req_connect_start(client_ctx *cx) {
402 conn *incoming;
403 conn *outgoing;
404 int err;
405
406 incoming = &cx->incoming;
407 outgoing = &cx->outgoing;
408 ASSERT(incoming->rdstate == c_stop);
409 ASSERT(incoming->wrstate == c_stop);
410 ASSERT(outgoing->rdstate == c_stop);
411 ASSERT(outgoing->wrstate == c_stop);
412
413 if (!can_access(cx->sx, cx, &outgoing->t.addr)) {
414 pr_warn("connection not allowed by ruleset");
415 /* Send a 'Connection not allowed by ruleset' reply. */
416 conn_write(incoming, "\5\2\0\1\0\0\0\0\0\0", 10);
417 return s_kill;
418 }
419
420 err = conn_connect(outgoing);
421 if (err != 0) {
422 pr_err("connect error: %s\n", uv_strerror(err));
423 return do_kill(cx);
424 }
425
426 return s_req_connect;
427 }
428
/* The upstream connect attempt finished. On success, send the RFC 1928
 * reply containing our local bound address and port; on failure, send a
 * 'Connection refused' reply. Returns the next session state.
 *
 * Fix: the original ended with `UNREACHABLE(); return s_kill;` after an
 * if/else in which both branches return — dead code. Restructured with an
 * early return on the error path and removed the unreachable tail.
 */
static int do_req_connect(client_ctx *cx) {
  const struct sockaddr_in6 *in6;
  const struct sockaddr_in *in;
  char addr_storage[sizeof(*in6)];
  conn *incoming;
  conn *outgoing;
  uint8_t *buf;
  int addrlen;

  incoming = &cx->incoming;
  outgoing = &cx->outgoing;
  ASSERT(incoming->rdstate == c_stop);
  ASSERT(incoming->wrstate == c_stop);
  ASSERT(outgoing->rdstate == c_stop);
  ASSERT(outgoing->wrstate == c_stop);

  if (outgoing->result != 0) {
    pr_err("upstream connection error: %s\n", uv_strerror(outgoing->result));
    /* Send a 'Connection refused' reply. */
    conn_write(incoming, "\5\5\0\1\0\0\0\0\0\0", 10);
    return s_kill;
  }

  /* Build and send the reply. Not very pretty but gets the job done.
   * The RFC mandates that the SOCKS server must include the local port
   * and address in the reply. So that's what we do.
   */
  buf = (uint8_t *) incoming->t.buf;
  addrlen = sizeof(addr_storage);
  CHECK(0 == uv_tcp_getsockname(&outgoing->handle.tcp,
                                (struct sockaddr *) addr_storage,
                                &addrlen));
  buf[0] = 5;  /* Version. */
  buf[1] = 0;  /* Success. */
  buf[2] = 0;  /* Reserved. */
  if (addrlen == sizeof(*in)) {
    buf[3] = 1;  /* IPv4. */
    in = (const struct sockaddr_in *) &addr_storage;
    memcpy(buf + 4, &in->sin_addr, 4);
    memcpy(buf + 8, &in->sin_port, 2);
    conn_write(incoming, buf, 10);
  } else if (addrlen == sizeof(*in6)) {
    buf[3] = 4;  /* IPv6. */
    in6 = (const struct sockaddr_in6 *) &addr_storage;
    memcpy(buf + 4, &in6->sin6_addr, 16);
    memcpy(buf + 20, &in6->sin6_port, 2);
    conn_write(incoming, buf, 22);
  } else {
    UNREACHABLE();
  }
  return s_proxy_start;
}
484
/* The reply written in do_req_connect() completed; begin proxying by
 * starting reads in both directions. Returns the next session state.
 */
static int do_proxy_start(client_ctx *cx) {
  conn *client_conn = &cx->incoming;
  conn *upstream = &cx->outgoing;

  ASSERT(client_conn->rdstate == c_stop);
  ASSERT(client_conn->wrstate == c_done);
  ASSERT(upstream->rdstate == c_stop);
  ASSERT(upstream->wrstate == c_stop);
  client_conn->wrstate = c_stop;

  if (client_conn->result < 0) {
    pr_err("write error: %s", uv_strerror(client_conn->result));
    return do_kill(cx);
  }

  /* Pump data in both directions from here on. */
  conn_read(client_conn);
  conn_read(upstream);
  return s_proxy;
}
506
507 /* Proxy incoming data back and forth. */
do_proxy(client_ctx * cx)508 static int do_proxy(client_ctx *cx) {
509 if (conn_cycle("client", &cx->incoming, &cx->outgoing)) {
510 return do_kill(cx);
511 }
512
513 if (conn_cycle("upstream", &cx->outgoing, &cx->incoming)) {
514 return do_kill(cx);
515 }
516
517 return s_proxy;
518 }
519
/* Begin session teardown: close both connections and pick the almost-dead
 * state that matches the number of outstanding finalizer callbacks.
 * Idempotent once teardown has started. Returns the next session state.
 */
static int do_kill(client_ctx *cx) {
  int new_state;

  /* Already tearing down; don't restart the finalizer countdown. */
  if (cx->state >= s_almost_dead_0) {
    return cx->state;
  }

  /* Try to cancel the request. The callback still runs but if the
   * cancellation succeeded, it gets called with status=UV_ECANCELED.
   */
  new_state = s_almost_dead_1;
  if (cx->state == s_req_lookup) {
    /* One extra finalizer (the getaddrinfo callback) is outstanding. */
    new_state = s_almost_dead_0;
    uv_cancel(&cx->outgoing.t.req);
  }

  conn_close(&cx->incoming);
  conn_close(&cx->outgoing);
  return new_state;
}
540
/* Advance one step through the s_almost_dead_* countdown; when the state
 * reaches s_dead, do_next() frees the session.
 */
static int do_almost_dead(client_ctx *cx) {
  ASSERT(cx->state >= s_almost_dead_0);
  return cx->state + 1;  /* Another finalizer completed. */
}
545
/* Pump data from |b| to |a| for one direction of the proxy. Called for
 * both directions after every state change while in s_proxy. Returns 0 to
 * keep going, -1 if the session should be torn down.
 */
static int conn_cycle(const char *who, conn *a, conn *b) {
  if (a->result < 0) {
    if (a->result != UV_EOF) {
      pr_err("%s error: %s", who, uv_strerror(a->result));
    }
    return -1;  /* Error or EOF on this side. */
  }

  if (b->result < 0) {
    return -1;  /* Other side failed; its own cycle logs the error. */
  }

  if (a->wrstate == c_done) {
    a->wrstate = c_stop;
  }

  /* The logic is as follows: read when we don't write and write when we don't
   * read. That gives us back-pressure handling for free because if the peer
   * sends data faster than we consume it, TCP congestion control kicks in.
   */
  if (a->wrstate == c_stop) {
    if (b->rdstate == c_stop) {
      conn_read(b);
    } else if (b->rdstate == c_done) {
      conn_write(a, b->t.buf, b->result);
      b->rdstate = c_stop;  /* Makes the conn_read() branch above fire on the
                             * next cycle, once this write completes. */
    }
  }

  return 0;
}
577
/* (Re)arm the one-shot idle timer; conn_timer_expire() fires after
 * idle_timeout with no completed activity. Called whenever a read, write,
 * connect or lookup is started.
 */
static void conn_timer_reset(conn *c) {
  CHECK(0 == uv_timer_start(&c->timer_handle,
                            conn_timer_expire,
                            c->idle_timeout,
                            0));
}
584
static void conn_timer_expire(uv_timer_t *handle, int status) {
  conn *c;

  CHECK(0 == status);
  c = CONTAINER_OF(handle, conn, timer_handle);
  /* Fake a timeout error result; the state machine then kills the session
   * when the current handler inspects c->result.
   */
  c->result = UV_ETIMEDOUT;
  do_next(c->client);
}
593
/* Kick off an asynchronous resolve of |hostname|. TCP only; the resolver
 * may return IPv4 or IPv6 addresses. conn_getaddrinfo_done() receives the
 * result.
 */
static void conn_getaddrinfo(conn *c, const char *hostname) {
  struct addrinfo hints = {0};

  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_protocol = IPPROTO_TCP;

  CHECK(0 == uv_getaddrinfo(c->client->sx->loop,
                            &c->t.addrinfo_req,
                            conn_getaddrinfo_done,
                            hostname,
                            NULL,
                            &hints));
  conn_timer_reset(c);
}
609
static void conn_getaddrinfo_done(uv_getaddrinfo_t *req,
                                  int status,
                                  struct addrinfo *ai) {
  conn *c;

  c = CONTAINER_OF(req, conn, t.addrinfo_req);
  c->result = status;  /* Inspected by do_req_lookup(). */

  if (status == 0) {
    /* FIXME(bnoordhuis) Should try all addresses. */
    if (ai->ai_family == AF_INET) {
      c->t.addr4 = *(const struct sockaddr_in *) ai->ai_addr;
    } else if (ai->ai_family == AF_INET6) {
      c->t.addr6 = *(const struct sockaddr_in6 *) ai->ai_addr;
    } else {
      UNREACHABLE();
    }
  }

  uv_freeaddrinfo(ai);
  do_next(c->client);  /* Resumes in do_req_lookup() (or teardown). */
}
632
/* Assumes that c->t.sa contains a valid AF_INET or AF_INET6 address.
 * Returns 0 or a libuv error code; conn_connect_done() gets the async
 * result.
 */
static int conn_connect(conn *c) {
  ASSERT(c->t.addr.sa_family == AF_INET ||
         c->t.addr.sa_family == AF_INET6);
  conn_timer_reset(c);
  return uv_tcp_connect(&c->t.connect_req,
                        &c->handle.tcp,
                        &c->t.addr,
                        conn_connect_done);
}
643
/* uv_tcp_connect() completed; record the status and resume the state
 * machine. A UV_ECANCELED status means the handle was already closed by
 * session teardown, in which case there is nothing left to notify.
 */
static void conn_connect_done(uv_connect_t *req, int status) {
  conn *c;

  if (status != UV_ECANCELED) {
    c = CONTAINER_OF(req, conn, t.connect_req);
    c->result = status;
    do_next(c->client);
  }
}
655
/* Start one discrete read; conn_read_done() stops reading again as soon as
 * data arrives (see the I/O model comment at the top of the file).
 */
static void conn_read(conn *c) {
  ASSERT(c->rdstate == c_stop);
  CHECK(0 == uv_read_start(&c->handle.stream, conn_alloc, conn_read_done));
  c->rdstate = c_busy;
  conn_timer_reset(c);
}
662
static void conn_read_done(uv_stream_t *handle,
                           ssize_t nread,
                           const uv_buf_t *buf) {
  conn *c;

  c = CONTAINER_OF(handle, conn, handle);
  ASSERT(c->t.buf == buf->base);  /* conn_alloc() always hands out t.buf. */
  ASSERT(c->rdstate == c_busy);
  c->rdstate = c_done;
  c->result = nread;  /* Byte count, or a negative libuv error (e.g. UV_EOF). */

  /* Reads are discrete: stop until the data has been consumed. */
  uv_read_stop(&c->handle.stream);
  do_next(c->client);
}
677
/* libuv allocation callback: reads always land in the connection's
 * embedded buffer, so the |size| hint from libuv is ignored.
 */
static void conn_alloc(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
  conn *c = CONTAINER_OF(handle, conn, handle);

  ASSERT(c->rdstate == c_busy);
  buf->base = c->t.buf;
  buf->len = sizeof(c->t.buf);
}
686
/* Queue a write of |len| bytes from |data| on |c|; conn_write_done() fires
 * on completion. NOTE(review): |data| presumably must stay valid until the
 * write completes — callers pass t.buf or string literals, which do.
 */
static void conn_write(conn *c, const void *data, unsigned int len) {
  uv_buf_t buf;

  ASSERT(c->wrstate == c_stop || c->wrstate == c_done);
  c->wrstate = c_busy;

  /* It's okay to cast away constness here, uv_write() won't modify the
   * memory.
   */
  buf.base = (char *) data;
  buf.len = len;

  CHECK(0 == uv_write(&c->write_req,
                      &c->handle.stream,
                      &buf,
                      1,
                      conn_write_done));
  conn_timer_reset(c);
}
706
/* A write queued by conn_write() completed; record the status and resume
 * the state machine. UV_ECANCELED means the handle was closed by session
 * teardown, so there is nothing left to notify.
 */
static void conn_write_done(uv_write_t *req, int status) {
  conn *c;

  if (status != UV_ECANCELED) {
    c = CONTAINER_OF(req, conn, write_req);
    ASSERT(c->wrstate == c_busy);
    c->wrstate = c_done;
    c->result = status;
    do_next(c->client);
  }
}
720
/* Close both handles of a connection half; each uv_close() completion is a
 * finalizer that advances the s_almost_dead_* countdown via
 * conn_close_done().
 */
static void conn_close(conn *c) {
  ASSERT(c->rdstate != c_dead);
  ASSERT(c->wrstate != c_dead);
  c->rdstate = c_dead;
  c->wrstate = c_dead;
  /* Stash the conn in ->data so conn_close_done() can recover it. */
  c->timer_handle.data = c;
  c->handle.handle.data = c;
  uv_close(&c->handle.handle, conn_close_done);
  uv_close((uv_handle_t *) &c->timer_handle, conn_close_done);
}
731
/* One handle finished closing; each completion counts as one finalizer, so
 * step the session state machine (do_almost_dead() -> eventually s_dead).
 */
static void conn_close_done(uv_handle_t *handle) {
  conn *c = handle->data;  /* Stored by conn_close(). */

  do_next(c->client);
}
738