xref: /dragonfly/sbin/hammer/cmd_mirror.c (revision bc3d4063)
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.14 2008/08/21 23:28:43 thomas Exp $
35  */
36 
37 #include "hammer.h"
38 
39 #define SERIALBUF_SIZE	(512 * 1024)
40 
41 static int read_mrecords(int fd, char *buf, u_int size,
42 			 hammer_ioc_mrecord_head_t pickup);
43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
44 			 hammer_ioc_mrecord_head_t pickup);
45 static void write_mrecord(int fdout, u_int32_t type,
46 			 hammer_ioc_mrecord_any_t mrec, int bytes);
47 static void generate_mrec_header(int fd, int fdout, int pfs_id,
48 			 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
49 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
50 			 struct hammer_ioc_mrecord_head *pickup,
51 			 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
52 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
53 static ssize_t writebw(int fd, const void *buf, size_t nbytes,
54 			u_int64_t *bwcount, struct timeval *tv1);
55 static void mirror_usage(int code);
56 
57 /*
58  * Generate a mirroring data stream from the specific source over the
59  * entire key range, but restricted to the specified transaction range.
60  *
61  * The HAMMER VFS does most of the work, we add a few new mrecord
62  * types to negotiate the TID ranges and verify that the entire
63  * stream made it to the destination.
64  */
65 void
66 hammer_cmd_mirror_read(char **av, int ac, int streaming)
67 {
68 	struct hammer_ioc_mirror_rw mirror;
69 	struct hammer_ioc_pseudofs_rw pfs;
70 	union hammer_ioc_mrecord_any mrec_tmp;
71 	struct hammer_ioc_mrecord_head pickup;
72 	hammer_ioc_mrecord_any_t mrec;
73 	hammer_tid_t sync_tid;
74 	const char *filesystem;
75 	char *buf = malloc(SERIALBUF_SIZE);
76 	int interrupted = 0;
77 	int error;
78 	int fd;
79 	int n;
80 	int didwork;
81 	int64_t total_bytes;
82 	time_t base_t = time(NULL);
83 	struct timeval bwtv;
84 	u_int64_t bwcount;
85 
86 	if (ac == 0 || ac > 2)
87 		mirror_usage(1);
88 	filesystem = av[0];
89 
90 	pickup.signature = 0;
91 	pickup.type = 0;
92 
93 again:
94 	bzero(&mirror, sizeof(mirror));
95 	hammer_key_beg_init(&mirror.key_beg);
96 	hammer_key_end_init(&mirror.key_end);
97 
98 	fd = getpfs(&pfs, filesystem);
99 
100 	if (streaming && VerboseOpt) {
101 		fprintf(stderr, "\nRunning");
102 		fflush(stderr);
103 	}
104 	total_bytes = 0;
105 	gettimeofday(&bwtv, NULL);
106 	bwcount = 0;
107 
108 	/*
109 	 * In 2-way mode the target will send us a PFS info packet
110 	 * first.  Use the target's current snapshot TID as our default
111 	 * begin TID.
112 	 */
113 	mirror.tid_beg = 0;
114 	if (TwoWayPipeOpt) {
115 		n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup,
116 					 NULL, &mirror.tid_beg);
117 		if (n < 0) {	/* got TERM record */
118 			relpfs(fd, &pfs);
119 			return;
120 		}
121 		++mirror.tid_beg;
122 	}
123 
124 	/*
125 	 * Write out the PFS header, tid_beg will be updated if our PFS
126 	 * has a larger begin sync.  tid_end is set to the latest source
127 	 * TID whos flush cycle has completed.
128 	 */
129 	generate_mrec_header(fd, 1, pfs.pfs_id,
130 			     &mirror.tid_beg, &mirror.tid_end);
131 
132 	/* XXX streaming mode support w/ cycle or command line arg */
133 	/*
134 	 * A cycle file overrides the beginning TID
135 	 */
136 	hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg);
137 
138 	if (ac == 2)
139 		mirror.tid_beg = strtoull(av[1], NULL, 0);
140 
141 	if (streaming == 0 || VerboseOpt >= 2) {
142 		fprintf(stderr,
143 			"Mirror-read: Mirror from %016llx to %016llx\n",
144 			mirror.tid_beg, mirror.tid_end);
145 	}
146 	if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) {
147 		fprintf(stderr, "Mirror-read: Resuming at object %016llx\n",
148 			mirror.key_beg.obj_id);
149 	}
150 
151 	/*
152 	 * Nothing to do if begin equals end.
153 	 */
154 	if (mirror.tid_beg >= mirror.tid_end) {
155 		if (streaming == 0 || VerboseOpt >= 2)
156 			fprintf(stderr, "Mirror-read: No work to do\n");
157 		didwork = 0;
158 		goto done;
159 	}
160 	didwork = 1;
161 
162 	/*
163 	 * Write out bulk records
164 	 */
165 	mirror.ubuf = buf;
166 	mirror.size = SERIALBUF_SIZE;
167 
168 	do {
169 		mirror.count = 0;
170 		mirror.pfs_id = pfs.pfs_id;
171 		mirror.shared_uuid = pfs.ondisk->shared_uuid;
172 		if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
173 			fprintf(stderr, "Mirror-read %s failed: %s\n",
174 				filesystem, strerror(errno));
175 			exit(1);
176 		}
177 		if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
178 			fprintf(stderr,
179 				"Mirror-read %s fatal error %d\n",
180 				filesystem, mirror.head.error);
181 			exit(1);
182 		}
183 		if (mirror.count) {
184 			if (BandwidthOpt) {
185 				n = writebw(1, mirror.ubuf, mirror.count,
186 					    &bwcount, &bwtv);
187 			} else {
188 				n = write(1, mirror.ubuf, mirror.count);
189 			}
190 			if (n != mirror.count) {
191 				fprintf(stderr, "Mirror-read %s failed: "
192 						"short write\n",
193 				filesystem);
194 				exit(1);
195 			}
196 		}
197 		total_bytes += mirror.count;
198 		if (streaming && VerboseOpt) {
199 			fprintf(stderr, "\r%016llx %11lld",
200 				mirror.key_cur.obj_id,
201 				total_bytes);
202 			fflush(stderr);
203 		}
204 		mirror.key_beg = mirror.key_cur;
205 		if (TimeoutOpt &&
206 		    (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) {
207 			fprintf(stderr,
208 				"Mirror-read %s interrupted by timer at"
209 				" %016llx\n",
210 				filesystem,
211 				mirror.key_cur.obj_id);
212 			interrupted = 1;
213 			break;
214 		}
215 	} while (mirror.count != 0);
216 
217 done:
218 	/*
219 	 * Write out the termination sync record - only if not interrupted
220 	 */
221 	if (interrupted == 0) {
222 		if (didwork) {
223 			write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
224 				      &mrec_tmp, sizeof(mrec_tmp.sync));
225 		} else {
226 			write_mrecord(1, HAMMER_MREC_TYPE_IDLE,
227 				      &mrec_tmp, sizeof(mrec_tmp.sync));
228 		}
229 	}
230 
231 	/*
232 	 * If the -2 option was given (automatic when doing mirror-copy),
233 	 * a two-way pipe is assumed and we expect a response mrec from
234 	 * the target.
235 	 */
236 	if (TwoWayPipeOpt) {
237 		mrec = read_mrecord(0, &error, &pickup);
238 		if (mrec == NULL ||
239 		    mrec->head.type != HAMMER_MREC_TYPE_UPDATE ||
240 		    mrec->head.rec_size != sizeof(mrec->update)) {
241 			fprintf(stderr, "mirror_read: Did not get final "
242 					"acknowledgement packet from target\n");
243 			exit(1);
244 		}
245 		if (interrupted) {
246 			if (CyclePath) {
247 				hammer_set_cycle(&mirror.key_cur, mirror.tid_beg);
248 				fprintf(stderr, "Cyclefile %s updated for "
249 					"continuation\n", CyclePath);
250 			}
251 		} else {
252 			sync_tid = mrec->update.tid;
253 			if (CyclePath) {
254 				hammer_key_beg_init(&mirror.key_beg);
255 				hammer_set_cycle(&mirror.key_beg, sync_tid);
256 				fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n",
257 					CyclePath, sync_tid);
258 			}
259 		}
260 	} else if (CyclePath) {
261 		/* NOTE! mirror.tid_beg cannot be updated */
262 		fprintf(stderr, "Warning: cycle file (-c option) cannot be "
263 				"fully updated unless you use mirror-copy\n");
264 		hammer_set_cycle(&mirror.key_beg, mirror.tid_beg);
265 	}
266 	if (streaming && interrupted == 0) {
267 		time_t t1 = time(NULL);
268 		time_t t2;
269 
270 		if (VerboseOpt) {
271 			fprintf(stderr, " W");
272 			fflush(stderr);
273 		}
274 		pfs.ondisk->sync_end_tid = mirror.tid_end;
275 		if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) {
276 			fprintf(stderr, "Mirror-read %s: cannot stream: %s\n",
277 				filesystem, strerror(errno));
278 		} else {
279 			t2 = time(NULL) - t1;
280 			if (t2 >= 0 && t2 < DelayOpt) {
281 				if (VerboseOpt) {
282 					fprintf(stderr, "\bD");
283 					fflush(stderr);
284 				}
285 				sleep(DelayOpt - t2);
286 			}
287 			if (VerboseOpt) {
288 				fprintf(stderr, "\b ");
289 				fflush(stderr);
290 			}
291 			relpfs(fd, &pfs);
292 			goto again;
293 		}
294 	}
295 	write_mrecord(1, HAMMER_MREC_TYPE_TERM,
296 		      &mrec_tmp, sizeof(mrec_tmp.sync));
297 	relpfs(fd, &pfs);
298 	fprintf(stderr, "Mirror-read %s succeeded\n", filesystem);
299 }
300 
301 /*
302  * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding
303  * some additional packet types to negotiate TID ranges and to verify
304  * completion.  The HAMMER VFS does most of the work.
305  *
306  * It is important to note that the mirror.key_{beg,end} range must
307  * match the ranged used by the original.  For now both sides use
308  * range the entire key space.
309  *
310  * It is even more important that the records in the stream conform
311  * to the TID range also supplied in the stream.  The HAMMER VFS will
312  * use the REC, PASS, and SKIP record types to track the portions of
313  * the B-Tree being scanned in order to be able to proactively delete
314  * records on the target within those active areas that are not mentioned
315  * by the source.
316  *
317  * The mirror.key_cur field is used by the VFS to do this tracking.  It
318  * must be initialized to key_beg but then is persistently updated by
319  * the HAMMER VFS on each successive ioctl() call.  If you blow up this
320  * field you will blow up the mirror target, possibly to the point of
321  * deleting everything.  As a safety measure the HAMMER VFS simply marks
322  * the records that the source has destroyed as deleted on the target,
323  * and normal pruning operations will deal with their final disposition
324  * at some later time.
325  */
326 void
327 hammer_cmd_mirror_write(char **av, int ac)
328 {
329 	struct hammer_ioc_mirror_rw mirror;
330 	const char *filesystem;
331 	char *buf = malloc(SERIALBUF_SIZE);
332 	struct hammer_ioc_pseudofs_rw pfs;
333 	struct hammer_ioc_mrecord_head pickup;
334 	struct hammer_ioc_synctid synctid;
335 	union hammer_ioc_mrecord_any mrec_tmp;
336 	hammer_ioc_mrecord_any_t mrec;
337 	int error;
338 	int fd;
339 	int n;
340 
341 	if (ac != 1)
342 		mirror_usage(1);
343 	filesystem = av[0];
344 
345 	pickup.signature = 0;
346 	pickup.type = 0;
347 
348 again:
349 	bzero(&mirror, sizeof(mirror));
350 	hammer_key_beg_init(&mirror.key_beg);
351 	hammer_key_end_init(&mirror.key_end);
352 	mirror.key_end = mirror.key_beg;
353 
354 	fd = getpfs(&pfs, filesystem);
355 
356 	/*
357 	 * In two-way mode the target writes out a PFS packet first.
358 	 * The source uses our tid_end as its tid_beg by default,
359 	 * picking up where it left off.
360 	 */
361 	mirror.tid_beg = 0;
362 	if (TwoWayPipeOpt) {
363 		generate_mrec_header(fd, 1, pfs.pfs_id,
364 				     &mirror.tid_beg, &mirror.tid_end);
365 	}
366 
367 	/*
368 	 * Read and process the PFS header.  The source informs us of
369 	 * the TID range the stream represents.
370 	 */
371 	n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup,
372 				 &mirror.tid_beg, &mirror.tid_end);
373 	if (n < 0) {	/* got TERM record */
374 		relpfs(fd, &pfs);
375 		return;
376 	}
377 
378 	mirror.ubuf = buf;
379 	mirror.size = SERIALBUF_SIZE;
380 
381 	/*
382 	 * Read and process bulk records (REC, PASS, and SKIP types).
383 	 *
384 	 * On your life, do NOT mess with mirror.key_cur or your mirror
385 	 * target may become history.
386 	 */
387 	for (;;) {
388 		mirror.count = 0;
389 		mirror.pfs_id = pfs.pfs_id;
390 		mirror.shared_uuid = pfs.ondisk->shared_uuid;
391 		mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
392 		if (mirror.size <= 0)
393 			break;
394 		if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) {
395 			fprintf(stderr, "Mirror-write %s failed: %s\n",
396 				filesystem, strerror(errno));
397 			exit(1);
398 		}
399 		if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
400 			fprintf(stderr,
401 				"Mirror-write %s fatal error %d\n",
402 				filesystem, mirror.head.error);
403 			exit(1);
404 		}
405 #if 0
406 		if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) {
407 			fprintf(stderr,
408 				"Mirror-write %s interrupted by timer at"
409 				" %016llx\n",
410 				filesystem,
411 				mirror.key_cur.obj_id);
412 			exit(0);
413 		}
414 #endif
415 	}
416 
417 	/*
418 	 * Read and process the termination sync record.
419 	 */
420 	mrec = read_mrecord(0, &error, &pickup);
421 
422 	if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) {
423 		fprintf(stderr, "Mirror-write: received termination request\n");
424 		free(mrec);
425 		return;
426 	}
427 
428 	if (mrec == NULL ||
429 	    (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
430 	     mrec->head.type != HAMMER_MREC_TYPE_IDLE) ||
431 	    mrec->head.rec_size != sizeof(mrec->sync)) {
432 		fprintf(stderr, "Mirror-write %s: Did not get termination "
433 				"sync record, or rec_size is wrong rt=%d\n",
434 				filesystem, mrec->head.type);
435 		exit(1);
436 	}
437 
438 	/*
439 	 * Update the PFS info on the target so the user has visibility
440 	 * into the new snapshot, and sync the target filesystem.
441 	 */
442 	if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) {
443 		update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
444 
445 		bzero(&synctid, sizeof(synctid));
446 		synctid.op = HAMMER_SYNCTID_SYNC2;
447 		ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
448 
449 		if (VerboseOpt >= 2) {
450 			fprintf(stderr, "Mirror-write %s: succeeded\n",
451 				filesystem);
452 		}
453 	}
454 
455 	free(mrec);
456 	mrec = NULL;
457 
458 	/*
459 	 * Report back to the originator.
460 	 */
461 	if (TwoWayPipeOpt) {
462 		mrec_tmp.update.tid = mirror.tid_end;
463 		write_mrecord(1, HAMMER_MREC_TYPE_UPDATE,
464 			      &mrec_tmp, sizeof(mrec_tmp.update));
465 	} else {
466 		printf("Source can update synctid to 0x%016llx\n",
467 		       mirror.tid_end);
468 	}
469 	relpfs(fd, &pfs);
470 	goto again;
471 }
472 
473 void
474 hammer_cmd_mirror_dump(void)
475 {
476 	char *buf = malloc(SERIALBUF_SIZE);
477 	struct hammer_ioc_mrecord_head pickup;
478 	hammer_ioc_mrecord_any_t mrec;
479 	int error;
480 	int size;
481 	int offset;
482 	int bytes;
483 
484 	/*
485 	 * Read and process the PFS header
486 	 */
487 	pickup.signature = 0;
488 	pickup.type = 0;
489 
490 	mrec = read_mrecord(0, &error, &pickup);
491 
492 	/*
493 	 * Read and process bulk records
494 	 */
495 	for (;;) {
496 		size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
497 		if (size <= 0)
498 			break;
499 		offset = 0;
500 		while (offset < size) {
501 			mrec = (void *)((char *)buf + offset);
502 			bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
503 			if (offset + bytes > size) {
504 				fprintf(stderr, "Misaligned record\n");
505 				exit(1);
506 			}
507 
508 			switch(mrec->head.type) {
509 			case HAMMER_MREC_TYPE_REC:
510 				printf("Record obj=%016llx key=%016llx "
511 				       "rt=%02x ot=%02x\n",
512 					mrec->rec.leaf.base.obj_id,
513 					mrec->rec.leaf.base.key,
514 					mrec->rec.leaf.base.rec_type,
515 					mrec->rec.leaf.base.obj_type);
516 				printf("       tids %016llx:%016llx data=%d\n",
517 					mrec->rec.leaf.base.create_tid,
518 					mrec->rec.leaf.base.delete_tid,
519 					mrec->rec.leaf.data_len);
520 				break;
521 			case HAMMER_MREC_TYPE_PASS:
522 				printf("Pass   obj=%016llx key=%016llx "
523 				       "rt=%02x ot=%02x\n",
524 					mrec->rec.leaf.base.obj_id,
525 					mrec->rec.leaf.base.key,
526 					mrec->rec.leaf.base.rec_type,
527 					mrec->rec.leaf.base.obj_type);
528 				printf("       tids %016llx:%016llx data=%d\n",
529 					mrec->rec.leaf.base.create_tid,
530 					mrec->rec.leaf.base.delete_tid,
531 					mrec->rec.leaf.data_len);
532 				break;
533 			case HAMMER_MREC_TYPE_SKIP:
534 				printf("Skip   obj=%016llx key=%016llx rt=%02x to\n"
535 				       "       obj=%016llx key=%016llx rt=%02x\n",
536 				       mrec->skip.skip_beg.obj_id,
537 				       mrec->skip.skip_beg.key,
538 				       mrec->skip.skip_beg.rec_type,
539 				       mrec->skip.skip_end.obj_id,
540 				       mrec->skip.skip_end.key,
541 				       mrec->skip.skip_end.rec_type);
542 			default:
543 				break;
544 			}
545 			offset += bytes;
546 		}
547 	}
548 
549 	/*
550 	 * Read and process the termination sync record.
551 	 */
552 	mrec = read_mrecord(0, &error, &pickup);
553 	if (mrec == NULL ||
554 	    (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
555 	     mrec->head.type != HAMMER_MREC_TYPE_IDLE)
556 	 ) {
557 		fprintf(stderr, "Mirror-dump: Did not get termination "
558 				"sync record\n");
559 	}
560 }
561 
562 void
563 hammer_cmd_mirror_copy(char **av, int ac, int streaming)
564 {
565 	pid_t pid1;
566 	pid_t pid2;
567 	int fds[2];
568 	const char *xav[16];
569 	char tbuf[16];
570 	char *ptr;
571 	int xac;
572 
573 	if (ac != 2)
574 		mirror_usage(1);
575 
576 	if (pipe(fds) < 0) {
577 		perror("pipe");
578 		exit(1);
579 	}
580 
581 	TwoWayPipeOpt = 1;
582 
583 	/*
584 	 * Source
585 	 */
586 	if ((pid1 = fork()) == 0) {
587 		dup2(fds[0], 0);
588 		dup2(fds[0], 1);
589 		close(fds[0]);
590 		close(fds[1]);
591 		if ((ptr = strchr(av[0], ':')) != NULL) {
592 			*ptr++ = 0;
593 			xac = 0;
594 			xav[xac++] = "ssh";
595 			xav[xac++] = av[0];
596 			xav[xac++] = "hammer";
597 
598 			switch(VerboseOpt) {
599 			case 0:
600 				break;
601 			case 1:
602 				xav[xac++] = "-v";
603 				break;
604 			case 2:
605 				xav[xac++] = "-vv";
606 				break;
607 			default:
608 				xav[xac++] = "-vvv";
609 				break;
610 			}
611 			xav[xac++] = "-2";
612 			if (TimeoutOpt) {
613 				snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt);
614 				xav[xac++] = "-t";
615 				xav[xac++] = tbuf;
616 			}
617 			if (streaming)
618 				xav[xac++] = "mirror-read-streaming";
619 			else
620 				xav[xac++] = "mirror-read";
621 			xav[xac++] = ptr;
622 			xav[xac++] = NULL;
623 			execv("/usr/bin/ssh", (void *)xav);
624 		} else {
625 			hammer_cmd_mirror_read(av, 1, streaming);
626 			fflush(stdout);
627 			fflush(stderr);
628 		}
629 		_exit(1);
630 	}
631 
632 	/*
633 	 * Target
634 	 */
635 	if ((pid2 = fork()) == 0) {
636 		dup2(fds[1], 0);
637 		dup2(fds[1], 1);
638 		close(fds[0]);
639 		close(fds[1]);
640 		if ((ptr = strchr(av[1], ':')) != NULL) {
641 			*ptr++ = 0;
642 			xac = 0;
643 			xav[xac++] = "ssh";
644 			xav[xac++] = av[1];
645 			xav[xac++] = "hammer";
646 
647 			switch(VerboseOpt) {
648 			case 0:
649 				break;
650 			case 1:
651 				xav[xac++] = "-v";
652 				break;
653 			case 2:
654 				xav[xac++] = "-vv";
655 				break;
656 			default:
657 				xav[xac++] = "-vvv";
658 				break;
659 			}
660 
661 			xav[xac++] = "-2";
662 			xav[xac++] = "mirror-write";
663 			xav[xac++] = ptr;
664 			xav[xac++] = NULL;
665 			execv("/usr/bin/ssh", (void *)xav);
666 		} else {
667 			hammer_cmd_mirror_write(av + 1, 1);
668 			fflush(stdout);
669 			fflush(stderr);
670 		}
671 		_exit(1);
672 	}
673 	close(fds[0]);
674 	close(fds[1]);
675 
676 	while (waitpid(pid1, NULL, 0) <= 0)
677 		;
678 	while (waitpid(pid2, NULL, 0) <= 0)
679 		;
680 }
681 
682 /*
683  * Read and return multiple mrecords
684  */
685 static int
686 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup)
687 {
688 	hammer_ioc_mrecord_any_t mrec;
689 	u_int count;
690 	size_t n;
691 	size_t i;
692 	size_t bytes;
693 
694 	count = 0;
695 	while (size - count >= HAMMER_MREC_HEADSIZE) {
696 		/*
697 		 * Cached the record header in case we run out of buffer
698 		 * space.
699 		 */
700 		fflush(stdout);
701 		if (pickup->signature == 0) {
702 			for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
703 				i = read(fd, (char *)pickup + n,
704 					 HAMMER_MREC_HEADSIZE - n);
705 				if (i <= 0)
706 					break;
707 			}
708 			if (n == 0)
709 				break;
710 			if (n != HAMMER_MREC_HEADSIZE) {
711 				fprintf(stderr, "read_mrecords: short read on pipe\n");
712 				exit(1);
713 			}
714 
715 			if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) {
716 				fprintf(stderr, "read_mrecords: malformed record on pipe, "
717 					"bad signature\n");
718 				exit(1);
719 			}
720 		}
721 		if (pickup->rec_size < HAMMER_MREC_HEADSIZE ||
722 		    pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) {
723 			fprintf(stderr, "read_mrecords: malformed record on pipe, "
724 				"illegal rec_size\n");
725 			exit(1);
726 		}
727 
728 		/*
729 		 * Stop if we have insufficient space for the record and data.
730 		 */
731 		bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size);
732 		if (size - count < bytes)
733 			break;
734 
735 		/*
736 		 * Stop if the record type is not a REC or a SKIP (the only
737 		 * two types the ioctl supports.  Other types are used only
738 		 * by the userland protocol).
739 		 */
740 		if (pickup->type != HAMMER_MREC_TYPE_REC &&
741 		    pickup->type != HAMMER_MREC_TYPE_SKIP &&
742 		    pickup->type != HAMMER_MREC_TYPE_PASS) {
743 			break;
744 		}
745 
746 		/*
747 		 * Read the remainder and clear the pickup signature.
748 		 */
749 		for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) {
750 			i = read(fd, buf + count + n, bytes - n);
751 			if (i <= 0)
752 				break;
753 		}
754 		if (n != bytes) {
755 			fprintf(stderr, "read_mrecords: short read on pipe\n");
756 			exit(1);
757 		}
758 
759 		bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE);
760 		pickup->signature = 0;
761 		pickup->type = 0;
762 		mrec = (void *)(buf + count);
763 
764 		/*
765 		 * Validate the completed record
766 		 */
767 		if (mrec->head.rec_crc !=
768 		    crc32((char *)mrec + HAMMER_MREC_CRCOFF,
769 			  mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
770 			fprintf(stderr, "read_mrecords: malformed record "
771 					"on pipe, bad crc\n");
772 			exit(1);
773 		}
774 
775 		/*
776 		 * If its a B-Tree record validate the data crc
777 		 */
778 		if (mrec->head.type == HAMMER_MREC_TYPE_REC) {
779 			if (mrec->head.rec_size <
780 			    sizeof(mrec->rec) + mrec->rec.leaf.data_len) {
781 				fprintf(stderr,
782 					"read_mrecords: malformed record on "
783 					"pipe, illegal element data_len\n");
784 				exit(1);
785 			}
786 			if (mrec->rec.leaf.data_len &&
787 			    mrec->rec.leaf.data_offset &&
788 			    hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) {
789 				fprintf(stderr,
790 					"read_mrecords: data_crc did not "
791 					"match data! obj=%016llx key=%016llx\n",
792 					mrec->rec.leaf.base.obj_id,
793 					mrec->rec.leaf.base.key);
794 				fprintf(stderr,
795 					"continuing, but there are problems\n");
796 			}
797 		}
798 		count += bytes;
799 	}
800 	return(count);
801 }
802 
803 /*
804  * Read and return a single mrecord.
805  */
806 static
807 hammer_ioc_mrecord_any_t
808 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup)
809 {
810 	hammer_ioc_mrecord_any_t mrec;
811 	struct hammer_ioc_mrecord_head mrechd;
812 	size_t bytes;
813 	size_t n;
814 	size_t i;
815 
816 	if (pickup && pickup->type != 0) {
817 		mrechd = *pickup;
818 		pickup->signature = 0;
819 		pickup->type = 0;
820 		n = HAMMER_MREC_HEADSIZE;
821 	} else {
822 		/*
823 		 * Read in the PFSD header from the sender.
824 		 */
825 		for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
826 			i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n);
827 			if (i <= 0)
828 				break;
829 		}
830 		if (n == 0) {
831 			*errorp = 0;	/* EOF */
832 			return(NULL);
833 		}
834 		if (n != HAMMER_MREC_HEADSIZE) {
835 			fprintf(stderr, "short read of mrecord header\n");
836 			*errorp = EPIPE;
837 			return(NULL);
838 		}
839 	}
840 	if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
841 		fprintf(stderr, "read_mrecord: bad signature\n");
842 		*errorp = EINVAL;
843 		return(NULL);
844 	}
845 	bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size);
846 	assert(bytes >= sizeof(mrechd));
847 	mrec = malloc(bytes);
848 	mrec->head = mrechd;
849 
850 	while (n < bytes) {
851 		i = read(fdin, (char *)mrec + n, bytes - n);
852 		if (i <= 0)
853 			break;
854 		n += i;
855 	}
856 	if (n != bytes) {
857 		fprintf(stderr, "read_mrecord: short read on payload\n");
858 		*errorp = EPIPE;
859 		return(NULL);
860 	}
861 	if (mrec->head.rec_crc !=
862 	    crc32((char *)mrec + HAMMER_MREC_CRCOFF,
863 		  mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
864 		fprintf(stderr, "read_mrecord: bad CRC\n");
865 		*errorp = EINVAL;
866 		return(NULL);
867 	}
868 	*errorp = 0;
869 	return(mrec);
870 }
871 
872 static
873 void
874 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec,
875 	      int bytes)
876 {
877 	char zbuf[HAMMER_HEAD_ALIGN];
878 	int pad;
879 
880 	pad = HAMMER_HEAD_DOALIGN(bytes) - bytes;
881 
882 	assert(bytes >= (int)sizeof(mrec->head));
883 	bzero(&mrec->head, sizeof(mrec->head));
884 	mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
885 	mrec->head.type = type;
886 	mrec->head.rec_size = bytes;
887 	mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF,
888 				   bytes - HAMMER_MREC_CRCOFF);
889 	if (write(fdout, mrec, bytes) != bytes) {
890 		fprintf(stderr, "write_mrecord: error %d (%s)\n",
891 			errno, strerror(errno));
892 		exit(1);
893 	}
894 	if (pad) {
895 		bzero(zbuf, pad);
896 		if (write(fdout, zbuf, pad) != pad) {
897 			fprintf(stderr, "write_mrecord: error %d (%s)\n",
898 				errno, strerror(errno));
899 			exit(1);
900 		}
901 	}
902 }
903 
904 /*
905  * Generate a mirroring header with the pfs information of the
906  * originating filesytem.
907  */
908 static void
909 generate_mrec_header(int fd, int fdout, int pfs_id,
910 		     hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
911 {
912 	struct hammer_ioc_pseudofs_rw pfs;
913 	union hammer_ioc_mrecord_any mrec_tmp;
914 
915 	bzero(&pfs, sizeof(pfs));
916 	bzero(&mrec_tmp, sizeof(mrec_tmp));
917 	pfs.pfs_id = pfs_id;
918 	pfs.ondisk = &mrec_tmp.pfs.pfsd;
919 	pfs.bytes = sizeof(mrec_tmp.pfs.pfsd);
920 	if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
921 		fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n");
922 		exit(1);
923 	}
924 	if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
925 		fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n");
926 		exit(1);
927 	}
928 
929 	/*
930 	 * sync_beg_tid - lowest TID on source after which a full history
931 	 *	 	  is available.
932 	 *
933 	 * sync_end_tid - highest fully synchronized TID from source.
934 	 */
935 	if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid)
936 		*tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid;
937 	if (tid_endp)
938 		*tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid;
939 	mrec_tmp.pfs.version = pfs.version;
940 	write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD,
941 		      &mrec_tmp, sizeof(mrec_tmp.pfs));
942 }
943 
944 /*
945  * Validate the pfs information from the originating filesystem
946  * against the target filesystem.  shared_uuid must match.
947  *
948  * return -1 if we got a TERM record
949  */
950 static int
951 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
952 		     struct hammer_ioc_mrecord_head *pickup,
953 		     hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
954 {
955 	struct hammer_ioc_pseudofs_rw pfs;
956 	struct hammer_pseudofs_data pfsd;
957 	hammer_ioc_mrecord_any_t mrec;
958 	int error;
959 
960 	/*
961 	 * Get the PFSD info from the target filesystem.
962 	 */
963 	bzero(&pfs, sizeof(pfs));
964 	bzero(&pfsd, sizeof(pfsd));
965 	pfs.pfs_id = pfs_id;
966 	pfs.ondisk = &pfsd;
967 	pfs.bytes = sizeof(pfsd);
968 	if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
969 		fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n");
970 		exit(1);
971 	}
972 	if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
973 		fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n");
974 		exit(1);
975 	}
976 
977 	mrec = read_mrecord(fdin, &error, pickup);
978 	if (mrec == NULL) {
979 		if (error == 0)
980 			fprintf(stderr, "validate_mrec_header: short read\n");
981 		exit(1);
982 	}
983 	if (mrec->head.type == HAMMER_MREC_TYPE_TERM) {
984 		free(mrec);
985 		return(-1);
986 	}
987 
988 	if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
989 		fprintf(stderr, "validate_mrec_header: did not get expected "
990 				"PFSD record type\n");
991 		exit(1);
992 	}
993 	if (mrec->head.rec_size != sizeof(mrec->pfs)) {
994 		fprintf(stderr, "validate_mrec_header: unexpected payload "
995 				"size\n");
996 		exit(1);
997 	}
998 	if (mrec->pfs.version != pfs.version) {
999 		fprintf(stderr, "validate_mrec_header: Version mismatch\n");
1000 		exit(1);
1001 	}
1002 
1003 	/*
1004 	 * Whew.  Ok, is the read PFS info compatible with the target?
1005 	 */
1006 	if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid,
1007 		 sizeof(pfsd.shared_uuid)) != 0) {
1008 		fprintf(stderr,
1009 			"mirror-write: source and target have "
1010 			"different shared-uuid's!\n");
1011 		exit(1);
1012 	}
1013 	if (is_target &&
1014 	    (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) {
1015 		fprintf(stderr, "mirror-write: target must be in slave mode\n");
1016 		exit(1);
1017 	}
1018 	if (tid_begp)
1019 		*tid_begp = mrec->pfs.pfsd.sync_beg_tid;
1020 	if (tid_endp)
1021 		*tid_endp = mrec->pfs.pfsd.sync_end_tid;
1022 	free(mrec);
1023 	return(0);
1024 }
1025 
1026 static void
1027 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id)
1028 {
1029 	struct hammer_ioc_pseudofs_rw pfs;
1030 	struct hammer_pseudofs_data pfsd;
1031 
1032 	bzero(&pfs, sizeof(pfs));
1033 	bzero(&pfsd, sizeof(pfsd));
1034 	pfs.pfs_id = pfs_id;
1035 	pfs.ondisk = &pfsd;
1036 	pfs.bytes = sizeof(pfsd);
1037 	if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
1038 		perror("update_pfs_snapshot (read)");
1039 		exit(1);
1040 	}
1041 	if (pfsd.sync_end_tid != snapshot_tid) {
1042 		pfsd.sync_end_tid = snapshot_tid;
1043 		if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) {
1044 			perror("update_pfs_snapshot (rewrite)");
1045 			exit(1);
1046 		}
1047 		if (VerboseOpt >= 2) {
1048 			fprintf(stderr,
1049 				"Mirror-write: Completed, updated snapshot "
1050 				"to %016llx\n",
1051 				snapshot_tid);
1052 		}
1053 	}
1054 }
1055 
1056 /*
1057  * Bandwidth-limited write in chunks
1058  */
1059 static
1060 ssize_t
1061 writebw(int fd, const void *buf, size_t nbytes,
1062 	u_int64_t *bwcount, struct timeval *tv1)
1063 {
1064 	struct timeval tv2;
1065 	size_t n;
1066 	ssize_t r;
1067 	ssize_t a;
1068 	int usec;
1069 
1070 	a = 0;
1071 	r = 0;
1072 	while (nbytes) {
1073 		if (*bwcount + nbytes > BandwidthOpt)
1074 			n = BandwidthOpt - *bwcount;
1075 		else
1076 			n = nbytes;
1077 		if (n)
1078 			r = write(fd, buf, n);
1079 		if (r >= 0) {
1080 			a += r;
1081 			nbytes -= r;
1082 			buf = (const char *)buf + r;
1083 		}
1084 		if ((size_t)r != n)
1085 			break;
1086 		*bwcount += n;
1087 		if (*bwcount >= BandwidthOpt) {
1088 			gettimeofday(&tv2, NULL);
1089 			usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 +
1090 				(int)(tv2.tv_usec - tv1->tv_usec);
1091 			if (usec >= 0 && usec < 1000000)
1092 				usleep(1000000 - usec);
1093 			gettimeofday(tv1, NULL);
1094 			*bwcount -= BandwidthOpt;
1095 		}
1096 	}
1097 	return(a ? a : r);
1098 }
1099 
/*
 * Print the synopsis of all mirror commands to stderr and terminate the
 * program with the given exit code.
 */
static void
mirror_usage(int code)
{
	static const char synopsis[] =
		"hammer mirror-read <filesystem> [begin-tid]\n"
		"hammer mirror-read-stream <filesystem> [begin-tid]\n"
		"hammer mirror-write <filesystem>\n"
		"hammer mirror-dump\n"
		"hammer mirror-copy [[user@]host:]<filesystem>"
				  " [[user@]host:]<filesystem>\n"
		"hammer mirror-stream [[user@]host:]<filesystem>"
				    " [[user@]host:]<filesystem>\n";

	/* The text contains no conversions, so it is emitted verbatim. */
	fputs(synopsis, stderr);
	exit(code);
}
1115