/*	$Id: sender.c,v 1.25 2019/06/28 13:35:03 deraadt Exp $ */
/*
 * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */
#include <sys/mman.h>
#include <sys/queue.h>
#include <sys/stat.h>

#include <assert.h>
#include <fcntl.h>
#include <inttypes.h>
#include <poll.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <openssl/md4.h>

#include "extern.h"

/*
 * A request from the receiver to download updated file data.
 */
struct	send_dl {
	int32_t			 idx; /* index in our file list */
	struct blkset		*blks; /* block information from the receiver */
	TAILQ_ENTRY(send_dl)	 entries;
};

/*
 * The current file being "updated": sent from sender to receiver.
 * If there is no file being uploaded, "cur" is NULL.
 */
struct	send_up {
	struct send_dl	*cur; /* file being updated or NULL */
	struct blkstat	 stat; /* status of file being updated */
};

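/*
 * FIFO of pending download requests: send_dl_enqueue() appends to the
 * tail as indices arrive off the wire, and the main loop services the
 * head.
 */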
TAILQ_HEAD(send_dlq, send_dl);

/*
 * We have finished updating the receiver's file with sender data.
 * Deallocate and wipe clean all resources required for that.
 */
static void
send_up_reset(struct send_up *p)
{

	assert(p != NULL);

	/* Free the download request, if applicable. */

	if (p->cur != NULL) {
		free(p->cur->blks);
		free(p->cur);
		p->cur = NULL;
	}

	/* If we mapped a file for scanning, unmap it and close. */

	if (p->stat.map != MAP_FAILED)
		munmap(p->stat.map, p->stat.mapsz);

	p->stat.map = MAP_FAILED;
	p->stat.mapsz = 0;

	if (p->stat.fd != -1)
		close(p->stat.fd);

	p->stat.fd = -1;

	/* Now clear the in-transfer information. */

	p->stat.offs = 0;
	p->stat.hint = 0;
	p->stat.curst = BLKSTAT_NONE;
}

/*
 * This is the bulk of the sender work.
 * Here we tend to an output buffer that responds to receiver requests
 * for data.
 * This does not act upon the output descriptor itself so as to avoid
 * blocking, which otherwise would deadlock the protocol.
 * Returns zero on failure, non-zero on success.
 */
static int
send_up_fsm(struct sess *sess, size_t *phase,
	struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax,
	const struct flist *fl)
{
	size_t		 pos = 0, isz = sizeof(int32_t),
			 dsz = MD4_DIGEST_LENGTH;
	unsigned char	 fmd[MD4_DIGEST_LENGTH];
	off_t		 sz;
	char		 buf[20];

	switch (up->stat.curst) {
	case BLKSTAT_DATA:
		/*
		 * A data segment to be written: buffer both the length
		 * and the data.
		 * If we've finished the transfer, move on to the token;
		 * otherwise, keep sending data.
		 */

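		/*
		 * On the wire this is a positive int32 length followed
		 * by that many bytes of literal file data, capped at
		 * MAX_CHUNK per pass through the FSM.
		 */
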
		sz = MINIMUM(MAX_CHUNK,
			up->stat.curlen - up->stat.curpos);
		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
			ERRX1("io_lowbuffer_alloc");
			return 0;
		}
		io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz);
		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) {
			ERRX1("io_lowbuffer_alloc");
			return 0;
		}
		io_lowbuffer_buf(sess, *wb, &pos, *wbsz,
			up->stat.map + up->stat.curpos, sz);

		up->stat.curpos += sz;
		if (up->stat.curpos == up->stat.curlen)
			up->stat.curst = BLKSTAT_TOK;
		return 1;
	case BLKSTAT_TOK:
		/*
		 * The data token following (maybe) a data segment.
		 * These can also come standalone if, say, the file's
		 * being fully written.
		 * It's followed by a hash or another data segment,
		 * depending on the token.
		 */

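		/*
		 * Sketch of the token semantics, assumed from the
		 * rsync protocol: a non-zero token names a block the
		 * receiver already holds, so we go find the next
		 * match; a zero token ends the file's data, so the
		 * whole-file hash follows.
		 */
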
		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
			ERRX1("io_lowbuffer_alloc");
			return 0;
		}
		io_lowbuffer_int(sess, *wb,
			&pos, *wbsz, up->stat.curtok);
		up->stat.curst = up->stat.curtok ?
			BLKSTAT_NEXT : BLKSTAT_HASH;
		return 1;
	case BLKSTAT_HASH:
		/*
		 * The hash following transmission of all file contents.
		 * This is always followed by the state that we're
		 * finished with the file.
		 */

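		/*
		 * The full-file MD4 lets the receiver verify that the
		 * file it reconstructed from tokens and literal data
		 * matches our copy.
		 */
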
		hash_file(up->stat.map, up->stat.mapsz, fmd, sess);
		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) {
			ERRX1("io_lowbuffer_alloc");
			return 0;
		}
		io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz);
		up->stat.curst = BLKSTAT_DONE;
		return 1;
	case BLKSTAT_DONE:
		/*
		 * The data has been written.
		 * Clear our current send file and allow the block below
		 * to find another.
		 */

		if (!sess->opts->dry_run)
			LOG3("%s: flushed %jd KB total, %.2f%% uploaded",
			    fl[up->cur->idx].path,
			    (intmax_t)up->stat.total / 1024,
			    100.0 * up->stat.dirty / up->stat.total);
		send_up_reset(up);
		return 1;
	case BLKSTAT_PHASE:
		/*
		 * This is where we actually stop the algorithm: we're
		 * already at the second phase.
		 */

		send_up_reset(up);
		(*phase)++;
		return 1;
	case BLKSTAT_NEXT:
		/*
		 * Our last case: we need to find the next block (and
		 * token) to transmit to the receiver.
		 * These will drive the finite state machine in the
		 * first few conditional blocks of this set.
		 */

		assert(up->stat.fd != -1);
		blk_match(sess, up->cur->blks,
			fl[up->cur->idx].path, &up->stat);
		return 1;
	case BLKSTAT_NONE:
		break;
	}

	assert(BLKSTAT_NONE == up->stat.curst);

	/*
	 * We've either hit the phase change following the last file (or
	 * start, or prior phase change), or we need to prime the next
	 * file for transmission.
	 * We special-case dry-run mode.
	 */

	if (up->cur->idx < 0) {
		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
			ERRX1("io_lowbuffer_alloc");
			return 0;
		}
		io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);

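		/*
		 * Assumption from the version check below: peers
		 * speaking a protocol newer than 27 expect the
		 * end-of-phase index to be sent twice.
		 */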
		if (sess->opts->server && sess->rver > 27) {
			if (!io_lowbuffer_alloc(sess,
			    wb, wbsz, wbmax, isz)) {
				ERRX1("io_lowbuffer_alloc");
				return 0;
			}
			io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);
		}
		up->stat.curst = BLKSTAT_PHASE;
	} else if (sess->opts->dry_run) {
		if (!sess->opts->server)
			LOG1("%s", fl[up->cur->idx].wpath);

		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
			ERRX1("io_lowbuffer_alloc");
			return 0;
		}
		io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx);
		up->stat.curst = BLKSTAT_DONE;
	} else {
		assert(up->stat.fd != -1);

		/*
		 * FIXME: use the nice output of log_file() and so on in
		 * downloader.c, which means moving this into
		 * BLKSTAT_DONE instead of having it be here.
		 */

		if (!sess->opts->server)
			LOG1("%s", fl[up->cur->idx].wpath);

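		/*
		 * blk_recv_ack() fills the fixed 20-byte reply that
		 * acknowledges the receiver's block prologue; the
		 * exact layout is its business (see blocks.c), we
		 * only reserve and copy the bytes.
		 */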
		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) {
			ERRX1("io_lowbuffer_alloc");
			return 0;
		}
		assert(sizeof(buf) == 20);
		blk_recv_ack(buf, up->cur->blks, up->cur->idx);
		io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20);

		LOG3("%s: primed for %jd B total",
		    fl[up->cur->idx].path, (intmax_t)up->cur->blks->size);
		up->stat.curst = BLKSTAT_NEXT;
	}

	return 1;
}

/*
 * Enqueue a download request, getting it off the read channel as
 * quickly as possible.
 * This frees up the read channel for further incoming requests.
 * We'll handle each element in turn, up to and including the last
 * request (phase change), which is always a -1 idx.
 * Returns zero on failure, non-zero on success.
 */
static int
send_dl_enqueue(struct sess *sess, struct send_dlq *q,
	int32_t idx, const struct flist *fl, size_t flsz, int fd)
{
	struct send_dl	*s;

	/* End-of-phase marker. */

	if (idx == -1) {
		if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
			ERR("calloc");
			return 0;
		}
		s->idx = -1;
		s->blks = NULL;
		TAILQ_INSERT_TAIL(q, s, entries);
		return 1;
	}

	/* Validate the index. */

	if (idx < 0 || (uint32_t)idx >= flsz) {
		ERRX("file index out of bounds: invalid %d out of %zu",
		    idx, flsz);
		return 0;
	} else if (S_ISDIR(fl[idx].st.mode)) {
		ERRX("blocks requested for "
			"directory: %s", fl[idx].path);
		return 0;
	} else if (S_ISLNK(fl[idx].st.mode)) {
		ERRX("blocks requested for "
			"symlink: %s", fl[idx].path);
		return 0;
	} else if (!S_ISREG(fl[idx].st.mode)) {
		ERRX("blocks requested for "
			"special: %s", fl[idx].path);
		return 0;
	}

	if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
		ERR("calloc");
		return 0;
	}
	s->idx = idx;
	s->blks = NULL;
	TAILQ_INSERT_TAIL(q, s, entries);

	/*
	 * This blocks until the full blockset has been read.
	 * That's ok, because the most important thing is getting data
	 * off the wire.
	 */

	if (!sess->opts->dry_run) {
		s->blks = blk_recv(sess, fd, fl[idx].path);
		if (s->blks == NULL) {
			ERRX1("blk_recv");
			return 0;
		}
	}
	return 1;
}

/*
 * A client sender manages the read-only source files and sends data to
 * the receiver as requested.
 * First it sends its list of files, then it waits for the server to
 * request updates to individual files.
 * It queues requests for updates as soon as it receives them.
 * Returns zero on failure, non-zero on success.
 *
 * Pledges: stdio, getpw, rpath, unveil.
 */
int
rsync_sender(struct sess *sess, int fdin,
	int fdout, size_t argc, char **argv)
{
	struct flist	   *fl = NULL;
	const struct flist *f;
	size_t		    i, flsz = 0, phase = 0, excl;
	int		    rc = 0, c;
	int32_t		    idx;
	struct pollfd	    pfd[3];
	struct send_dlq	    sdlq;
	struct send_dl	   *dl;
	struct send_up	    up;
	struct stat	    st;
	void		   *wbuf = NULL;
	size_t		    wbufpos = 0, wbufsz = 0, wbufmax = 0;
	ssize_t		    ssz;

	if (pledge("stdio getpw rpath unveil", NULL) == -1) {
		ERR("pledge");
		return 0;
	}

	memset(&up, 0, sizeof(struct send_up));
	TAILQ_INIT(&sdlq);
	up.stat.fd = -1;
	up.stat.map = MAP_FAILED;
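
	/*
	 * The block table is allocated once for the session and
	 * re-seeded for each file by blkhash_set() in the queue
	 * management block below.
	 */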
	up.stat.blktab = blkhash_alloc();

	/*
	 * Generate the list of files we want to send from our
	 * command-line input.
	 * This will also remove all invalid files.
	 */

	if (!flist_gen(sess, argc, argv, &fl, &flsz)) {
		ERRX1("flist_gen");
		goto out;
	}

	/* Client sends zero-length exclusions if deleting. */

	if (!sess->opts->server && sess->opts->del &&
	     !io_write_int(sess, fdout, 0)) {
		ERRX1("io_write_int");
		goto out;
	}

	/*
	 * Then the file list in any mode.
	 * Finally, the IO error (always zero for us).
	 */

	if (!flist_send(sess, fdin, fdout, fl, flsz)) {
		ERRX1("flist_send");
		goto out;
	} else if (!io_write_int(sess, fdout, 0)) {
		ERRX1("io_write_int");
		goto out;
	}

	/* Exit if we're the server with zero files. */

	if (flsz == 0 && sess->opts->server) {
		WARNX("sender has empty file list: exiting");
		rc = 1;
		goto out;
	} else if (!sess->opts->server)
		LOG1("Transfer starting: %zu files", flsz);

	/*
	 * If we're the server, read our exclusion list.
	 * This is always 0 for now.
	 */

	if (sess->opts->server) {
		if (!io_read_size(sess, fdin, &excl)) {
			ERRX1("io_read_size");
			goto out;
		} else if (excl != 0) {
			ERRX1("exclusion list is non-empty");
			goto out;
		}
	}

	/*
	 * Set up our poll events.
	 * We start by polling only for receiver requests, enabling
	 * other poll events on demand.
	 */

	pfd[0].fd = fdin; /* from receiver */
	pfd[0].events = POLLIN;
	pfd[1].fd = -1; /* to receiver */
	pfd[1].events = POLLOUT;
	pfd[2].fd = -1; /* from local file */
	pfd[2].events = POLLIN;

	for (;;) {
		assert(pfd[0].fd != -1);
		if ((c = poll(pfd, 3, POLL_TIMEOUT)) == -1) {
			ERR("poll");
			goto out;
		} else if (c == 0) {
			ERRX("poll: timeout");
			goto out;
		}
		for (i = 0; i < 3; i++)
			if (pfd[i].revents & (POLLERR|POLLNVAL)) {
				ERRX("poll: bad fd");
				goto out;
			} else if (pfd[i].revents & POLLHUP) {
				ERRX("poll: hangup");
				goto out;
			}

		/*
		 * If we have a request coming down off the wire, pull
		 * it in as quickly as possible into our buffer.
		 * Start by seeing if we have a log message.
		 * If we do, pop it off, then see if we have anything
		 * left and hit it again if so (read priority).
		 */

		if (sess->mplex_reads && (pfd[0].revents & POLLIN)) {
			if (!io_read_flush(sess, fdin)) {
				ERRX1("io_read_flush");
				goto out;
			} else if (sess->mplex_read_remain == 0) {
				c = io_read_check(fdin);
				if (c < 0) {
					ERRX1("io_read_check");
					goto out;
				} else if (c > 0)
					continue;
				pfd[0].revents &= ~POLLIN;
			}
		}

		/*
		 * Now that we've handled the log messages, we're left
		 * here if we have any actual data coming down.
		 * Enqueue message requests, then loop again if we see
		 * more data (read priority).
		 */

		if (pfd[0].revents & POLLIN) {
			if (!io_read_int(sess, fdin, &idx)) {
				ERRX1("io_read_int");
				goto out;
			}
			if (!send_dl_enqueue(sess,
			    &sdlq, idx, fl, flsz, fdin)) {
				ERRX1("send_dl_enqueue");
				goto out;
			}
			c = io_read_check(fdin);
			if (c < 0) {
				ERRX1("io_read_check");
				goto out;
			} else if (c > 0)
				continue;
		}

		/*
		 * One of our local files has been opened in response
		 * to a receiver request and now we can map it.
		 * We'll respond to the event by looking at the map when
		 * the writer is available.
		 * Here we also enable the poll event for output.
		 */

		if (pfd[2].revents & POLLIN) {
			assert(up.cur != NULL);
			assert(up.stat.fd != -1);
			assert(up.stat.map == MAP_FAILED);
			assert(up.stat.mapsz == 0);
			f = &fl[up.cur->idx];

			if (fstat(up.stat.fd, &st) == -1) {
				ERR("%s: fstat", f->path);
				goto out;
			}

			/*
			 * If the file is zero-length, the map will
			 * fail, but either way we want to unset that
			 * we're waiting for the file to open and set
			 * that we're ready for the output channel.
			 */

			if ((up.stat.mapsz = st.st_size) > 0) {
				up.stat.map = mmap(NULL,
					up.stat.mapsz, PROT_READ,
					MAP_SHARED, up.stat.fd, 0);
				if (up.stat.map == MAP_FAILED) {
					ERR("%s: mmap", f->path);
					goto out;
				}
			}

			pfd[2].fd = -1;
			pfd[1].fd = fdout;
		}

		/*
		 * If we have buffers waiting to write, write them out
		 * as soon as we can in a non-blocking fashion.
		 * We must not be waiting for any local files.
		 * ALL WRITES MUST HAPPEN HERE.
		 * This keeps the sender deadlock-free.
		 */

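		/*
		 * write() may drain only part of the buffer: wbufpos
		 * tracks how far we've gotten so the remainder goes
		 * out on a later POLLOUT event.
		 */
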
		if ((pfd[1].revents & POLLOUT) && wbufsz > 0) {
			assert(pfd[2].fd == -1);
			assert(wbufsz - wbufpos);
			ssz = write(fdout,
				wbuf + wbufpos, wbufsz - wbufpos);
			if (ssz == -1) {
				ERR("write");
				goto out;
			}
			wbufpos += ssz;
			if (wbufpos == wbufsz)
				wbufpos = wbufsz = 0;
			pfd[1].revents &= ~POLLOUT;

			/* This is usually in io.c... */

			sess->total_write += ssz;
		}

		/*
		 * Engage the FSM for the current transfer.
		 * If our phase changes, stop processing.
		 */

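		/*
		 * send_up_fsm() bumps "phase" each time it handles an
		 * end-of-phase marker (BLKSTAT_PHASE); after the
		 * second bump, both transfer phases are done and we
		 * can leave the loop.
		 */
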
		if (pfd[1].revents & POLLOUT && up.cur != NULL) {
			assert(pfd[2].fd == -1);
			assert(wbufpos == 0 && wbufsz == 0);
			if (!send_up_fsm(sess, &phase,
			    &up, &wbuf, &wbufsz, &wbufmax, fl)) {
				ERRX1("send_up_fsm");
				goto out;
			} else if (phase > 1)
				break;
		}

		/*
		 * Incoming queue management.
		 * If we have no queue component that we're waiting on,
		 * then pull off the receiver-request queue and start
		 * processing the request.
		 */

		if (up.cur == NULL) {
			assert(pfd[2].fd == -1);
			assert(up.stat.fd == -1);
			assert(up.stat.map == MAP_FAILED);
			assert(up.stat.mapsz == 0);
			assert(wbufsz == 0 && wbufpos == 0);
			pfd[1].fd = -1;

			/*
			 * If there's nothing in the queue, then keep
			 * the output channel disabled and wait for
			 * whatever comes next from the reader.
			 */

			if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)
				continue;
			TAILQ_REMOVE(&sdlq, up.cur, entries);

			/* Hash the receiver's blocks for fast matching. */

			blkhash_set(up.stat.blktab, up.cur->blks);

			/*
			 * End of phase: enable channel to receiver.
			 * We'll need our output buffer enabled in order
			 * to process this event.
			 */

			if (up.cur->idx == -1) {
				pfd[1].fd = fdout;
				continue;
			}

			/*
			 * Non-blocking open of file.
			 * This will be picked up in the state machine's
			 * not-yet-primed (BLKSTAT_NONE) block.
			 */

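			/*
			 * A regular file polls as readable right away,
			 * so pfd[2] fires on the next loop pass and the
			 * fstat/mmap handler above runs before the FSM
			 * is engaged (assumed intent of the design).
			 */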
			up.stat.fd = open(fl[up.cur->idx].path,
				O_RDONLY|O_NONBLOCK, 0);
			if (up.stat.fd == -1) {
				ERR("%s: open", fl[up.cur->idx].path);
				goto out;
			}
			pfd[2].fd = up.stat.fd;
		}
	}

	if (!TAILQ_EMPTY(&sdlq)) {
		ERRX("phases complete with files still queued");
		goto out;
	}

	if (!sess_stats_send(sess, fdout)) {
		ERRX1("sess_stats_send");
		goto out;
	}

	/* Final "goodbye" message. */

	if (!io_read_int(sess, fdin, &idx)) {
		ERRX1("io_read_int");
		goto out;
	} else if (idx != -1) {
		ERRX("read incorrect update complete ack");
		goto out;
	}

	LOG2("sender finished updating");
	rc = 1;
out:
	send_up_reset(&up);
	while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {
		TAILQ_REMOVE(&sdlq, dl, entries);
		free(dl->blks);
		free(dl);
	}
	flist_free(fl, flsz);
	free(wbuf);
	blkhash_free(up.stat.blktab);
	return rc;
}