/*	$OpenBSD: io.c,v 1.23 2022/12/26 19:16:02 jmc Exp $ */
/*
 * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */
#include <sys/stat.h>

#include <assert.h>
#include <endian.h>
#include <errno.h>
#include <poll.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "extern.h"

/*
 * A non-blocking check to see whether there's POLLIN data in fd.
 * Returns <0 on failure, 0 if there's no data, >0 if there is.
 */
int
io_read_check(int fd)
{
	struct pollfd pfd;

	pfd.fd = fd;
	pfd.events = POLLIN;

	if (poll(&pfd, 1, 0) == -1) {
		ERR("poll");
		return -1;
	}
	return (pfd.revents & POLLIN);
}

/*
 * Write buffer to non-blocking descriptor.
 * Returns zero on failure, non-zero on success (zero or more bytes).
 * On success, fills in "sz" with the amount written.
 */
static int
io_write_nonblocking(int fd, const void *buf, size_t bsz,
    size_t *sz)
{
	struct pollfd pfd;
	ssize_t wsz;
	int c;

	*sz = 0;

	if (bsz == 0)
		return 1;

	pfd.fd = fd;
	pfd.events = POLLOUT;

	/* Poll and check for all possible errors. */

	if ((c = poll(&pfd, 1, poll_timeout)) == -1) {
		ERR("poll");
		return 0;
	} else if (c == 0) {
		ERRX("poll: timeout");
		return 0;
	} else if ((pfd.revents & (POLLERR|POLLNVAL))) {
		ERRX("poll: bad fd");
		return 0;
	} else if ((pfd.revents & POLLHUP)) {
		ERRX("poll: hangup");
		return 0;
	} else if (!(pfd.revents & POLLOUT)) {
		ERRX("poll: unknown event");
		return 0;
	}

	/* Now the non-blocking write. */

	if ((wsz = write(fd, buf, bsz)) == -1) {
		ERR("write");
		return 0;
	}

	*sz = wsz;
	return 1;
}

/*
 * Blocking write of the full size of the buffer.
 * Returns 0 on failure, non-zero on success (all bytes written).
 */
static int
io_write_blocking(int fd, const void *buf, size_t sz)
{
	size_t wsz;
	int c;

	while (sz > 0) {
		c = io_write_nonblocking(fd, buf, sz, &wsz);
		if (!c) {
			ERRX1("io_write_nonblocking");
			return 0;
		} else if (wsz == 0) {
			ERRX("io_write_nonblocking: short write");
			return 0;
		}
		buf += wsz;
		sz -= wsz;
	}

	return 1;
}

/*
 * Write "buf" of size "sz" to non-blocking descriptor.
 * Returns zero on failure, non-zero on success (all bytes written to
 * the descriptor).
 */
int
io_write_buf(struct sess *sess, int fd, const void *buf, size_t sz)
{
	int32_t tag, tagbuf;
	size_t wsz;
	int c;

	if (!sess->mplex_writes) {
		c = io_write_blocking(fd, buf, sz);
		sess->total_write += sz;
		return c;
	}

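	/*
	 * Multiplexed data is framed: a 4-byte little-endian header
	 * whose high byte is the tag (7 for data) and whose low 24
	 * bits hold the payload length, followed by the payload
	 * itself.  For example, a 0x64-byte chunk is preceded on the
	 * wire by the bytes 0x64 0x00 0x00 0x07.
	 */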
	while (sz > 0) {
		wsz = (sz < 0xFFFFFF) ? sz : 0xFFFFFF;
		tag = (7 << 24) + wsz;
		tagbuf = htole32(tag);
		if (!io_write_blocking(fd, &tagbuf, sizeof(tagbuf))) {
			ERRX1("io_write_blocking");
			return 0;
		}
		if (!io_write_blocking(fd, buf, wsz)) {
			ERRX1("io_write_blocking");
			return 0;
		}
		sess->total_write += wsz;
		sz -= wsz;
		buf += wsz;
	}

	return 1;
}

/*
 * Write "line" (NUL-terminated) followed by a newline.
 * Returns zero on failure, non-zero on success.
 */
int
io_write_line(struct sess *sess, int fd, const char *line)
{

	if (!io_write_buf(sess, fd, line, strlen(line)))
		ERRX1("io_write_buf");
	else if (!io_write_byte(sess, fd, '\n'))
		ERRX1("io_write_byte");
	else
		return 1;

	return 0;
}

/*
 * Read buffer from non-blocking descriptor.
 * Returns zero on failure, non-zero on success (zero or more bytes).
 */
static int
io_read_nonblocking(int fd, void *buf, size_t bsz, size_t *sz)
{
	struct pollfd pfd;
	ssize_t rsz;
	int c;

	*sz = 0;

	if (bsz == 0)
		return 1;

	pfd.fd = fd;
	pfd.events = POLLIN;

	/* Poll and check for all possible errors. */

	if ((c = poll(&pfd, 1, poll_timeout)) == -1) {
		ERR("poll");
		return 0;
	} else if (c == 0) {
		ERRX("poll: timeout");
		return 0;
	} else if ((pfd.revents & (POLLERR|POLLNVAL))) {
		ERRX("poll: bad fd");
		return 0;
	} else if (!(pfd.revents & (POLLIN|POLLHUP))) {
		ERRX("poll: unknown event");
		return 0;
	}

	/* Now the non-blocking read, checking for EOF. */

	if ((rsz = read(fd, buf, bsz)) == -1) {
		ERR("read");
		return 0;
	} else if (rsz == 0) {
		ERRX("unexpected end of file");
		return 0;
	}

	*sz = rsz;
	return 1;
}

/*
 * Blocking read of the full size of the buffer.
 * This can be called when reading either an error-type message or a
 * regular message---multiplexed or not.
 * Returns 0 on failure, non-zero on success (all bytes read).
 */
static int
io_read_blocking(int fd, void *buf, size_t sz)
{
	size_t rsz;
	int c;

	while (sz > 0) {
		c = io_read_nonblocking(fd, buf, sz, &rsz);
		if (!c) {
			ERRX1("io_read_nonblocking");
			return 0;
		} else if (rsz == 0) {
			ERRX("io_read_nonblocking: short read");
			return 0;
		}
		buf += rsz;
		sz -= rsz;
	}

	return 1;
}

/*
 * When we do a lot of writes in a row (such as when the sender emits
 * the file list), the server might be sending us multiplexed log
 * messages.
 * If it sends too many, it clogs the socket.
 * This function looks into the read buffer and clears out any log
 * messages pending.
 * If called when there are valid data reads available, this function
 * does nothing.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_flush(struct sess *sess, int fd)
{
	int32_t tagbuf, tag;
	char mpbuf[1024];

	if (sess->mplex_read_remain)
		return 1;

	/*
	 * First, read the 4-byte multiplex tag.
	 * The first byte is the tag identifier (7 for normal
	 * data, !7 for out-of-band data), the last three are
	 * for the remaining data size.
	 */

	if (!io_read_blocking(fd, &tagbuf, sizeof(tagbuf))) {
		ERRX1("io_read_blocking");
		return 0;
	}
	tag = le32toh(tagbuf);
	sess->mplex_read_remain = tag & 0xFFFFFF;
	tag >>= 24;
	if (tag == 7)
		return 1;

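	/*
	 * Anything other than tag 7 is an out-of-band message.
	 * Rebase the tag so that the error case checked below is 1,
	 * then pull the message body off the wire.
	 */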
	tag -= 7;

	if (sess->mplex_read_remain > sizeof(mpbuf)) {
		ERRX("multiplex buffer overflow");
		return 0;
	} else if (sess->mplex_read_remain == 0)
		return 1;

	if (!io_read_blocking(fd, mpbuf, sess->mplex_read_remain)) {
		ERRX1("io_read_blocking");
		return 0;
	}
	if (mpbuf[sess->mplex_read_remain - 1] == '\n')
		mpbuf[--sess->mplex_read_remain] = '\0';

	/*
	 * Always print the server's messages, as the server
	 * will control its own log levelling.
	 */

	LOG0("%.*s", (int)sess->mplex_read_remain, mpbuf);
	sess->mplex_read_remain = 0;

	/*
	 * I only know that a tag of one means an error.
	 * This means that we should exit.
	 */

	if (tag == 1) {
		ERRX1("error from remote host");
		return 0;
	}
	return 1;
}

/*
 * Read buffer from non-blocking descriptor, possibly in multiplex read
 * mode.
 * Returns zero on failure, non-zero on success (all bytes read from
 * the descriptor).
 */
int
io_read_buf(struct sess *sess, int fd, void *buf, size_t sz)
{
	size_t rsz;
	int c;

	/* If we're not multiplexing, read directly. */

	if (!sess->mplex_reads) {
		assert(sess->mplex_read_remain == 0);
		c = io_read_blocking(fd, buf, sz);
		sess->total_read += sz;
		return c;
	}

	while (sz > 0) {
		/*
		 * First, check to see if we have any regular data
		 * hanging around waiting to be read.
		 * If so, read the lesser of that data and whatever
		 * amount we currently want.
		 */

		if (sess->mplex_read_remain) {
			rsz = sess->mplex_read_remain < sz ?
			    sess->mplex_read_remain : sz;
			if (!io_read_blocking(fd, buf, rsz)) {
				ERRX1("io_read_blocking");
				return 0;
			}
			sz -= rsz;
			sess->mplex_read_remain -= rsz;
			buf += rsz;
			sess->total_read += rsz;
			continue;
		}

		assert(sess->mplex_read_remain == 0);
		if (!io_read_flush(sess, fd)) {
			ERRX1("io_read_flush");
			return 0;
		}
	}

	return 1;
}

/*
 * Like io_write_buf(), but for a long (which is a composite type).
 * Returns zero on failure, non-zero on success.
 */
int
io_write_ulong(struct sess *sess, int fd, uint64_t val)
{
	uint64_t nv;
	int64_t sval = (int64_t)val;

	/* Short-circuit: send as an integer if possible. */

	if (sval <= INT32_MAX && sval >= 0) {
		if (!io_write_int(sess, fd, (int32_t)val)) {
			ERRX1("io_write_int");
			return 0;
		}
		return 1;
	}

	/* Otherwise, pad with -1 32-bit, then send 64-bit. */

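	/*
	 * For example, the value 0x100000000 goes out as the 32-bit
	 * marker ff ff ff ff followed by the little-endian 64-bit
	 * bytes 00 00 00 00 01 00 00 00.
	 */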
	nv = htole64(val);

	if (!io_write_int(sess, fd, -1))
		ERRX1("io_write_int");
	else if (!io_write_buf(sess, fd, &nv, sizeof(int64_t)))
		ERRX1("io_write_buf");
	else
		return 1;

	return 0;
}

int
io_write_long(struct sess *sess, int fd, int64_t val)
{
	return io_write_ulong(sess, fd, (uint64_t)val);
}

/*
 * Like io_write_buf(), but for an unsigned integer.
 * Returns zero on failure, non-zero on success.
 */
int
io_write_uint(struct sess *sess, int fd, uint32_t val)
{
	uint32_t nv;

	nv = htole32(val);

	if (!io_write_buf(sess, fd, &nv, sizeof(uint32_t))) {
		ERRX1("io_write_buf");
		return 0;
	}
	return 1;
}

/*
 * Like io_write_buf(), but for an integer.
 * Returns zero on failure, non-zero on success.
 */
int
io_write_int(struct sess *sess, int fd, int32_t val)
{
	return io_write_uint(sess, fd, (uint32_t)val);
}

/*
 * A simple assertion-protected memory copy from the input "val" of size
 * "valsz" into our buffer "buf", full size "buflen", position "bufpos".
 * Increases our "bufpos" appropriately.
 * This has no return value, but will assert() if the size of the buffer
 * is insufficient for the new data.
 */
void
io_buffer_buf(void *buf, size_t *bufpos, size_t buflen, const void *val,
    size_t valsz)
{

	assert(*bufpos + valsz <= buflen);
	memcpy(buf + *bufpos, val, valsz);
	*bufpos += valsz;
}

/*
 * Like io_buffer_buf(), but also accommodating for multiplexing codes.
 * This should NEVER be passed to io_write_buf(), but instead passed
 * directly to a write operation.
 */
void
io_lowbuffer_buf(struct sess *sess, void *buf,
    size_t *bufpos, size_t buflen, const void *val, size_t valsz)
{
	int32_t tagbuf;

	if (valsz == 0)
		return;

	if (!sess->mplex_writes) {
		io_buffer_buf(buf, bufpos, buflen, val, valsz);
		return;
	}

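	/*
	 * Multiplexing: buffer the 4-byte frame header (tag 7 plus the
	 * 24-bit payload length) ahead of the payload itself, exactly
	 * as io_write_buf() does on the wire.
	 */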
	assert(*bufpos + valsz + sizeof(int32_t) <= buflen);
	assert(valsz == (valsz & 0xFFFFFF));
	tagbuf = htole32((7 << 24) + valsz);

	io_buffer_int(buf, bufpos, buflen, tagbuf);
	io_buffer_buf(buf, bufpos, buflen, val, valsz);
}

/*
 * Allocate the space needed for io_lowbuffer_buf() and friends.
 * This should be called for *each* lowbuffer operation, so:
 *   io_lowbuffer_alloc(... sizeof(int32_t));
 *   io_lowbuffer_int(...);
 *   io_lowbuffer_alloc(... sizeof(int32_t));
 *   io_lowbuffer_int(...);
 * And not sizeof(int32_t) * 2 or whatnot.
 * Returns zero on failure, non-zero on success.
 */
int
io_lowbuffer_alloc(struct sess *sess, void **buf,
    size_t *bufsz, size_t *bufmax, size_t sz)
{
	void *pp;
	size_t extra;

	extra = sess->mplex_writes ? sizeof(int32_t) : 0;

	if (*bufsz + sz + extra > *bufmax) {
		pp = realloc(*buf, *bufsz + sz + extra);
		if (pp == NULL) {
			ERR("realloc");
			return 0;
		}
		*buf = pp;
		*bufmax = *bufsz + sz + extra;
	}
	*bufsz += sz + extra;
	return 1;
}

/*
 * Like io_lowbuffer_buf(), but for a single integer.
 */
void
io_lowbuffer_int(struct sess *sess, void *buf,
    size_t *bufpos, size_t buflen, int32_t val)
{
	int32_t nv = htole32(val);

	io_lowbuffer_buf(sess, buf, bufpos, buflen, &nv, sizeof(int32_t));
}

/*
 * Like io_buffer_buf(), but for a single integer.
 */
void
io_buffer_int(void *buf, size_t *bufpos, size_t buflen, int32_t val)
{
	int32_t nv = htole32(val);

	io_buffer_buf(buf, bufpos, buflen, &nv, sizeof(int32_t));
}

/*
 * Like io_read_buf(), but for a long >=0.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_long(struct sess *sess, int fd, int64_t *val)
{
	uint64_t uoval;

	if (!io_read_ulong(sess, fd, &uoval)) {
		ERRX1("io_read_long");
		return 0;
	}
	*val = (int64_t)uoval;
	if (*val < 0) {
		ERRX1("io_read_long negative");
		return 0;
	}
	return 1;
}

/*
 * Like io_read_buf(), but for a long.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_ulong(struct sess *sess, int fd, uint64_t *val)
{
	uint64_t oval;
	int32_t sval;

	/* Start with the short-circuit: read as an int. */

	if (!io_read_int(sess, fd, &sval)) {
		ERRX1("io_read_int");
		return 0;
	}
	if (sval != -1) {
		*val = sval;
		return 1;
	}

	/* If the int is -1, read as 64 bits. */

	if (!io_read_buf(sess, fd, &oval, sizeof(uint64_t))) {
		ERRX1("io_read_buf");
		return 0;
	}

	*val = le64toh(oval);
	return 1;
}

/*
 * One thing we often need to do is read a size_t.
 * These are transmitted as int32_t, so make sure that the value
 * transmitted is not out of range.
 * FIXME: I assume that size_t can handle int32_t's max.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_size(struct sess *sess, int fd, size_t *val)
{
	int32_t oval;

	if (!io_read_int(sess, fd, &oval)) {
		ERRX1("io_read_int");
		return 0;
	} else if (oval < 0) {
		ERRX("io_read_size: negative value");
		return 0;
	}

	*val = oval;
	return 1;
}

/*
 * Like io_read_buf(), but for an unsigned integer.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_uint(struct sess *sess, int fd, uint32_t *val)
{
	uint32_t oval;

	if (!io_read_buf(sess, fd, &oval, sizeof(uint32_t))) {
		ERRX1("io_read_buf");
		return 0;
	}

	*val = le32toh(oval);
	return 1;
}

int
io_read_int(struct sess *sess, int fd, int32_t *val)
{
	return io_read_uint(sess, fd, (uint32_t *)val);
}

/*
 * Copies "valsz" bytes from "buf", full size "bufsz" at position
 * "bufpos", into "val".
 * Calls assert() if the source doesn't have enough data.
 * Increases "bufpos" to the new position.
 */
void
io_unbuffer_buf(const void *buf, size_t *bufpos, size_t bufsz, void *val,
    size_t valsz)
{

	assert(*bufpos + valsz <= bufsz);
	memcpy(val, buf + *bufpos, valsz);
	*bufpos += valsz;
}

/*
 * Calls io_unbuffer_buf() and converts.
 */
void
io_unbuffer_int(const void *buf, size_t *bufpos, size_t bufsz, int32_t *val)
{
	int32_t oval;

	io_unbuffer_buf(buf, bufpos, bufsz, &oval, sizeof(int32_t));
	*val = le32toh(oval);
}

/*
 * Calls io_unbuffer_buf() and converts.
 */
int
io_unbuffer_size(const void *buf, size_t *bufpos, size_t bufsz, size_t *val)
{
	int32_t oval;

	io_unbuffer_int(buf, bufpos, bufsz, &oval);
	if (oval < 0) {
		ERRX("io_unbuffer_size: negative value");
		return 0;
	}
	*val = oval;
	return 1;
}

/*
 * Like io_read_buf(), but for a single byte >=0.
 * Returns zero on failure, non-zero on success.
 */
int
io_read_byte(struct sess *sess, int fd, uint8_t *val)
{

	if (!io_read_buf(sess, fd, val, sizeof(uint8_t))) {
		ERRX1("io_read_buf");
		return 0;
	}
	return 1;
}

/*
 * Like io_write_buf(), but for a single byte.
 * Returns zero on failure, non-zero on success.
 */
int
io_write_byte(struct sess *sess, int fd, uint8_t val)
{

	if (!io_write_buf(sess, fd, &val, sizeof(uint8_t))) {
		ERRX1("io_write_buf");
		return 0;
	}
	return 1;
}