1 /* $OpenBSD: sender.c,v 1.33 2024/03/20 09:26:42 claudio Exp $ */
2 /*
3 * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
4 *
5 * Permission to use, copy, modify, and distribute this software for any
6 * purpose with or without fee is hereby granted, provided that the above
7 * copyright notice and this permission notice appear in all copies.
8 *
9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16 */
17 #include <sys/mman.h>
18 #include <sys/queue.h>
19 #include <sys/stat.h>
20
21 #include <assert.h>
22 #include <fcntl.h>
23 #include <inttypes.h>
24 #include <poll.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <unistd.h>
28
29 #include <openssl/md4.h>
30
31 #include "extern.h"
32
/*
 * A request from the receiver to download updated file data.
 * Requests are appended to a send_dlq by send_dl_enqueue() and taken
 * off the head one at a time by rsync_sender().
 */
struct send_dl {
	int32_t		 idx; /* index in our file list, or -1 for end of phase */
	struct blkset	*blks; /* block set read via blk_recv(); NULL for the
				  end-of-phase marker and in dry-run mode */
	TAILQ_ENTRY(send_dl) entries; /* linkage in the send_dlq queue */
};
41
/*
 * The current file being "updated": sent from sender to receiver.
 * If there is no file being uploaded, "cur" is NULL.
 * The request in "cur", the file mapping and the descriptor held in
 * "stat" are all released by send_up_reset().
 */
struct send_up {
	struct send_dl	*cur; /* file being updated or NULL (owned here) */
	struct blkstat	 stat; /* status of file being updated */
};
50
/*
 * FIFO of pending download requests: filled by send_dl_enqueue() as
 * requests arrive, drained in order by rsync_sender().
 */
