1 /* $OpenBSD: ioev.c,v 1.49 2023/02/08 08:20:54 tb Exp $ */ 2 /* 3 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> 4 * 5 * Permission to use, copy, modify, and distribute this software for any 6 * purpose with or without fee is hereby granted, provided that the above 7 * copyright notice and this permission notice appear in all copies. 8 * 9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 */ 17 18 #include <sys/socket.h> 19 20 #include <errno.h> 21 #include <event.h> 22 #include <fcntl.h> 23 #include <inttypes.h> 24 #include <stdlib.h> 25 #include <string.h> 26 #include <stdio.h> 27 #ifdef IO_TLS 28 #include <tls.h> 29 #endif 30 #include <unistd.h> 31 32 #include "ioev.h" 33 #include "iobuf.h" 34 #include "log.h" 35 36 enum { 37 IO_STATE_NONE, 38 IO_STATE_CONNECT, 39 IO_STATE_CONNECT_TLS, 40 IO_STATE_ACCEPT_TLS, 41 IO_STATE_UP, 42 43 IO_STATE_MAX, 44 }; 45 46 #define IO_PAUSE_IN IO_IN 47 #define IO_PAUSE_OUT IO_OUT 48 #define IO_READ 0x04 49 #define IO_WRITE 0x08 50 #define IO_RW (IO_READ | IO_WRITE) 51 #define IO_RESET 0x10 /* internal */ 52 #define IO_HELD 0x20 /* internal */ 53 54 struct io { 55 int sock; 56 void *arg; 57 void (*cb)(struct io*, int, void *); 58 struct iobuf iobuf; 59 size_t lowat; 60 int timeout; 61 int flags; 62 int state; 63 struct event ev; 64 struct tls *tls; 65 66 const char *error; /* only valid immediately on callback */ 67 }; 68 69 const char* io_strflags(int); 70 const char* io_evstr(short); 71 72 void _io_init(void); 73 void io_hold(struct io *); 74 void io_release(struct io *); 75 void io_callback(struct io*, int); 76 void io_dispatch(int, short, void *); 77 void io_dispatch_connect(int, short, void *); 78 size_t io_pending(struct io *); 79 size_t io_queued(struct io*); 80 void io_reset(struct io *, short, void (*)(int, short, void*)); 81 void io_frame_enter(const char *, struct io *, int); 82 void io_frame_leave(struct io *); 83 84 #ifdef IO_TLS 85 void io_dispatch_handshake_tls(int, short, void *); 86 void io_dispatch_accept_tls(int, short, void *); 87 void io_dispatch_connect_tls(int, short, void *); 88 void io_dispatch_read_tls(int, short, void *); 89 void io_dispatch_write_tls(int, short, void *); 90 void io_reload_tls(struct io *io); 91 #endif 92 93 static struct io *current = NULL; 94 static uint64_t frame = 0; 95 static int _io_debug = 0; 96 97 #define io_debug(args...) do { if (_io_debug) printf(args); } while(0) 98 99 100 const char* 101 io_strio(struct io *io) 102 { 103 static char buf[128]; 104 char ssl[128]; 105 106 ssl[0] = '\0'; 107 #ifdef IO_TLS 108 if (io->tls) { 109 (void)snprintf(ssl, sizeof ssl, " tls=%s:%s", 110 tls_conn_version(io->tls), 111 tls_conn_cipher(io->tls)); 112 } 113 #endif 114 115 (void)snprintf(buf, sizeof buf, 116 "<io:%p fd=%d to=%d fl=%s%s ib=%zu ob=%zu>", 117 io, io->sock, io->timeout, io_strflags(io->flags), ssl, 118 io_pending(io), io_queued(io)); 119 120 return (buf); 121 } 122 123 #define CASE(x) case x : return #x 124 125 const char* 126 io_strevent(int evt) 127 { 128 static char buf[32]; 129 130 switch (evt) { 131 CASE(IO_CONNECTED); 132 CASE(IO_TLSREADY); 133 CASE(IO_DATAIN); 134 CASE(IO_LOWAT); 135 CASE(IO_DISCONNECTED); 136 CASE(IO_TIMEOUT); 137 CASE(IO_ERROR); 138 default: 139 (void)snprintf(buf, sizeof(buf), "IO_? %d", evt); 140 return buf; 141 } 142 } 143 144 void 145 io_set_nonblocking(int fd) 146 { 147 int flags; 148 149 if ((flags = fcntl(fd, F_GETFL)) == -1) 150 fatal("io_set_blocking:fcntl(F_GETFL)"); 151 152 flags |= O_NONBLOCK; 153 154 if (fcntl(fd, F_SETFL, flags) == -1) 155 fatal("io_set_blocking:fcntl(F_SETFL)"); 156 } 157 158 void 159 io_set_nolinger(int fd) 160 { 161 struct linger l; 162 163 memset(&l, 0, sizeof(l)); 164 if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) == -1) 165 fatal("io_set_linger:setsockopt"); 166 } 167 168 /* 169 * Event framing must not rely on an io pointer to refer to the "same" io 170 * throughout the frame, because this is not always the case: 171 * 172 * 1) enter(addr0) -> free(addr0) -> leave(addr0) = SEGV 173 * 2) enter(addr0) -> free(addr0) -> malloc == addr0 -> leave(addr0) = BAD! 174 * 175 * In both case, the problem is that the io is freed in the callback, so 176 * the pointer becomes invalid. If that happens, the user is required to 177 * call io_clear, so we can adapt the frame state there. 178 */ 179 void 180 io_frame_enter(const char *where, struct io *io, int ev) 181 { 182 io_debug("\n=== %" PRIu64 " ===\n" 183 "io_frame_enter(%s, %s, %s)\n", 184 frame, where, io_evstr(ev), io_strio(io)); 185 186 if (current) 187 fatalx("io_frame_enter: interleaved frames"); 188 189 current = io; 190 191 io_hold(io); 192 } 193 194 void 195 io_frame_leave(struct io *io) 196 { 197 io_debug("io_frame_leave(%" PRIu64 ")\n", frame); 198 199 if (current && current != io) 200 fatalx("io_frame_leave: io mismatch"); 201 202 /* io has been cleared */ 203 if (current == NULL) 204 goto done; 205 206 /* TODO: There is a possible optimization there: 207 * In a typical half-duplex request/response scenario, 208 * the io is waiting to read a request, and when done, it queues 209 * the response in the output buffer and goes to write mode. 210 * There, the write event is set and will be triggered in the next 211 * event frame. In most case, the write call could be done 212 * immediately as part of the last read frame, thus avoiding to go 213 * through the event loop machinery. So, as an optimisation, we 214 * could detect that case here and force an event dispatching. 215 */ 216 217 /* Reload the io if it has not been reset already. */ 218 io_release(io); 219 current = NULL; 220 done: 221 io_debug("=== /%" PRIu64 "\n", frame); 222 223 frame += 1; 224 } 225 226 void 227 _io_init(void) 228 { 229 static int init = 0; 230 231 if (init) 232 return; 233 234 init = 1; 235 _io_debug = getenv("IO_DEBUG") != NULL; 236 } 237 238 struct io * 239 io_new(void) 240 { 241 struct io *io; 242 243 _io_init(); 244 245 if ((io = calloc(1, sizeof(*io))) == NULL) 246 return NULL; 247 248 io->sock = -1; 249 io->timeout = -1; 250 251 if (iobuf_init(&io->iobuf, 0, 0) == -1) { 252 free(io); 253 return NULL; 254 } 255 256 return io; 257 } 258 259 void 260 io_free(struct io *io) 261 { 262 io_debug("io_clear(%p)\n", io); 263 264 /* the current io is virtually dead */ 265 if (io == current) 266 current = NULL; 267 268 #ifdef IO_TLS 269 tls_free(io->tls); 270 io->tls = NULL; 271 #endif 272 273 if (event_initialized(&io->ev)) 274 event_del(&io->ev); 275 if (io->sock != -1) { 276 close(io->sock); 277 io->sock = -1; 278 } 279 280 iobuf_clear(&io->iobuf); 281 free(io); 282 } 283 284 void 285 io_hold(struct io *io) 286 { 287 io_debug("io_enter(%p)\n", io); 288 289 if (io->flags & IO_HELD) 290 fatalx("io_hold: io is already held"); 291 292 io->flags &= ~IO_RESET; 293 io->flags |= IO_HELD; 294 } 295 296 void 297 io_release(struct io *io) 298 { 299 if (!(io->flags & IO_HELD)) 300 fatalx("io_release: io is not held"); 301 302 io->flags &= ~IO_HELD; 303 if (!(io->flags & IO_RESET)) 304 io_reload(io); 305 } 306 307 void 308 io_set_fd(struct io *io, int fd) 309 { 310 io->sock = fd; 311 if (fd != -1) 312 io_reload(io); 313 } 314 315 void 316 io_set_callback(struct io *io, void(*cb)(struct io *, int, void *), void *arg) 317 { 318 io->cb = cb; 319 io->arg = arg; 320 } 321 322 void 323 io_set_timeout(struct io *io, int msec) 324 { 325 io_debug("io_set_timeout(%p, %d)\n", io, msec); 326 327 io->timeout = msec; 328 } 329 330 void 331 io_set_lowat(struct io *io, size_t lowat) 332 { 333 io_debug("io_set_lowat(%p, %zu)\n", io, lowat); 334 335 io->lowat = lowat; 336 } 337 338 void 339 io_pause(struct io *io, int dir) 340 { 341 io_debug("io_pause(%p, %x)\n", io, dir); 342 343 io->flags |= dir & (IO_PAUSE_IN | IO_PAUSE_OUT); 344 io_reload(io); 345 } 346 347 void 348 io_resume(struct io *io, int dir) 349 { 350 io_debug("io_resume(%p, %x)\n", io, dir); 351 352 io->flags &= ~(dir & (IO_PAUSE_IN | IO_PAUSE_OUT)); 353 io_reload(io); 354 } 355 356 void 357 io_set_read(struct io *io) 358 { 359 int mode; 360 361 io_debug("io_set_read(%p)\n", io); 362 363 mode = io->flags & IO_RW; 364 if (!(mode == 0 || mode == IO_WRITE)) 365 fatalx("io_set_read: full-duplex or reading"); 366 367 io->flags &= ~IO_RW; 368 io->flags |= IO_READ; 369 io_reload(io); 370 } 371 372 void 373 io_set_write(struct io *io) 374 { 375 int mode; 376 377 io_debug("io_set_write(%p)\n", io); 378 379 mode = io->flags & IO_RW; 380 if (!(mode == 0 || mode == IO_READ)) 381 fatalx("io_set_write: full-duplex or writing"); 382 383 io->flags &= ~IO_RW; 384 io->flags |= IO_WRITE; 385 io_reload(io); 386 } 387 388 const char * 389 io_error(struct io *io) 390 { 391 return io->error; 392 } 393 394 struct tls * 395 io_tls(struct io *io) 396 { 397 return io->tls; 398 } 399 400 int 401 io_fileno(struct io *io) 402 { 403 return io->sock; 404 } 405 406 int 407 io_paused(struct io *io, int what) 408 { 409 return (io->flags & (IO_PAUSE_IN | IO_PAUSE_OUT)) == what; 410 } 411 412 /* 413 * Buffered output functions 414 */ 415 416 int 417 io_write(struct io *io, const void *buf, size_t len) 418 { 419 int r; 420 421 r = iobuf_queue(&io->iobuf, buf, len); 422 423 io_reload(io); 424 425 return r; 426 } 427 428 int 429 io_writev(struct io *io, const struct iovec *iov, int iovcount) 430 { 431 int r; 432 433 r = iobuf_queuev(&io->iobuf, iov, iovcount); 434 435 io_reload(io); 436 437 return r; 438 } 439 440 int 441 io_print(struct io *io, const char *s) 442 { 443 return io_write(io, s, strlen(s)); 444 } 445 446 int 447 io_printf(struct io *io, const char *fmt, ...) 448 { 449 va_list ap; 450 int r; 451 452 va_start(ap, fmt); 453 r = io_vprintf(io, fmt, ap); 454 va_end(ap); 455 456 return r; 457 } 458 459 int 460 io_vprintf(struct io *io, const char *fmt, va_list ap) 461 { 462 463 char *buf; 464 int len; 465 466 len = vasprintf(&buf, fmt, ap); 467 if (len == -1) 468 return -1; 469 len = io_write(io, buf, len); 470 free(buf); 471 472 return len; 473 } 474 475 size_t 476 io_queued(struct io *io) 477 { 478 return iobuf_queued(&io->iobuf); 479 } 480 481 /* 482 * Buffered input functions 483 */ 484 485 void * 486 io_data(struct io *io) 487 { 488 return iobuf_data(&io->iobuf); 489 } 490 491 size_t 492 io_datalen(struct io *io) 493 { 494 return iobuf_len(&io->iobuf); 495 } 496 497 char * 498 io_getline(struct io *io, size_t *sz) 499 { 500 return iobuf_getline(&io->iobuf, sz); 501 } 502 503 void 504 io_drop(struct io *io, size_t sz) 505 { 506 return iobuf_drop(&io->iobuf, sz); 507 } 508 509 510 #define IO_READING(io) (((io)->flags & IO_RW) != IO_WRITE) 511 #define IO_WRITING(io) (((io)->flags & IO_RW) != IO_READ) 512 513 /* 514 * Setup the necessary events as required by the current io state, 515 * honouring duplex mode and i/o pauses. 516 */ 517 void 518 io_reload(struct io *io) 519 { 520 short events; 521 522 /* io will be reloaded at release time */ 523 if (io->flags & IO_HELD) 524 return; 525 526 iobuf_normalize(&io->iobuf); 527 528 #ifdef IO_TLS 529 if (io->tls) { 530 io_reload_tls(io); 531 return; 532 } 533 #endif 534 535 io_debug("io_reload(%p)\n", io); 536 537 events = 0; 538 if (IO_READING(io) && !(io->flags & IO_PAUSE_IN)) 539 events = EV_READ; 540 if (IO_WRITING(io) && !(io->flags & IO_PAUSE_OUT) && io_queued(io)) 541 events |= EV_WRITE; 542 543 io_reset(io, events, io_dispatch); 544 } 545 546 /* Set the requested event. */ 547 void 548 io_reset(struct io *io, short events, void (*dispatch)(int, short, void*)) 549 { 550 struct timeval tv, *ptv; 551 552 io_debug("io_reset(%p, %s, %p) -> %s\n", 553 io, io_evstr(events), dispatch, io_strio(io)); 554 555 /* 556 * Indicate that the event has already been reset so that reload 557 * is not called on frame_leave. 558 */ 559 io->flags |= IO_RESET; 560 561 if (event_initialized(&io->ev)) 562 event_del(&io->ev); 563 564 /* 565 * The io is paused by the user, so we don't want the timeout to be 566 * effective. 567 */ 568 if (events == 0) 569 return; 570 571 event_set(&io->ev, io->sock, events, dispatch, io); 572 if (io->timeout >= 0) { 573 tv.tv_sec = io->timeout / 1000; 574 tv.tv_usec = (io->timeout % 1000) * 1000; 575 ptv = &tv; 576 } else 577 ptv = NULL; 578 579 event_add(&io->ev, ptv); 580 } 581 582 size_t 583 io_pending(struct io *io) 584 { 585 return iobuf_len(&io->iobuf); 586 } 587 588 const char* 589 io_strflags(int flags) 590 { 591 static char buf[64]; 592 593 buf[0] = '\0'; 594 595 switch (flags & IO_RW) { 596 case 0: 597 (void)strlcat(buf, "rw", sizeof buf); 598 break; 599 case IO_READ: 600 (void)strlcat(buf, "R", sizeof buf); 601 break; 602 case IO_WRITE: 603 (void)strlcat(buf, "W", sizeof buf); 604 break; 605 case IO_RW: 606 (void)strlcat(buf, "RW", sizeof buf); 607 break; 608 } 609 610 if (flags & IO_PAUSE_IN) 611 (void)strlcat(buf, ",F_PI", sizeof buf); 612 if (flags & IO_PAUSE_OUT) 613 (void)strlcat(buf, ",F_PO", sizeof buf); 614 615 return buf; 616 } 617 618 const char* 619 io_evstr(short ev) 620 { 621 static char buf[64]; 622 char buf2[16]; 623 int n; 624 625 n = 0; 626 buf[0] = '\0'; 627 628 if (ev == 0) { 629 (void)strlcat(buf, "<NONE>", sizeof(buf)); 630 return buf; 631 } 632 633 if (ev & EV_TIMEOUT) { 634 (void)strlcat(buf, "EV_TIMEOUT", sizeof(buf)); 635 ev &= ~EV_TIMEOUT; 636 n++; 637 } 638 639 if (ev & EV_READ) { 640 if (n) 641 (void)strlcat(buf, "|", sizeof(buf)); 642 (void)strlcat(buf, "EV_READ", sizeof(buf)); 643 ev &= ~EV_READ; 644 n++; 645 } 646 647 if (ev & EV_WRITE) { 648 if (n) 649 (void)strlcat(buf, "|", sizeof(buf)); 650 (void)strlcat(buf, "EV_WRITE", sizeof(buf)); 651 ev &= ~EV_WRITE; 652 n++; 653 } 654 655 if (ev & EV_SIGNAL) { 656 if (n) 657 (void)strlcat(buf, "|", sizeof(buf)); 658 (void)strlcat(buf, "EV_SIGNAL", sizeof(buf)); 659 ev &= ~EV_SIGNAL; 660 n++; 661 } 662 663 if (ev) { 664 if (n) 665 (void)strlcat(buf, "|", sizeof(buf)); 666 (void)strlcat(buf, "EV_?=0x", sizeof(buf)); 667 (void)snprintf(buf2, sizeof(buf2), "%hx", ev); 668 (void)strlcat(buf, buf2, sizeof(buf)); 669 } 670 671 return buf; 672 } 673 674 void 675 io_dispatch(int fd, short ev, void *humppa) 676 { 677 struct io *io = humppa; 678 size_t w; 679 ssize_t n; 680 int saved_errno; 681 682 io_frame_enter("io_dispatch", io, ev); 683 684 if (ev == EV_TIMEOUT) { 685 io_callback(io, IO_TIMEOUT); 686 goto leave; 687 } 688 689 if (ev & EV_WRITE && (w = io_queued(io))) { 690 if ((n = iobuf_write(&io->iobuf, io->sock)) < 0) { 691 if (n == IOBUF_WANT_WRITE) /* kqueue bug? */ 692 goto read; 693 if (n == IOBUF_CLOSED) 694 io_callback(io, IO_DISCONNECTED); 695 else { 696 saved_errno = errno; 697 io->error = strerror(errno); 698 errno = saved_errno; 699 io_callback(io, IO_ERROR); 700 } 701 goto leave; 702 } 703 if (w > io->lowat && w - n <= io->lowat) 704 io_callback(io, IO_LOWAT); 705 } 706 read: 707 708 if (ev & EV_READ) { 709 iobuf_normalize(&io->iobuf); 710 if ((n = iobuf_read(&io->iobuf, io->sock)) < 0) { 711 if (n == IOBUF_CLOSED) 712 io_callback(io, IO_DISCONNECTED); 713 else { 714 saved_errno = errno; 715 io->error = strerror(errno); 716 errno = saved_errno; 717 io_callback(io, IO_ERROR); 718 } 719 goto leave; 720 } 721 if (n) 722 io_callback(io, IO_DATAIN); 723 } 724 725 leave: 726 io_frame_leave(io); 727 } 728 729 void 730 io_callback(struct io *io, int evt) 731 { 732 io->cb(io, evt, io->arg); 733 } 734 735 int 736 io_connect(struct io *io, const struct sockaddr *sa, const struct sockaddr *bsa) 737 { 738 int sock, errno_save; 739 740 if ((sock = socket(sa->sa_family, SOCK_STREAM, 0)) == -1) 741 goto fail; 742 743 io_set_nonblocking(sock); 744 io_set_nolinger(sock); 745 746 if (bsa && bind(sock, bsa, bsa->sa_len) == -1) 747 goto fail; 748 749 if (connect(sock, sa, sa->sa_len) == -1) 750 if (errno != EINPROGRESS) 751 goto fail; 752 753 io->sock = sock; 754 io_reset(io, EV_WRITE, io_dispatch_connect); 755 756 return (sock); 757 758 fail: 759 if (sock != -1) { 760 errno_save = errno; 761 close(sock); 762 errno = errno_save; 763 io->error = strerror(errno); 764 } 765 return (-1); 766 } 767 768 void 769 io_dispatch_connect(int fd, short ev, void *humppa) 770 { 771 struct io *io = humppa; 772 int r, e; 773 socklen_t sl; 774 775 io_frame_enter("io_dispatch_connect", io, ev); 776 777 if (ev == EV_TIMEOUT) { 778 close(fd); 779 io->sock = -1; 780 io_callback(io, IO_TIMEOUT); 781 } else { 782 sl = sizeof(e); 783 r = getsockopt(fd, SOL_SOCKET, SO_ERROR, &e, &sl); 784 if (r == -1) { 785 log_warn("io_dispatch_connect: getsockopt"); 786 e = errno; 787 } 788 if (e) { 789 close(fd); 790 io->sock = -1; 791 io->error = strerror(e); 792 io_callback(io, e == ETIMEDOUT ? IO_TIMEOUT : IO_ERROR); 793 } 794 else { 795 io->state = IO_STATE_UP; 796 io_callback(io, IO_CONNECTED); 797 } 798 } 799 800 io_frame_leave(io); 801 } 802 803 #ifdef IO_TLS 804 int 805 io_connect_tls(struct io *io, struct tls *tls, const char *hostname) 806 { 807 int mode; 808 809 mode = io->flags & IO_RW; 810 if (mode != IO_WRITE) 811 fatalx("io_connect_tls: expect IO_WRITE mode"); 812 813 if (io->tls) 814 fatalx("io_connect_tls: TLS already started"); 815 816 if (tls_connect_socket(tls, io->sock, hostname) == -1) { 817 io->error = tls_error(tls); 818 return (-1); 819 } 820 821 io->tls = tls; 822 io->state = IO_STATE_CONNECT_TLS; 823 io_reset(io, EV_READ|EV_WRITE, io_dispatch_handshake_tls); 824 825 return (0); 826 } 827 828 int 829 io_accept_tls(struct io *io, struct tls *tls) 830 { 831 int mode; 832 833 mode = io->flags & IO_RW; 834 if (mode != IO_READ) 835 fatalx("io_accept_tls: expect IO_READ mode"); 836 837 if (io->tls) 838 fatalx("io_accept_tls: TLS already started"); 839 840 if (tls_accept_socket(tls, &io->tls, io->sock) == -1) { 841 io->error = tls_error(tls); 842 return (-1); 843 } 844 845 io->state = IO_STATE_ACCEPT_TLS; 846 io_reset(io, EV_READ|EV_WRITE, io_dispatch_handshake_tls); 847 848 return (0); 849 } 850 851 void 852 io_dispatch_handshake_tls(int fd, short event, void *humppa) 853 { 854 struct io *io = humppa; 855 int ret; 856 857 io_frame_enter("io_dispatch_handshake_tls", io, event); 858 859 if (event == EV_TIMEOUT) { 860 io_callback(io, IO_TIMEOUT); 861 goto leave; 862 } 863 864 if ((ret = tls_handshake(io->tls)) == 0) { 865 io->state = IO_STATE_UP; 866 io_callback(io, IO_TLSREADY); 867 goto leave; 868 } 869 if (ret == TLS_WANT_POLLIN) 870 io_reset(io, EV_READ, io_dispatch_handshake_tls); 871 else if (ret == TLS_WANT_POLLOUT) 872 io_reset(io, EV_WRITE, io_dispatch_handshake_tls); 873 else { 874 io->error = tls_error(io->tls); 875 io_callback(io, IO_ERROR); 876 } 877 878 leave: 879 io_frame_leave(io); 880 return; 881 } 882 883 void 884 io_dispatch_read_tls(int fd, short event, void *humppa) 885 { 886 struct io *io = humppa; 887 int n; 888 889 io_frame_enter("io_dispatch_read_tls", io, event); 890 891 if (event == EV_TIMEOUT) { 892 io_callback(io, IO_TIMEOUT); 893 goto leave; 894 } 895 896 again: 897 iobuf_normalize(&io->iobuf); 898 switch ((n = iobuf_read_tls(&io->iobuf, io->tls))) { 899 case IOBUF_WANT_READ: 900 io_reset(io, EV_READ, io_dispatch_read_tls); 901 break; 902 case IOBUF_WANT_WRITE: 903 io_reset(io, EV_WRITE, io_dispatch_read_tls); 904 break; 905 case IOBUF_CLOSED: 906 io_callback(io, IO_DISCONNECTED); 907 break; 908 case IOBUF_ERROR: 909 io->error = tls_error(io->tls); 910 io_callback(io, IO_ERROR); 911 break; 912 default: 913 io_debug("io_dispatch_read_tls(...) -> r=%d\n", n); 914 io_callback(io, IO_DATAIN); 915 if (current == io && IO_READING(io)) 916 goto again; 917 } 918 919 leave: 920 io_frame_leave(io); 921 } 922 923 void 924 io_dispatch_write_tls(int fd, short event, void *humppa) 925 { 926 struct io *io = humppa; 927 int n; 928 size_t w2, w; 929 930 io_frame_enter("io_dispatch_write_tls", io, event); 931 932 if (event == EV_TIMEOUT) { 933 io_callback(io, IO_TIMEOUT); 934 goto leave; 935 } 936 937 w = io_queued(io); 938 switch ((n = iobuf_write_tls(&io->iobuf, io->tls))) { 939 case IOBUF_WANT_READ: 940 io_reset(io, EV_READ, io_dispatch_write_tls); 941 break; 942 case IOBUF_WANT_WRITE: 943 io_reset(io, EV_WRITE, io_dispatch_write_tls); 944 break; 945 case IOBUF_CLOSED: 946 io_callback(io, IO_DISCONNECTED); 947 break; 948 case IOBUF_ERROR: 949 io->error = tls_error(io->tls); 950 io_callback(io, IO_ERROR); 951 break; 952 default: 953 io_debug("io_dispatch_write_tls(...) -> w=%d\n", n); 954 w2 = io_queued(io); 955 if (w > io->lowat && w2 <= io->lowat) 956 io_callback(io, IO_LOWAT); 957 break; 958 } 959 960 leave: 961 io_frame_leave(io); 962 } 963 964 void 965 io_reload_tls(struct io *io) 966 { 967 if (io->state != IO_STATE_UP) 968 fatalx("io_reload_tls: bad state"); 969 970 if (IO_READING(io) && !(io->flags & IO_PAUSE_IN)) { 971 io_reset(io, EV_READ, io_dispatch_read_tls); 972 return; 973 } 974 975 if (IO_WRITING(io) && !(io->flags & IO_PAUSE_OUT) && io_queued(io)) { 976 io_reset(io, EV_WRITE, io_dispatch_write_tls); 977 return; 978 } 979 980 /* paused */ 981 } 982 983 #endif /* IO_TLS */ 984