1 /* $OpenBSD: io.c,v 1.20 2021/06/30 13:10:04 claudio Exp $ */ 2 /* 3 * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv> 4 * 5 * Permission to use, copy, modify, and distribute this software for any 6 * purpose with or without fee is hereby granted, provided that the above 7 * copyright notice and this permission notice appear in all copies. 8 * 9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 */ 17 #include <sys/stat.h> 18 19 #include <assert.h> 20 #include <endian.h> 21 #include <errno.h> 22 #include <poll.h> 23 #include <stdint.h> 24 #include <stdio.h> 25 #include <stdlib.h> 26 #include <string.h> 27 #include <unistd.h> 28 29 #include "extern.h" 30 31 /* 32 * A non-blocking check to see whether there's POLLIN data in fd. 33 * Returns <0 on failure, 0 if there's no data, >0 if there is. 34 */ 35 int 36 io_read_check(int fd) 37 { 38 struct pollfd pfd; 39 40 pfd.fd = fd; 41 pfd.events = POLLIN; 42 43 if (poll(&pfd, 1, 0) == -1) { 44 ERR("poll"); 45 return -1; 46 } 47 return (pfd.revents & POLLIN); 48 } 49 50 /* 51 * Write buffer to non-blocking descriptor. 52 * Returns zero on failure, non-zero on success (zero or more bytes). 53 * On success, fills in "sz" with the amount written. 54 */ 55 static int 56 io_write_nonblocking(int fd, const void *buf, size_t bsz, 57 size_t *sz) 58 { 59 struct pollfd pfd; 60 ssize_t wsz; 61 int c; 62 63 *sz = 0; 64 65 if (bsz == 0) 66 return 1; 67 68 pfd.fd = fd; 69 pfd.events = POLLOUT; 70 71 /* Poll and check for all possible errors. */ 72 73 if ((c = poll(&pfd, 1, poll_timeout)) == -1) { 74 ERR("poll"); 75 return 0; 76 } else if (c == 0) { 77 ERRX("poll: timeout"); 78 return 0; 79 } else if ((pfd.revents & (POLLERR|POLLNVAL))) { 80 ERRX("poll: bad fd"); 81 return 0; 82 } else if ((pfd.revents & POLLHUP)) { 83 ERRX("poll: hangup"); 84 return 0; 85 } else if (!(pfd.revents & POLLOUT)) { 86 ERRX("poll: unknown event"); 87 return 0; 88 } 89 90 /* Now the non-blocking write. */ 91 92 if ((wsz = write(fd, buf, bsz)) == -1) { 93 ERR("write"); 94 return 0; 95 } 96 97 *sz = wsz; 98 return 1; 99 } 100 101 /* 102 * Blocking write of the full size of the buffer. 103 * Returns 0 on failure, non-zero on success (all bytes written). 104 */ 105 static int 106 io_write_blocking(int fd, const void *buf, size_t sz) 107 { 108 size_t wsz; 109 int c; 110 111 while (sz > 0) { 112 c = io_write_nonblocking(fd, buf, sz, &wsz); 113 if (!c) { 114 ERRX1("io_write_nonblocking"); 115 return 0; 116 } else if (wsz == 0) { 117 ERRX("io_write_nonblocking: short write"); 118 return 0; 119 } 120 buf += wsz; 121 sz -= wsz; 122 } 123 124 return 1; 125 } 126 127 /* 128 * Write "buf" of size "sz" to non-blocking descriptor. 129 * Returns zero on failure, non-zero on success (all bytes written to 130 * the descriptor). 131 */ 132 int 133 io_write_buf(struct sess *sess, int fd, const void *buf, size_t sz) 134 { 135 int32_t tag, tagbuf; 136 size_t wsz; 137 int c; 138 139 if (!sess->mplex_writes) { 140 c = io_write_blocking(fd, buf, sz); 141 sess->total_write += sz; 142 return c; 143 } 144 145 while (sz > 0) { 146 wsz = sz & 0xFFFFFF; 147 tag = (7 << 24) + wsz; 148 tagbuf = htole32(tag); 149 if (!io_write_blocking(fd, &tagbuf, sizeof(tagbuf))) { 150 ERRX1("io_write_blocking"); 151 return 0; 152 } 153 if (!io_write_blocking(fd, buf, wsz)) { 154 ERRX1("io_write_blocking"); 155 return 0; 156 } 157 sess->total_write += wsz; 158 sz -= wsz; 159 buf += wsz; 160 } 161 162 return 1; 163 } 164 165 /* 166 * Write "line" (NUL-terminated) followed by a newline. 167 * Returns zero on failure, non-zero on succcess. 168 */ 169 int 170 io_write_line(struct sess *sess, int fd, const char *line) 171 { 172 173 if (!io_write_buf(sess, fd, line, strlen(line))) 174 ERRX1("io_write_buf"); 175 else if (!io_write_byte(sess, fd, '\n')) 176 ERRX1("io_write_byte"); 177 else 178 return 1; 179 180 return 0; 181 } 182 183 /* 184 * Read buffer from non-blocking descriptor. 185 * Returns zero on failure, non-zero on success (zero or more bytes). 186 */ 187 static int 188 io_read_nonblocking(int fd, void *buf, size_t bsz, size_t *sz) 189 { 190 struct pollfd pfd; 191 ssize_t rsz; 192 int c; 193 194 *sz = 0; 195 196 if (bsz == 0) 197 return 1; 198 199 pfd.fd = fd; 200 pfd.events = POLLIN; 201 202 /* Poll and check for all possible errors. */ 203 204 if ((c = poll(&pfd, 1, poll_timeout)) == -1) { 205 ERR("poll"); 206 return 0; 207 } else if (c == 0) { 208 ERRX("poll: timeout"); 209 return 0; 210 } else if ((pfd.revents & (POLLERR|POLLNVAL))) { 211 ERRX("poll: bad fd"); 212 return 0; 213 } else if (!(pfd.revents & (POLLIN|POLLHUP))) { 214 ERRX("poll: unknown event"); 215 return 0; 216 } 217 218 /* Now the non-blocking read, checking for EOF. */ 219 220 if ((rsz = read(fd, buf, bsz)) == -1) { 221 ERR("read"); 222 return 0; 223 } else if (rsz == 0) { 224 ERRX("unexpected end of file"); 225 return 0; 226 } 227 228 *sz = rsz; 229 return 1; 230 } 231 232 /* 233 * Blocking read of the full size of the buffer. 234 * This can be called from either the error type message or a regular 235 * message---or for that matter, multiplexed or not. 236 * Returns 0 on failure, non-zero on success (all bytes read). 237 */ 238 static int 239 io_read_blocking(int fd, void *buf, size_t sz) 240 { 241 size_t rsz; 242 int c; 243 244 while (sz > 0) { 245 c = io_read_nonblocking(fd, buf, sz, &rsz); 246 if (!c) { 247 ERRX1("io_read_nonblocking"); 248 return 0; 249 } else if (rsz == 0) { 250 ERRX("io_read_nonblocking: short read"); 251 return 0; 252 } 253 buf += rsz; 254 sz -= rsz; 255 } 256 257 return 1; 258 } 259 260 /* 261 * When we do a lot of writes in a row (such as when the sender emits 262 * the file list), the server might be sending us multiplexed log 263 * messages. 264 * If it sends too many, it clogs the socket. 265 * This function looks into the read buffer and clears out any log 266 * messages pending. 267 * If called when there are valid data reads available, this function 268 * does nothing. 269 * Returns zero on failure, non-zero on success. 270 */ 271 int 272 io_read_flush(struct sess *sess, int fd) 273 { 274 int32_t tagbuf, tag; 275 char mpbuf[1024]; 276 277 if (sess->mplex_read_remain) 278 return 1; 279 280 /* 281 * First, read the 4-byte multiplex tag. 282 * The first byte is the tag identifier (7 for normal 283 * data, !7 for out-of-band data), the last three are 284 * for the remaining data size. 285 */ 286 287 if (!io_read_blocking(fd, &tagbuf, sizeof(tagbuf))) { 288 ERRX1("io_read_blocking"); 289 return 0; 290 } 291 tag = le32toh(tagbuf); 292 sess->mplex_read_remain = tag & 0xFFFFFF; 293 tag >>= 24; 294 if (tag == 7) 295 return 1; 296 297 tag -= 7; 298 299 if (sess->mplex_read_remain > sizeof(mpbuf)) { 300 ERRX("multiplex buffer overflow"); 301 return 0; 302 } else if (sess->mplex_read_remain == 0) 303 return 1; 304 305 if (!io_read_blocking(fd, mpbuf, sess->mplex_read_remain)) { 306 ERRX1("io_read_blocking"); 307 return 0; 308 } 309 if (mpbuf[sess->mplex_read_remain - 1] == '\n') 310 mpbuf[--sess->mplex_read_remain] = '\0'; 311 312 /* 313 * Always print the server's messages, as the server 314 * will control its own log levelling. 315 */ 316 317 LOG0("%.*s", (int)sess->mplex_read_remain, mpbuf); 318 sess->mplex_read_remain = 0; 319 320 /* 321 * I only know that a tag of one means an error. 322 * This means that we should exit. 323 */ 324 325 if (tag == 1) { 326 ERRX1("error from remote host"); 327 return 0; 328 } 329 return 1; 330 } 331 332 /* 333 * Read buffer from non-blocking descriptor, possibly in multiplex read 334 * mode. 335 * Returns zero on failure, non-zero on success (all bytes read from 336 * the descriptor). 337 */ 338 int 339 io_read_buf(struct sess *sess, int fd, void *buf, size_t sz) 340 { 341 size_t rsz; 342 int c; 343 344 /* If we're not multiplexing, read directly. */ 345 346 if (!sess->mplex_reads) { 347 assert(sess->mplex_read_remain == 0); 348 c = io_read_blocking(fd, buf, sz); 349 sess->total_read += sz; 350 return c; 351 } 352 353 while (sz > 0) { 354 /* 355 * First, check to see if we have any regular data 356 * hanging around waiting to be read. 357 * If so, read the lesser of that data and whatever 358 * amount we currently want. 359 */ 360 361 if (sess->mplex_read_remain) { 362 rsz = sess->mplex_read_remain < sz ? 363 sess->mplex_read_remain : sz; 364 if (!io_read_blocking(fd, buf, rsz)) { 365 ERRX1("io_read_blocking"); 366 return 0; 367 } 368 sz -= rsz; 369 sess->mplex_read_remain -= rsz; 370 buf += rsz; 371 sess->total_read += rsz; 372 continue; 373 } 374 375 assert(sess->mplex_read_remain == 0); 376 if (!io_read_flush(sess, fd)) { 377 ERRX1("io_read_flush"); 378 return 0; 379 } 380 } 381 382 return 1; 383 } 384 385 /* 386 * Like io_write_buf(), but for a long (which is a composite type). 387 * Returns zero on failure, non-zero on success. 388 */ 389 int 390 io_write_ulong(struct sess *sess, int fd, uint64_t val) 391 { 392 uint64_t nv; 393 int64_t sval = (int64_t)val; 394 395 /* Short-circuit: send as an integer if possible. */ 396 397 if (sval <= INT32_MAX && sval >= 0) { 398 if (!io_write_int(sess, fd, (int32_t)val)) { 399 ERRX1("io_write_int"); 400 return 0; 401 } 402 return 1; 403 } 404 405 /* Otherwise, pad with -1 32-bit, then send 64-bit. */ 406 407 nv = htole64(val); 408 409 if (!io_write_int(sess, fd, -1)) 410 ERRX1("io_write_int"); 411 else if (!io_write_buf(sess, fd, &nv, sizeof(int64_t))) 412 ERRX1("io_write_buf"); 413 else 414 return 1; 415 416 return 0; 417 } 418 419 int 420 io_write_long(struct sess *sess, int fd, int64_t val) 421 { 422 return io_write_ulong(sess, fd, (uint64_t)val); 423 } 424 425 /* 426 * Like io_write_buf(), but for an unsigned integer. 427 * Returns zero on failure, non-zero on success. 428 */ 429 int 430 io_write_uint(struct sess *sess, int fd, uint32_t val) 431 { 432 uint32_t nv; 433 434 nv = htole32(val); 435 436 if (!io_write_buf(sess, fd, &nv, sizeof(uint32_t))) { 437 ERRX1("io_write_buf"); 438 return 0; 439 } 440 return 1; 441 } 442 443 /* 444 * Like io_write_buf(), but for an integer. 445 * Returns zero on failure, non-zero on success. 446 */ 447 int 448 io_write_int(struct sess *sess, int fd, int32_t val) 449 { 450 return io_write_uint(sess, fd, (uint32_t)val); 451 } 452 453 /* 454 * A simple assertion-protected memory copy from th einput "val" or size 455 * "valsz" into our buffer "buf", full size "buflen", position "bufpos". 456 * Increases our "bufpos" appropriately. 457 * This has no return value, but will assert() if the size of the buffer 458 * is insufficient for the new data. 459 */ 460 void 461 io_buffer_buf(void *buf, size_t *bufpos, size_t buflen, const void *val, 462 size_t valsz) 463 { 464 465 assert(*bufpos + valsz <= buflen); 466 memcpy(buf + *bufpos, val, valsz); 467 *bufpos += valsz; 468 } 469 470 /* 471 * Like io_buffer_buf(), but also accomodating for multiplexing codes. 472 * This should NEVER be passed to io_write_buf(), but instead passed 473 * directly to a write operation. 474 */ 475 void 476 io_lowbuffer_buf(struct sess *sess, void *buf, 477 size_t *bufpos, size_t buflen, const void *val, size_t valsz) 478 { 479 int32_t tagbuf; 480 481 if (valsz == 0) 482 return; 483 484 if (!sess->mplex_writes) { 485 io_buffer_buf(buf, bufpos, buflen, val, valsz); 486 return; 487 } 488 489 assert(*bufpos + valsz + sizeof(int32_t) <= buflen); 490 assert(valsz == (valsz & 0xFFFFFF)); 491 tagbuf = htole32((7 << 24) + valsz); 492 493 io_buffer_int(buf, bufpos, buflen, tagbuf); 494 io_buffer_buf(buf, bufpos, buflen, val, valsz); 495 } 496 497 /* 498 * Allocate the space needed for io_lowbuffer_buf() and friends. 499 * This should be called for *each* lowbuffer operation, so: 500 * io_lowbuffer_alloc(... sizeof(int32_t)); 501 * io_lowbuffer_int(...); 502 * io_lowbuffer_alloc(... sizeof(int32_t)); 503 * io_lowbuffer_int(...); 504 * And not sizeof(int32_t) * 2 or whatnot. 505 * Returns zero on failure, non-zero on succes. 506 */ 507 int 508 io_lowbuffer_alloc(struct sess *sess, void **buf, 509 size_t *bufsz, size_t *bufmax, size_t sz) 510 { 511 void *pp; 512 size_t extra; 513 514 extra = sess->mplex_writes ? sizeof(int32_t) : 0; 515 516 if (*bufsz + sz + extra > *bufmax) { 517 pp = realloc(*buf, *bufsz + sz + extra); 518 if (pp == NULL) { 519 ERR("realloc"); 520 return 0; 521 } 522 *buf = pp; 523 *bufmax = *bufsz + sz + extra; 524 } 525 *bufsz += sz + extra; 526 return 1; 527 } 528 529 /* 530 * Like io_lowbuffer_buf(), but for a single integer. 531 */ 532 void 533 io_lowbuffer_int(struct sess *sess, void *buf, 534 size_t *bufpos, size_t buflen, int32_t val) 535 { 536 int32_t nv = htole32(val); 537 538 io_lowbuffer_buf(sess, buf, bufpos, buflen, &nv, sizeof(int32_t)); 539 } 540 541 /* 542 * Like io_buffer_buf(), but for a single integer. 543 */ 544 void 545 io_buffer_int(void *buf, size_t *bufpos, size_t buflen, int32_t val) 546 { 547 int32_t nv = htole32(val); 548 549 io_buffer_buf(buf, bufpos, buflen, &nv, sizeof(int32_t)); 550 } 551 552 /* 553 * Like io_read_buf(), but for a long >=0. 554 * Returns zero on failure, non-zero on success. 555 */ 556 int 557 io_read_long(struct sess *sess, int fd, int64_t *val) 558 { 559 uint64_t uoval; 560 561 if (!io_read_ulong(sess, fd, &uoval)) { 562 ERRX1("io_read_long"); 563 return 0; 564 } 565 *val = (int64_t)uoval; 566 if (*val < 0) { 567 ERRX1("io_read_long negative"); 568 return 0; 569 } 570 return 1; 571 } 572 573 /* 574 * Like io_read_buf(), but for a long. 575 * Returns zero on failure, non-zero on success. 576 */ 577 int 578 io_read_ulong(struct sess *sess, int fd, uint64_t *val) 579 { 580 uint64_t oval; 581 int32_t sval; 582 583 /* Start with the short-circuit: read as an int. */ 584 585 if (!io_read_int(sess, fd, &sval)) { 586 ERRX1("io_read_int"); 587 return 0; 588 } else if (sval != -1) { 589 *val = (uint64_t)le32toh(sval); 590 return 1; 591 } 592 593 /* If the int is -1, read as 64 bits. */ 594 595 if (!io_read_buf(sess, fd, &oval, sizeof(uint64_t))) { 596 ERRX1("io_read_buf"); 597 return 0; 598 } 599 600 *val = le64toh(oval); 601 return 1; 602 } 603 604 /* 605 * One thing we often need to do is read a size_t. 606 * These are transmitted as int32_t, so make sure that the value 607 * transmitted is not out of range. 608 * FIXME: I assume that size_t can handle int32_t's max. 609 * Returns zero on failure, non-zero on success. 610 */ 611 int 612 io_read_size(struct sess *sess, int fd, size_t *val) 613 { 614 int32_t oval; 615 616 if (!io_read_int(sess, fd, &oval)) { 617 ERRX1("io_read_int"); 618 return 0; 619 } else if (oval < 0) { 620 ERRX("io_read_size: negative value"); 621 return 0; 622 } 623 624 *val = oval; 625 return 1; 626 } 627 628 /* 629 * Like io_read_buf(), but for an integer. 630 * Returns zero on failure, non-zero on success. 631 */ 632 int 633 io_read_uint(struct sess *sess, int fd, uint32_t *val) 634 { 635 uint32_t oval; 636 637 if (!io_read_buf(sess, fd, &oval, sizeof(uint32_t))) { 638 ERRX1("io_read_buf"); 639 return 0; 640 } 641 642 *val = le32toh(oval); 643 return 1; 644 } 645 646 int 647 io_read_int(struct sess *sess, int fd, int32_t *val) 648 { 649 return io_read_uint(sess, fd, (uint32_t *)val); 650 } 651 652 /* 653 * Copies "valsz" from "buf", full size "bufsz" at position" bufpos", 654 * into "val". 655 * Calls assert() if the source doesn't have enough data. 656 * Increases "bufpos" to the new position. 657 */ 658 void 659 io_unbuffer_buf(const void *buf, size_t *bufpos, size_t bufsz, void *val, 660 size_t valsz) 661 { 662 663 assert(*bufpos + valsz <= bufsz); 664 memcpy(val, buf + *bufpos, valsz); 665 *bufpos += valsz; 666 } 667 668 /* 669 * Calls io_unbuffer_buf() and converts. 670 */ 671 void 672 io_unbuffer_int(const void *buf, size_t *bufpos, size_t bufsz, int32_t *val) 673 { 674 int32_t oval; 675 676 io_unbuffer_buf(buf, bufpos, bufsz, &oval, sizeof(int32_t)); 677 *val = le32toh(oval); 678 } 679 680 /* 681 * Calls io_unbuffer_buf() and converts. 682 */ 683 int 684 io_unbuffer_size(const void *buf, size_t *bufpos, size_t bufsz, size_t *val) 685 { 686 int32_t oval; 687 688 io_unbuffer_int(buf, bufpos, bufsz, &oval); 689 if (oval < 0) { 690 ERRX("io_unbuffer_size: negative value"); 691 return 0; 692 } 693 *val = oval; 694 return 1; 695 } 696 697 /* 698 * Like io_read_buf(), but for a single byte >=0. 699 * Returns zero on failure, non-zero on success. 700 */ 701 int 702 io_read_byte(struct sess *sess, int fd, uint8_t *val) 703 { 704 705 if (!io_read_buf(sess, fd, val, sizeof(uint8_t))) { 706 ERRX1("io_read_buf"); 707 return 0; 708 } 709 return 1; 710 } 711 712 /* 713 * Like io_write_buf(), but for a single byte. 714 * Returns zero on failure, non-zero on success. 715 */ 716 int 717 io_write_byte(struct sess *sess, int fd, uint8_t val) 718 { 719 720 if (!io_write_buf(sess, fd, &val, sizeof(uint8_t))) { 721 ERRX1("io_write_buf"); 722 return 0; 723 } 724 return 1; 725 } 726