TAILQ_HEAD(send_dlq, send_dl);
52
53 /*
54 * We have finished updating the receiver's file with sender data.
55 * Deallocate and wipe clean all resources required for that.
56 */
57 static void
send_up_reset(struct send_up * p)58 send_up_reset(struct send_up *p)
59 {
60
61 assert(p != NULL);
62
63 /* Free the download request, if applicable. */
64
65 if (p->cur != NULL) {
66 free(p->cur->blks);
67 free(p->cur);
68 p->cur = NULL;
69 }
70
71 /* If we mapped a file for scanning, unmap it and close. */
72
73 if (p->stat.map != MAP_FAILED)
74 munmap(p->stat.map, p->stat.mapsz);
75
76 p->stat.map = MAP_FAILED;
77 p->stat.mapsz = 0;
78
79 if (p->stat.fd != -1)
80 close(p->stat.fd);
81
82 p->stat.fd = -1;
83
84 /* Now clear the in-transfer information. */
85
86 p->stat.offs = 0;
87 p->stat.hint = 0;
88 p->stat.curst = BLKSTAT_NONE;
89 }
90
91 /*
92 * This is the bulk of the sender work.
93 * Here we tend to an output buffer that responds to receiver requests
94 * for data.
95 * This does not act upon the output descriptor itself so as to avoid
96 * blocking, which otherwise would deadlock the protocol.
97 * Returns zero on failure, non-zero on success.
98 */
99 static int
send_up_fsm(struct sess * sess,size_t * phase,struct send_up * up,void ** wb,size_t * wbsz,size_t * wbmax,const struct flist * fl)100 send_up_fsm(struct sess *sess, size_t *phase,
101 struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax,
102 const struct flist *fl)
103 {
104 size_t pos = 0, isz = sizeof(int32_t),
105 dsz = MD4_DIGEST_LENGTH;
106 unsigned char fmd[MD4_DIGEST_LENGTH];
107 off_t sz;
108 char buf[20];
109
110 switch (up->stat.curst) {
111 case BLKSTAT_DATA:
112 /*
113 * A data segment to be written: buffer both the length
114 * and the data.
115 * If we've finished the transfer, move on to the token;
116 * otherwise, keep sending data.
117 */
118
119 sz = MINIMUM(MAX_CHUNK,
120 up->stat.curlen - up->stat.curpos);
121 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
122 ERRX1("io_lowbuffer_alloc");
123 return 0;
124 }
125 io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz);
126 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) {
127 ERRX1("io_lowbuffer_alloc");
128 return 0;
129 }
130 io_lowbuffer_buf(sess, *wb, &pos, *wbsz,
131 up->stat.map + up->stat.curpos, sz);
132
133 up->stat.curpos += sz;
134 if (up->stat.curpos == up->stat.curlen)
135 up->stat.curst = BLKSTAT_TOK;
136 return 1;
137 case BLKSTAT_TOK:
138 /*
139 * The data token following (maybe) a data segment.
140 * These can also come standalone if, say, the file's
141 * being fully written.
142 * It's followed by a hash or another data segment,
143 * depending on the token.
144 */
145
146 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
147 ERRX1("io_lowbuffer_alloc");
148 return 0;
149 }
150 io_lowbuffer_int(sess, *wb,
151 &pos, *wbsz, up->stat.curtok);
152 up->stat.curst = up->stat.curtok ?
153 BLKSTAT_NEXT : BLKSTAT_HASH;
154 return 1;
155 case BLKSTAT_HASH:
156 /*
157 * The hash following transmission of all file contents.
158 * This is always followed by the state that we're
159 * finished with the file.
160 */
161
162 hash_file_final(&up->stat.ctx, fmd);
163 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) {
164 ERRX1("io_lowbuffer_alloc");
165 return 0;
166 }
167 io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz);
168 up->stat.curst = BLKSTAT_DONE;
169 return 1;
170 case BLKSTAT_DONE:
171 /*
172 * The data has been written.
173 * Clear our current send file and allow the block below
174 * to find another.
175 */
176
177 if (!sess->opts->dry_run)
178 LOG3("%s: flushed %jd KB total, %.2f%% uploaded",
179 fl[up->cur->idx].path,
180 (intmax_t)up->stat.total / 1024,
181 100.0 * up->stat.dirty / up->stat.total);
182 send_up_reset(up);
183 return 1;
184 case BLKSTAT_PHASE:
185 /*
186 * This is where we actually stop the algorithm: we're
187 * already at the second phase.
188 */
189
190 send_up_reset(up);
191 (*phase)++;
192 return 1;
193 case BLKSTAT_NEXT:
194 /*
195 * Our last case: we need to find the
196 * next block (and token) to transmit to
197 * the receiver.
198 * These will drive the finite state
199 * machine in the first few conditional
200 * blocks of this set.
201 */
202
203 assert(up->stat.fd != -1);
204 blk_match(sess, up->cur->blks,
205 fl[up->cur->idx].path, &up->stat);
206 return 1;
207 case BLKSTAT_NONE:
208 break;
209 }
210
211 assert(BLKSTAT_NONE == up->stat.curst);
212
213 /*
214 * We've either hit the phase change following the last file (or
215 * start, or prior phase change), or we need to prime the next
216 * file for transmission.
217 * We special-case dry-run mode.
218 */
219
220 if (up->cur->idx < 0) {
221 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
222 ERRX1("io_lowbuffer_alloc");
223 return 0;
224 }
225 io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);
226
227 if (sess->opts->server && sess->rver > 27) {
228 if (!io_lowbuffer_alloc(sess,
229 wb, wbsz, wbmax, isz)) {
230 ERRX1("io_lowbuffer_alloc");
231 return 0;
232 }
233 io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);
234 }
235 up->stat.curst = BLKSTAT_PHASE;
236 } else if (sess->opts->dry_run) {
237 if (!sess->opts->server)
238 LOG1("%s", fl[up->cur->idx].wpath);
239
240 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
241 ERRX1("io_lowbuffer_alloc");
242 return 0;
243 }
244 io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx);
245 up->stat.curst = BLKSTAT_DONE;
246 } else {
247 assert(up->stat.fd != -1);
248
249 /*
250 * FIXME: use the nice output of log_file() and so on in
251 * downloader.c, which means moving this into
252 * BLKSTAT_DONE instead of having it be here.
253 */
254
255 if (!sess->opts->server)
256 LOG1("%s", fl[up->cur->idx].wpath);
257
258 if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) {
259 ERRX1("io_lowbuffer_alloc");
260 return 0;
261 }
262 assert(sizeof(buf) == 20);
263 blk_recv_ack(buf, up->cur->blks, up->cur->idx);
264 io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20);
265
266 LOG3("%s: primed for %jd B total",
267 fl[up->cur->idx].path, (intmax_t)up->cur->blks->size);
268 up->stat.curst = BLKSTAT_NEXT;
269 }
270
271 return 1;
272 }
273
274 /*
275 * Enqueue a download request, getting it off the read channel as
276 * quickly a possible.
277 * This frees up the read channel for further incoming requests.
278 * We'll handle each element in turn, up to and including the last
279 * request (phase change), which is always a -1 idx.
280 * Returns zero on failure, non-zero on success.
281 */
282 static int
send_dl_enqueue(struct sess * sess,struct send_dlq * q,int32_t idx,const struct flist * fl,size_t flsz,int fd)283 send_dl_enqueue(struct sess *sess, struct send_dlq *q,
284 int32_t idx, const struct flist *fl, size_t flsz, int fd)
285 {
286 struct send_dl *s;
287
288 /* End-of-phase marker. */
289
290 if (idx == -1) {
291 if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
292 ERR("calloc");
293 return 0;
294 }
295 s->idx = -1;
296 s->blks = NULL;
297 TAILQ_INSERT_TAIL(q, s, entries);
298 return 1;
299 }
300
301 /* Validate the index. */
302
303 if (idx < 0 || (uint32_t)idx >= flsz) {
304 ERRX("file index out of bounds: invalid %d out of %zu",
305 idx, flsz);
306 return 0;
307 } else if (S_ISDIR(fl[idx].st.mode)) {
308 ERRX("blocks requested for "
309 "directory: %s", fl[idx].path);
310 return 0;
311 } else if (S_ISLNK(fl[idx].st.mode)) {
312 ERRX("blocks requested for "
313 "symlink: %s", fl[idx].path);
314 return 0;
315 } else if (!S_ISREG(fl[idx].st.mode)) {
316 ERRX("blocks requested for "
317 "special: %s", fl[idx].path);
318 return 0;
319 }
320
321 if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
322 ERR("callloc");
323 return 0;
324 }
325 s->idx = idx;
326 s->blks = NULL;
327 TAILQ_INSERT_TAIL(q, s, entries);
328
329 /*
330 * This blocks til the full blockset has been read.
331 * That's ok, because the most important thing is getting data
332 * off the wire.
333 */
334
335 if (!sess->opts->dry_run) {
336 s->blks = blk_recv(sess, fd, fl[idx].path);
337 if (s->blks == NULL) {
338 ERRX1("blk_recv");
339 return 0;
340 }
341 }
342 return 1;
343 }
344
345 /*
346 * A client sender manages the read-only source files and sends data to
347 * the receiver as requested.
348 * First it sends its list of files, then it waits for the server to
349 * request updates to individual files.
350 * It queues requests for updates as soon as it receives them.
351 * Returns zero on failure, non-zero on success.
352 *
353 * Pledges: stdio, getpw, rpath.
354 */
355 int
rsync_sender(struct sess * sess,int fdin,int fdout,size_t argc,char ** argv)356 rsync_sender(struct sess *sess, int fdin,
357 int fdout, size_t argc, char **argv)
358 {
359 struct flist *fl = NULL;
360 const struct flist *f;
361 size_t i, flsz = 0, phase = 0;
362 int rc = 0, c;
363 int32_t idx;
364 struct pollfd pfd[3];
365 struct send_dlq sdlq;
366 struct send_dl *dl;
367 struct send_up up;
368 struct stat st;
369 void *wbuf = NULL;
370 size_t wbufpos = 0, wbufsz = 0, wbufmax = 0;
371 ssize_t ssz;
372
373 if (pledge("stdio getpw rpath", NULL) == -1) {
374 ERR("pledge");
375 return 0;
376 }
377
378 memset(&up, 0, sizeof(struct send_up));
379 TAILQ_INIT(&sdlq);
380 up.stat.fd = -1;
381 up.stat.map = MAP_FAILED;
382 up.stat.blktab = blkhash_alloc();
383
384 /*
385 * Generate the list of files we want to send from our
386 * command-line input.
387 * This will also remove all invalid files.
388 */
389
390 if (!flist_gen(sess, argc, argv, &fl, &flsz)) {
391 ERRX1("flist_gen");
392 goto out;
393 }
394
395 /* Client sends zero-length exclusions if deleting. */
396 if (!sess->opts->server && sess->opts->del)
397 send_rules(sess, fdout);
398
399 /*
400 * Then the file list in any mode.
401 * Finally, the IO error (always zero for us).
402 */
403
404 if (!flist_send(sess, fdin, fdout, fl, flsz)) {
405 ERRX1("flist_send");
406 goto out;
407 } else if (!io_write_int(sess, fdout, 0)) {
408 ERRX1("io_write_int");
409 goto out;
410 }
411
412 /* Exit if we're the server with zero files. */
413
414 if (flsz == 0 && sess->opts->server) {
415 WARNX("sender has empty file list: exiting");
416 rc = 1;
417 goto out;
418 } else if (!sess->opts->server)
419 LOG1("Transfer starting: %zu files", flsz);
420
421 /*
422 * If we're the server, read our exclusion list.
423 * This is always 0 for now.
424 */
425
426 if (sess->opts->server)
427 recv_rules(sess, fdin);
428
429 /*
430 * Set up our poll events.
431 * We start by polling only in receiver requests, enabling other
432 * poll events on demand.
433 */
434
435 pfd[0].fd = -1; /* from receiver */
436 pfd[0].events = POLLIN;
437 pfd[1].fd = -1; /* to receiver */
438 pfd[1].events = POLLOUT;
439 pfd[2].fd = -1; /* from local file */
440 pfd[2].events = POLLIN;
441
442 for (;;) {
443 /* disable recevier until all buffered data was sent */
444 if (pfd[1].fd != -1 && wbufsz > 0)
445 pfd[0].fd = -1;
446 else
447 pfd[0].fd = fdin;
448 if ((c = poll(pfd, 3, poll_timeout)) == -1) {
449 ERR("poll");
450 goto out;
451 } else if (c == 0) {
452 ERRX("poll: timeout");
453 goto out;
454 }
455 for (i = 0; i < 3; i++)
456 if (pfd[i].revents & (POLLERR|POLLNVAL)) {
457 ERRX("poll: bad fd");
458 goto out;
459 } else if (pfd[i].revents & POLLHUP) {
460 ERRX("poll: hangup");
461 goto out;
462 }
463
464 /*
465 * If we have a request coming down off the wire, pull
466 * it in as quickly as possible into our buffer.
467 * Start by seeing if we have a log message.
468 * If we do, pop it off, then see if we have anything
469 * left and hit it again if so (read priority).
470 */
471
472 if (sess->mplex_reads && (pfd[0].revents & POLLIN)) {
473 if (!io_read_flush(sess, fdin)) {
474 ERRX1("io_read_flush");
475 goto out;
476 } else if (sess->mplex_read_remain == 0) {
477 c = io_read_check(fdin);
478 if (c < 0) {
479 ERRX1("io_read_check");
480 goto out;
481 } else if (c > 0)
482 continue;
483 pfd[0].revents &= ~POLLIN;
484 }
485 }
486
487 /*
488 * Now that we've handled the log messages, we're left
489 * here if we have any actual data coming down.
490 * Enqueue message requests, then loop again if we see
491 * more data (read priority).
492 */
493
494 if (pfd[0].revents & POLLIN) {
495 if (!io_read_int(sess, fdin, &idx)) {
496 ERRX1("io_read_int");
497 goto out;
498 }
499 if (!send_dl_enqueue(sess,
500 &sdlq, idx, fl, flsz, fdin)) {
501 ERRX1("send_dl_enqueue");
502 goto out;
503 }
504 c = io_read_check(fdin);
505 if (c < 0) {
506 ERRX1("io_read_check");
507 goto out;
508 } else if (c > 0)
509 continue;
510 }
511
512 /*
513 * One of our local files has been opened in response
514 * to a receiver request and now we can map it.
515 * We'll respond to the event by looking at the map when
516 * the writer is available.
517 * Here we also enable the poll event for output.
518 */
519
520 if (pfd[2].revents & POLLIN) {
521 assert(up.cur != NULL);
522 assert(up.stat.fd != -1);
523 assert(up.stat.map == MAP_FAILED);
524 assert(up.stat.mapsz == 0);
525 f = &fl[up.cur->idx];
526
527 if (fstat(up.stat.fd, &st) == -1) {
528 ERR("%s: fstat", f->path);
529 goto out;
530 }
531
532 /*
533 * If the file is zero-length, the map will
534 * fail, but either way we want to unset that
535 * we're waiting for the file to open and set
536 * that we're ready for the output channel.
537 */
538
539 if ((up.stat.mapsz = st.st_size) > 0) {
540 up.stat.map = mmap(NULL,
541 up.stat.mapsz, PROT_READ,
542 MAP_SHARED, up.stat.fd, 0);
543 if (up.stat.map == MAP_FAILED) {
544 ERR("%s: mmap", f->path);
545 goto out;
546 }
547 }
548
549 pfd[2].fd = -1;
550 pfd[1].fd = fdout;
551 }
552
553 /*
554 * If we have buffers waiting to write, write them out
555 * as soon as we can in a non-blocking fashion.
556 * We must not be waiting for any local files.
557 * ALL WRITES MUST HAPPEN HERE.
558 * This keeps the sender deadlock-free.
559 */
560
561 if ((pfd[1].revents & POLLOUT) && wbufsz > 0) {
562 assert(pfd[2].fd == -1);
563 assert(wbufsz - wbufpos);
564 ssz = write(fdout, wbuf + wbufpos, wbufsz - wbufpos);
565 if (ssz == -1) {
566 ERR("write");
567 goto out;
568 }
569 wbufpos += ssz;
570 if (wbufpos == wbufsz)
571 wbufpos = wbufsz = 0;
572 pfd[1].revents &= ~POLLOUT;
573
574 /* This is usually in io.c... */
575
576 sess->total_write += ssz;
577 }
578
579 /*
580 * Engage the FSM for the current transfer.
581 * If our phase changes, stop processing.
582 */
583
584 if (pfd[1].revents & POLLOUT && up.cur != NULL) {
585 assert(pfd[2].fd == -1);
586 assert(wbufpos == 0 && wbufsz == 0);
587 if (!send_up_fsm(sess, &phase,
588 &up, &wbuf, &wbufsz, &wbufmax, fl)) {
589 ERRX1("send_up_fsm");
590 goto out;
591 }
592 if (phase > 1)
593 break;
594 }
595
596 /*
597 * Incoming queue management.
598 * If we have no queue component that we're waiting on,
599 * then pull off the receiver-request queue and start
600 * processing the request.
601 */
602
603 if (up.cur == NULL) {
604 assert(pfd[2].fd == -1);
605 assert(up.stat.fd == -1);
606 assert(up.stat.map == MAP_FAILED);
607 assert(up.stat.mapsz == 0);
608 assert(wbufsz == 0 && wbufpos == 0);
609 pfd[1].fd = -1;
610
611 /*
612 * If there's nothing in the queue, then keep
613 * the output channel disabled and wait for
614 * whatever comes next from the reader.
615 */
616
617 if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)
618 continue;
619 TAILQ_REMOVE(&sdlq, up.cur, entries);
620
621 /* Hash our blocks. */
622
623 hash_file_start(&up.stat.ctx, sess);
624 blkhash_set(up.stat.blktab, up.cur->blks);
625
626 /*
627 * End of phase: enable channel to receiver.
628 * We'll need our output buffer enabled in order
629 * to process this event.
630 */
631
632 if (up.cur->idx == -1) {
633 pfd[1].fd = fdout;
634 continue;
635 }
636
637 /*
638 * Non-blocking open of file.
639 * This will be picked up in the state machine
640 * block of not being primed.
641 */
642
643 up.stat.fd = open(fl[up.cur->idx].path,
644 O_RDONLY|O_NONBLOCK, 0);
645 if (up.stat.fd == -1) {
646 ERR("%s: open", fl[up.cur->idx].path);
647 goto out;
648 }
649 pfd[2].fd = up.stat.fd;
650 }
651 }
652
653 if (!TAILQ_EMPTY(&sdlq)) {
654 ERRX("phases complete with files still queued");
655 goto out;
656 }
657
658 if (!sess_stats_send(sess, fdout)) {
659 ERRX1("sess_stats_end");
660 goto out;
661 }
662
663 /* Final "goodbye" message. */
664
665 if (!io_read_int(sess, fdin, &idx)) {
666 ERRX1("io_read_int");
667 goto out;
668 }
669 if (idx != -1) {
670 ERRX("read incorrect update complete ack");
671 goto out;
672 }
673
674 LOG2("sender finished updating");
675 rc = 1;
676 out:
677 send_up_reset(&up);
678 while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {
679 TAILQ_REMOVE(&sdlq, dl, entries);
680 free(dl->blks);
681 free(dl);
682 }
683 flist_free(fl, flsz);
684 free(wbuf);
685 blkhash_free(up.stat.blktab);
686 return rc;
687 }
688