1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 
33 * 34 * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.16 2008/11/09 05:22:56 dillon Exp $ 35 */ 36 37 #include "hammer.h" 38 39 #define SERIALBUF_SIZE (512 * 1024) 40 41 static int read_mrecords(int fd, char *buf, u_int size, 42 hammer_ioc_mrecord_head_t pickup); 43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 44 hammer_ioc_mrecord_head_t pickup); 45 static void write_mrecord(int fdout, u_int32_t type, 46 hammer_ioc_mrecord_any_t mrec, int bytes); 47 static void generate_mrec_header(int fd, int fdout, int pfs_id, 48 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 49 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 50 struct hammer_ioc_mrecord_head *pickup, 51 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 52 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 53 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 54 u_int64_t *bwcount, struct timeval *tv1); 55 static int getyn(void); 56 static void mirror_usage(int code); 57 58 /* 59 * Generate a mirroring data stream from the specific source over the 60 * entire key range, but restricted to the specified transaction range. 61 * 62 * The HAMMER VFS does most of the work, we add a few new mrecord 63 * types to negotiate the TID ranges and verify that the entire 64 * stream made it to the destination. 
65 */ 66 void 67 hammer_cmd_mirror_read(char **av, int ac, int streaming) 68 { 69 struct hammer_ioc_mirror_rw mirror; 70 struct hammer_ioc_pseudofs_rw pfs; 71 union hammer_ioc_mrecord_any mrec_tmp; 72 struct hammer_ioc_mrecord_head pickup; 73 hammer_ioc_mrecord_any_t mrec; 74 hammer_tid_t sync_tid; 75 const char *filesystem; 76 char *buf = malloc(SERIALBUF_SIZE); 77 int interrupted = 0; 78 int error; 79 int fd; 80 int n; 81 int didwork; 82 int64_t total_bytes; 83 time_t base_t = time(NULL); 84 struct timeval bwtv; 85 u_int64_t bwcount; 86 87 if (ac == 0 || ac > 2) 88 mirror_usage(1); 89 filesystem = av[0]; 90 91 pickup.signature = 0; 92 pickup.type = 0; 93 94 again: 95 bzero(&mirror, sizeof(mirror)); 96 hammer_key_beg_init(&mirror.key_beg); 97 hammer_key_end_init(&mirror.key_end); 98 99 fd = getpfs(&pfs, filesystem); 100 101 if (streaming && VerboseOpt) { 102 fprintf(stderr, "\nRunning"); 103 fflush(stderr); 104 } 105 total_bytes = 0; 106 gettimeofday(&bwtv, NULL); 107 bwcount = 0; 108 109 /* 110 * Send initial header for the purpose of determining shared-uuid. 111 */ 112 generate_mrec_header(fd, 1, pfs.pfs_id, NULL, NULL); 113 114 /* 115 * In 2-way mode the target will send us a PFS info packet 116 * first. Use the target's current snapshot TID as our default 117 * begin TID. 118 */ 119 mirror.tid_beg = 0; 120 if (TwoWayPipeOpt) { 121 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 122 NULL, &mirror.tid_beg); 123 if (n < 0) { /* got TERM record */ 124 relpfs(fd, &pfs); 125 return; 126 } 127 ++mirror.tid_beg; 128 } 129 130 /* 131 * Write out the PFS header, tid_beg will be updated if our PFS 132 * has a larger begin sync. tid_end is set to the latest source 133 * TID whos flush cycle has completed. 
134 */ 135 generate_mrec_header(fd, 1, pfs.pfs_id, 136 &mirror.tid_beg, &mirror.tid_end); 137 138 /* XXX streaming mode support w/ cycle or command line arg */ 139 /* 140 * A cycle file overrides the beginning TID 141 */ 142 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 143 144 if (ac == 2) 145 mirror.tid_beg = strtoull(av[1], NULL, 0); 146 147 if (streaming == 0 || VerboseOpt >= 2) { 148 fprintf(stderr, 149 "Mirror-read: Mirror from %016llx to %016llx\n", 150 mirror.tid_beg, mirror.tid_end); 151 } 152 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 153 fprintf(stderr, "Mirror-read: Resuming at object %016llx\n", 154 mirror.key_beg.obj_id); 155 } 156 157 /* 158 * Nothing to do if begin equals end. 159 */ 160 if (mirror.tid_beg >= mirror.tid_end) { 161 if (streaming == 0 || VerboseOpt >= 2) 162 fprintf(stderr, "Mirror-read: No work to do\n"); 163 didwork = 0; 164 goto done; 165 } 166 didwork = 1; 167 168 /* 169 * Write out bulk records 170 */ 171 mirror.ubuf = buf; 172 mirror.size = SERIALBUF_SIZE; 173 174 do { 175 mirror.count = 0; 176 mirror.pfs_id = pfs.pfs_id; 177 mirror.shared_uuid = pfs.ondisk->shared_uuid; 178 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 179 fprintf(stderr, "Mirror-read %s failed: %s\n", 180 filesystem, strerror(errno)); 181 exit(1); 182 } 183 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 184 fprintf(stderr, 185 "Mirror-read %s fatal error %d\n", 186 filesystem, mirror.head.error); 187 exit(1); 188 } 189 if (mirror.count) { 190 if (BandwidthOpt) { 191 n = writebw(1, mirror.ubuf, mirror.count, 192 &bwcount, &bwtv); 193 } else { 194 n = write(1, mirror.ubuf, mirror.count); 195 } 196 if (n != mirror.count) { 197 fprintf(stderr, "Mirror-read %s failed: " 198 "short write\n", 199 filesystem); 200 exit(1); 201 } 202 } 203 total_bytes += mirror.count; 204 if (streaming && VerboseOpt) { 205 fprintf(stderr, "\r%016llx %11lld", 206 mirror.key_cur.obj_id, 207 total_bytes); 208 fflush(stderr); 209 } 210 mirror.key_beg = 
mirror.key_cur; 211 if (TimeoutOpt && 212 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 213 fprintf(stderr, 214 "Mirror-read %s interrupted by timer at" 215 " %016llx\n", 216 filesystem, 217 mirror.key_cur.obj_id); 218 interrupted = 1; 219 break; 220 } 221 } while (mirror.count != 0); 222 223 done: 224 /* 225 * Write out the termination sync record - only if not interrupted 226 */ 227 if (interrupted == 0) { 228 if (didwork) { 229 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 230 &mrec_tmp, sizeof(mrec_tmp.sync)); 231 } else { 232 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 233 &mrec_tmp, sizeof(mrec_tmp.sync)); 234 } 235 } 236 237 /* 238 * If the -2 option was given (automatic when doing mirror-copy), 239 * a two-way pipe is assumed and we expect a response mrec from 240 * the target. 241 */ 242 if (TwoWayPipeOpt) { 243 mrec = read_mrecord(0, &error, &pickup); 244 if (mrec == NULL || 245 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 246 mrec->head.rec_size != sizeof(mrec->update)) { 247 fprintf(stderr, "mirror_read: Did not get final " 248 "acknowledgement packet from target\n"); 249 exit(1); 250 } 251 if (interrupted) { 252 if (CyclePath) { 253 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg); 254 fprintf(stderr, "Cyclefile %s updated for " 255 "continuation\n", CyclePath); 256 } 257 } else { 258 sync_tid = mrec->update.tid; 259 if (CyclePath) { 260 hammer_key_beg_init(&mirror.key_beg); 261 hammer_set_cycle(&mirror.key_beg, sync_tid); 262 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n", 263 CyclePath, sync_tid); 264 } 265 } 266 } else if (CyclePath) { 267 /* NOTE! 
mirror.tid_beg cannot be updated */ 268 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 269 "fully updated unless you use mirror-copy\n"); 270 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 271 } 272 if (streaming && interrupted == 0) { 273 time_t t1 = time(NULL); 274 time_t t2; 275 276 if (VerboseOpt) { 277 fprintf(stderr, " W"); 278 fflush(stderr); 279 } 280 pfs.ondisk->sync_end_tid = mirror.tid_end; 281 if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 282 fprintf(stderr, "Mirror-read %s: cannot stream: %s\n", 283 filesystem, strerror(errno)); 284 } else { 285 t2 = time(NULL) - t1; 286 if (t2 >= 0 && t2 < DelayOpt) { 287 if (VerboseOpt) { 288 fprintf(stderr, "\bD"); 289 fflush(stderr); 290 } 291 sleep(DelayOpt - t2); 292 } 293 if (VerboseOpt) { 294 fprintf(stderr, "\b "); 295 fflush(stderr); 296 } 297 relpfs(fd, &pfs); 298 goto again; 299 } 300 } 301 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 302 &mrec_tmp, sizeof(mrec_tmp.sync)); 303 relpfs(fd, &pfs); 304 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 305 } 306 307 static void 308 create_pfs(const char *filesystem, uuid_t *s_uuid) 309 { 310 fprintf(stderr, "PFS slave %s does not exist.\n" 311 "Do you want to create a new slave PFS? (yes|no) ", 312 filesystem); 313 fflush(stderr); 314 if (getyn() != 1) { 315 fprintf(stderr, "Aborting operation\n"); 316 exit(1); 317 } 318 319 u_int32_t status; 320 char *shared_uuid = NULL; 321 uuid_to_string(s_uuid, &shared_uuid, &status); 322 323 char *cmd = NULL; 324 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 325 filesystem, shared_uuid); 326 free(shared_uuid); 327 328 if (cmd == NULL) { 329 fprintf(stderr, "Failed to alloc memory\n"); 330 exit(1); 331 } 332 if (system(cmd) != 0) { 333 fprintf(stderr, "Failed to create PFS\n"); 334 } 335 free(cmd); 336 } 337 338 /* 339 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 340 * some additional packet types to negotiate TID ranges and to verify 341 * completion. 
 * The HAMMER VFS does most of the work.
 *
 * It is important to note that the mirror.key_{beg,end} range must
 * match the range used by the original.  For now both sides use the
 * entire key space.
 *
 * It is even more important that the records in the stream conform
 * to the TID range also supplied in the stream.  The HAMMER VFS will
 * use the REC, PASS, and SKIP record types to track the portions of
 * the B-Tree being scanned in order to be able to proactively delete
 * records on the target within those active areas that are not mentioned
 * by the source.
 *
 * The mirror.key_cur field is used by the VFS to do this tracking.  It
 * must be initialized to key_beg but then is persistently updated by
 * the HAMMER VFS on each successive ioctl() call.  If you blow up this
 * field you will blow up the mirror target, possibly to the point of
 * deleting everything.  As a safety measure the HAMMER VFS simply marks
 * the records that the source has destroyed as deleted on the target,
 * and normal pruning operations will deal with their final disposition
 * at some later time.
362 */ 363 void 364 hammer_cmd_mirror_write(char **av, int ac) 365 { 366 struct hammer_ioc_mirror_rw mirror; 367 const char *filesystem; 368 char *buf = malloc(SERIALBUF_SIZE); 369 struct hammer_ioc_pseudofs_rw pfs; 370 struct hammer_ioc_mrecord_head pickup; 371 struct hammer_ioc_synctid synctid; 372 union hammer_ioc_mrecord_any mrec_tmp; 373 hammer_ioc_mrecord_any_t mrec; 374 struct stat st; 375 int error; 376 int fd; 377 int n; 378 379 if (ac != 1) 380 mirror_usage(1); 381 filesystem = av[0]; 382 383 pickup.signature = 0; 384 pickup.type = 0; 385 386 again: 387 bzero(&mirror, sizeof(mirror)); 388 hammer_key_beg_init(&mirror.key_beg); 389 hammer_key_end_init(&mirror.key_end); 390 mirror.key_end = mirror.key_beg; 391 392 /* 393 * Read initial packet 394 */ 395 mrec = read_mrecord(0, &error, &pickup); 396 if (mrec == NULL) { 397 if (error == 0) 398 fprintf(stderr, "validate_mrec_header: short read\n"); 399 exit(1); 400 } 401 /* 402 * Validate packet 403 */ 404 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 405 return; 406 } 407 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 408 fprintf(stderr, "validate_mrec_header: did not get expected " 409 "PFSD record type\n"); 410 exit(1); 411 } 412 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 413 fprintf(stderr, "validate_mrec_header: unexpected payload " 414 "size\n"); 415 exit(1); 416 } 417 418 /* 419 * Create slave PFS if it doesn't yet exist 420 */ 421 if (lstat(filesystem, &st) != 0) { 422 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 423 } 424 free(mrec); 425 mrec = NULL; 426 427 fd = getpfs(&pfs, filesystem); 428 429 /* 430 * In two-way mode the target writes out a PFS packet first. 431 * The source uses our tid_end as its tid_beg by default, 432 * picking up where it left off. 433 */ 434 mirror.tid_beg = 0; 435 if (TwoWayPipeOpt) { 436 generate_mrec_header(fd, 1, pfs.pfs_id, 437 &mirror.tid_beg, &mirror.tid_end); 438 } 439 440 /* 441 * Read and process the PFS header. 
The source informs us of 442 * the TID range the stream represents. 443 */ 444 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 445 &mirror.tid_beg, &mirror.tid_end); 446 if (n < 0) { /* got TERM record */ 447 relpfs(fd, &pfs); 448 return; 449 } 450 451 mirror.ubuf = buf; 452 mirror.size = SERIALBUF_SIZE; 453 454 /* 455 * Read and process bulk records (REC, PASS, and SKIP types). 456 * 457 * On your life, do NOT mess with mirror.key_cur or your mirror 458 * target may become history. 459 */ 460 for (;;) { 461 mirror.count = 0; 462 mirror.pfs_id = pfs.pfs_id; 463 mirror.shared_uuid = pfs.ondisk->shared_uuid; 464 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 465 if (mirror.size <= 0) 466 break; 467 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 468 fprintf(stderr, "Mirror-write %s failed: %s\n", 469 filesystem, strerror(errno)); 470 exit(1); 471 } 472 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 473 fprintf(stderr, 474 "Mirror-write %s fatal error %d\n", 475 filesystem, mirror.head.error); 476 exit(1); 477 } 478 #if 0 479 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 480 fprintf(stderr, 481 "Mirror-write %s interrupted by timer at" 482 " %016llx\n", 483 filesystem, 484 mirror.key_cur.obj_id); 485 exit(0); 486 } 487 #endif 488 } 489 490 /* 491 * Read and process the termination sync record. 
492 */ 493 mrec = read_mrecord(0, &error, &pickup); 494 495 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 496 fprintf(stderr, "Mirror-write: received termination request\n"); 497 free(mrec); 498 return; 499 } 500 501 if (mrec == NULL || 502 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 503 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 504 mrec->head.rec_size != sizeof(mrec->sync)) { 505 fprintf(stderr, "Mirror-write %s: Did not get termination " 506 "sync record, or rec_size is wrong rt=%d\n", 507 filesystem, mrec->head.type); 508 exit(1); 509 } 510 511 /* 512 * Update the PFS info on the target so the user has visibility 513 * into the new snapshot, and sync the target filesystem. 514 */ 515 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 516 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 517 518 bzero(&synctid, sizeof(synctid)); 519 synctid.op = HAMMER_SYNCTID_SYNC2; 520 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 521 522 if (VerboseOpt >= 2) { 523 fprintf(stderr, "Mirror-write %s: succeeded\n", 524 filesystem); 525 } 526 } 527 528 free(mrec); 529 mrec = NULL; 530 531 /* 532 * Report back to the originator. 
533 */ 534 if (TwoWayPipeOpt) { 535 mrec_tmp.update.tid = mirror.tid_end; 536 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 537 &mrec_tmp, sizeof(mrec_tmp.update)); 538 } else { 539 printf("Source can update synctid to 0x%016llx\n", 540 mirror.tid_end); 541 } 542 relpfs(fd, &pfs); 543 goto again; 544 } 545 546 void 547 hammer_cmd_mirror_dump(void) 548 { 549 char *buf = malloc(SERIALBUF_SIZE); 550 struct hammer_ioc_mrecord_head pickup; 551 hammer_ioc_mrecord_any_t mrec; 552 int error; 553 int size; 554 int offset; 555 int bytes; 556 557 /* 558 * Read and process the PFS header 559 */ 560 pickup.signature = 0; 561 pickup.type = 0; 562 563 mrec = read_mrecord(0, &error, &pickup); 564 565 /* 566 * Read and process bulk records 567 */ 568 for (;;) { 569 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 570 if (size <= 0) 571 break; 572 offset = 0; 573 while (offset < size) { 574 mrec = (void *)((char *)buf + offset); 575 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 576 if (offset + bytes > size) { 577 fprintf(stderr, "Misaligned record\n"); 578 exit(1); 579 } 580 581 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 582 case HAMMER_MREC_TYPE_REC_BADCRC: 583 case HAMMER_MREC_TYPE_REC: 584 printf("Record obj=%016llx key=%016llx " 585 "rt=%02x ot=%02x", 586 mrec->rec.leaf.base.obj_id, 587 mrec->rec.leaf.base.key, 588 mrec->rec.leaf.base.rec_type, 589 mrec->rec.leaf.base.obj_type); 590 if (mrec->head.type == 591 HAMMER_MREC_TYPE_REC_BADCRC) { 592 printf(" (BAD CRC)"); 593 } 594 printf("\n"); 595 printf(" tids %016llx:%016llx data=%d\n", 596 mrec->rec.leaf.base.create_tid, 597 mrec->rec.leaf.base.delete_tid, 598 mrec->rec.leaf.data_len); 599 break; 600 case HAMMER_MREC_TYPE_PASS: 601 printf("Pass obj=%016llx key=%016llx " 602 "rt=%02x ot=%02x\n", 603 mrec->rec.leaf.base.obj_id, 604 mrec->rec.leaf.base.key, 605 mrec->rec.leaf.base.rec_type, 606 mrec->rec.leaf.base.obj_type); 607 printf(" tids %016llx:%016llx data=%d\n", 608 mrec->rec.leaf.base.create_tid, 609 
mrec->rec.leaf.base.delete_tid, 610 mrec->rec.leaf.data_len); 611 break; 612 case HAMMER_MREC_TYPE_SKIP: 613 printf("Skip obj=%016llx key=%016llx rt=%02x to\n" 614 " obj=%016llx key=%016llx rt=%02x\n", 615 mrec->skip.skip_beg.obj_id, 616 mrec->skip.skip_beg.key, 617 mrec->skip.skip_beg.rec_type, 618 mrec->skip.skip_end.obj_id, 619 mrec->skip.skip_end.key, 620 mrec->skip.skip_end.rec_type); 621 default: 622 break; 623 } 624 offset += bytes; 625 } 626 } 627 628 /* 629 * Read and process the termination sync record. 630 */ 631 mrec = read_mrecord(0, &error, &pickup); 632 if (mrec == NULL || 633 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 634 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 635 ) { 636 fprintf(stderr, "Mirror-dump: Did not get termination " 637 "sync record\n"); 638 } 639 } 640 641 void 642 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 643 { 644 pid_t pid1; 645 pid_t pid2; 646 int fds[2]; 647 const char *xav[16]; 648 char tbuf[16]; 649 char *ptr; 650 int xac; 651 652 if (ac != 2) 653 mirror_usage(1); 654 655 if (pipe(fds) < 0) { 656 perror("pipe"); 657 exit(1); 658 } 659 660 TwoWayPipeOpt = 1; 661 662 /* 663 * Source 664 */ 665 if ((pid1 = fork()) == 0) { 666 dup2(fds[0], 0); 667 dup2(fds[0], 1); 668 close(fds[0]); 669 close(fds[1]); 670 if ((ptr = strchr(av[0], ':')) != NULL) { 671 *ptr++ = 0; 672 xac = 0; 673 xav[xac++] = "ssh"; 674 xav[xac++] = av[0]; 675 xav[xac++] = "hammer"; 676 677 switch(VerboseOpt) { 678 case 0: 679 break; 680 case 1: 681 xav[xac++] = "-v"; 682 break; 683 case 2: 684 xav[xac++] = "-vv"; 685 break; 686 default: 687 xav[xac++] = "-vvv"; 688 break; 689 } 690 xav[xac++] = "-2"; 691 if (TimeoutOpt) { 692 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 693 xav[xac++] = "-t"; 694 xav[xac++] = tbuf; 695 } 696 if (streaming) 697 xav[xac++] = "mirror-read-stream"; 698 else 699 xav[xac++] = "mirror-read"; 700 xav[xac++] = ptr; 701 xav[xac++] = NULL; 702 execv("/usr/bin/ssh", (void *)xav); 703 } else { 704 hammer_cmd_mirror_read(av, 
1, streaming); 705 fflush(stdout); 706 fflush(stderr); 707 } 708 _exit(1); 709 } 710 711 /* 712 * Target 713 */ 714 if ((pid2 = fork()) == 0) { 715 dup2(fds[1], 0); 716 dup2(fds[1], 1); 717 close(fds[0]); 718 close(fds[1]); 719 if ((ptr = strchr(av[1], ':')) != NULL) { 720 *ptr++ = 0; 721 xac = 0; 722 xav[xac++] = "ssh"; 723 xav[xac++] = av[1]; 724 xav[xac++] = "hammer"; 725 726 switch(VerboseOpt) { 727 case 0: 728 break; 729 case 1: 730 xav[xac++] = "-v"; 731 break; 732 case 2: 733 xav[xac++] = "-vv"; 734 break; 735 default: 736 xav[xac++] = "-vvv"; 737 break; 738 } 739 740 xav[xac++] = "-2"; 741 xav[xac++] = "mirror-write"; 742 xav[xac++] = ptr; 743 xav[xac++] = NULL; 744 execv("/usr/bin/ssh", (void *)xav); 745 } else { 746 hammer_cmd_mirror_write(av + 1, 1); 747 fflush(stdout); 748 fflush(stderr); 749 } 750 _exit(1); 751 } 752 close(fds[0]); 753 close(fds[1]); 754 755 while (waitpid(pid1, NULL, 0) <= 0) 756 ; 757 while (waitpid(pid2, NULL, 0) <= 0) 758 ; 759 } 760 761 /* 762 * Read and return multiple mrecords 763 */ 764 static int 765 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 766 { 767 hammer_ioc_mrecord_any_t mrec; 768 u_int count; 769 size_t n; 770 size_t i; 771 size_t bytes; 772 int type; 773 774 count = 0; 775 while (size - count >= HAMMER_MREC_HEADSIZE) { 776 /* 777 * Cached the record header in case we run out of buffer 778 * space. 
779 */ 780 fflush(stdout); 781 if (pickup->signature == 0) { 782 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 783 i = read(fd, (char *)pickup + n, 784 HAMMER_MREC_HEADSIZE - n); 785 if (i <= 0) 786 break; 787 } 788 if (n == 0) 789 break; 790 if (n != HAMMER_MREC_HEADSIZE) { 791 fprintf(stderr, "read_mrecords: short read on pipe\n"); 792 exit(1); 793 } 794 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 795 fprintf(stderr, "read_mrecords: malformed record on pipe, " 796 "bad signature\n"); 797 exit(1); 798 } 799 } 800 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 801 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 802 fprintf(stderr, "read_mrecords: malformed record on pipe, " 803 "illegal rec_size\n"); 804 exit(1); 805 } 806 807 /* 808 * Stop if we have insufficient space for the record and data. 809 */ 810 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 811 if (size - count < bytes) 812 break; 813 814 /* 815 * Stop if the record type is not a REC, SKIP, or PASS, 816 * which are the only types the ioctl supports. Other types 817 * are used only by the userland protocol. 818 * 819 * Ignore all flags. 820 */ 821 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 822 if (type != HAMMER_MREC_TYPE_PFSD && 823 type != HAMMER_MREC_TYPE_REC && 824 type != HAMMER_MREC_TYPE_SKIP && 825 type != HAMMER_MREC_TYPE_PASS) { 826 break; 827 } 828 829 /* 830 * Read the remainder and clear the pickup signature. 
831 */ 832 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 833 i = read(fd, buf + count + n, bytes - n); 834 if (i <= 0) 835 break; 836 } 837 if (n != bytes) { 838 fprintf(stderr, "read_mrecords: short read on pipe\n"); 839 exit(1); 840 } 841 842 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 843 pickup->signature = 0; 844 pickup->type = 0; 845 mrec = (void *)(buf + count); 846 847 /* 848 * Validate the completed record 849 */ 850 if (mrec->head.rec_crc != 851 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 852 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 853 fprintf(stderr, "read_mrecords: malformed record " 854 "on pipe, bad crc\n"); 855 exit(1); 856 } 857 858 /* 859 * If its a B-Tree record validate the data crc. 860 * 861 * NOTE: If the VFS passes us an explicitly errorde mrec 862 * we just pass it through. 863 */ 864 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 865 866 if (type == HAMMER_MREC_TYPE_REC) { 867 if (mrec->head.rec_size < 868 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 869 fprintf(stderr, 870 "read_mrecords: malformed record on " 871 "pipe, illegal element data_len\n"); 872 exit(1); 873 } 874 if (mrec->rec.leaf.data_len && 875 mrec->rec.leaf.data_offset && 876 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 877 fprintf(stderr, 878 "read_mrecords: data_crc did not " 879 "match data! obj=%016llx key=%016llx\n", 880 mrec->rec.leaf.base.obj_id, 881 mrec->rec.leaf.base.key); 882 fprintf(stderr, 883 "continuing, but there are problems\n"); 884 } 885 } 886 count += bytes; 887 } 888 return(count); 889 } 890 891 /* 892 * Read and return a single mrecord. 
893 */ 894 static 895 hammer_ioc_mrecord_any_t 896 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 897 { 898 hammer_ioc_mrecord_any_t mrec; 899 struct hammer_ioc_mrecord_head mrechd; 900 size_t bytes; 901 size_t n; 902 size_t i; 903 904 if (pickup && pickup->type != 0) { 905 mrechd = *pickup; 906 pickup->signature = 0; 907 pickup->type = 0; 908 n = HAMMER_MREC_HEADSIZE; 909 } else { 910 /* 911 * Read in the PFSD header from the sender. 912 */ 913 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 914 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 915 if (i <= 0) 916 break; 917 } 918 if (n == 0) { 919 *errorp = 0; /* EOF */ 920 return(NULL); 921 } 922 if (n != HAMMER_MREC_HEADSIZE) { 923 fprintf(stderr, "short read of mrecord header\n"); 924 *errorp = EPIPE; 925 return(NULL); 926 } 927 } 928 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 929 fprintf(stderr, "read_mrecord: bad signature\n"); 930 *errorp = EINVAL; 931 return(NULL); 932 } 933 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 934 assert(bytes >= sizeof(mrechd)); 935 mrec = malloc(bytes); 936 mrec->head = mrechd; 937 938 while (n < bytes) { 939 i = read(fdin, (char *)mrec + n, bytes - n); 940 if (i <= 0) 941 break; 942 n += i; 943 } 944 if (n != bytes) { 945 fprintf(stderr, "read_mrecord: short read on payload\n"); 946 *errorp = EPIPE; 947 return(NULL); 948 } 949 if (mrec->head.rec_crc != 950 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 951 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 952 fprintf(stderr, "read_mrecord: bad CRC\n"); 953 *errorp = EINVAL; 954 return(NULL); 955 } 956 *errorp = 0; 957 return(mrec); 958 } 959 960 static 961 void 962 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 963 int bytes) 964 { 965 char zbuf[HAMMER_HEAD_ALIGN]; 966 int pad; 967 968 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 969 970 assert(bytes >= (int)sizeof(mrec->head)); 971 bzero(&mrec->head, sizeof(mrec->head)); 972 mrec->head.signature = 
HAMMER_IOC_MIRROR_SIGNATURE; 973 mrec->head.type = type; 974 mrec->head.rec_size = bytes; 975 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 976 bytes - HAMMER_MREC_CRCOFF); 977 if (write(fdout, mrec, bytes) != bytes) { 978 fprintf(stderr, "write_mrecord: error %d (%s)\n", 979 errno, strerror(errno)); 980 exit(1); 981 } 982 if (pad) { 983 bzero(zbuf, pad); 984 if (write(fdout, zbuf, pad) != pad) { 985 fprintf(stderr, "write_mrecord: error %d (%s)\n", 986 errno, strerror(errno)); 987 exit(1); 988 } 989 } 990 } 991 992 /* 993 * Generate a mirroring header with the pfs information of the 994 * originating filesytem. 995 */ 996 static void 997 generate_mrec_header(int fd, int fdout, int pfs_id, 998 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 999 { 1000 struct hammer_ioc_pseudofs_rw pfs; 1001 union hammer_ioc_mrecord_any mrec_tmp; 1002 1003 bzero(&pfs, sizeof(pfs)); 1004 bzero(&mrec_tmp, sizeof(mrec_tmp)); 1005 pfs.pfs_id = pfs_id; 1006 pfs.ondisk = &mrec_tmp.pfs.pfsd; 1007 pfs.bytes = sizeof(mrec_tmp.pfs.pfsd); 1008 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1009 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1010 exit(1); 1011 } 1012 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1013 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 1014 exit(1); 1015 } 1016 1017 /* 1018 * sync_beg_tid - lowest TID on source after which a full history 1019 * is available. 1020 * 1021 * sync_end_tid - highest fully synchronized TID from source. 1022 */ 1023 if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid) 1024 *tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid; 1025 if (tid_endp) 1026 *tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid; 1027 mrec_tmp.pfs.version = pfs.version; 1028 write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD, 1029 &mrec_tmp, sizeof(mrec_tmp.pfs)); 1030 } 1031 1032 /* 1033 * Validate the pfs information from the originating filesystem 1034 * against the target filesystem. shared_uuid must match. 
1035 * 1036 * return -1 if we got a TERM record 1037 */ 1038 static int 1039 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1040 struct hammer_ioc_mrecord_head *pickup, 1041 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1042 { 1043 struct hammer_ioc_pseudofs_rw pfs; 1044 struct hammer_pseudofs_data pfsd; 1045 hammer_ioc_mrecord_any_t mrec; 1046 int error; 1047 1048 /* 1049 * Get the PFSD info from the target filesystem. 1050 */ 1051 bzero(&pfs, sizeof(pfs)); 1052 bzero(&pfsd, sizeof(pfsd)); 1053 pfs.pfs_id = pfs_id; 1054 pfs.ondisk = &pfsd; 1055 pfs.bytes = sizeof(pfsd); 1056 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1057 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1058 exit(1); 1059 } 1060 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1061 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 1062 exit(1); 1063 } 1064 1065 mrec = read_mrecord(fdin, &error, pickup); 1066 if (mrec == NULL) { 1067 if (error == 0) 1068 fprintf(stderr, "validate_mrec_header: short read\n"); 1069 exit(1); 1070 } 1071 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1072 free(mrec); 1073 return(-1); 1074 } 1075 1076 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1077 fprintf(stderr, "validate_mrec_header: did not get expected " 1078 "PFSD record type\n"); 1079 exit(1); 1080 } 1081 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1082 fprintf(stderr, "validate_mrec_header: unexpected payload " 1083 "size\n"); 1084 exit(1); 1085 } 1086 if (mrec->pfs.version != pfs.version) { 1087 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1088 exit(1); 1089 } 1090 1091 /* 1092 * Whew. Ok, is the read PFS info compatible with the target? 
1093 */ 1094 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1095 sizeof(pfsd.shared_uuid)) != 0) { 1096 fprintf(stderr, 1097 "mirror-write: source and target have " 1098 "different shared-uuid's!\n"); 1099 exit(1); 1100 } 1101 if (is_target && 1102 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1103 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1104 exit(1); 1105 } 1106 if (tid_begp) 1107 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1108 if (tid_endp) 1109 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1110 free(mrec); 1111 return(0); 1112 } 1113 1114 static void 1115 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1116 { 1117 struct hammer_ioc_pseudofs_rw pfs; 1118 struct hammer_pseudofs_data pfsd; 1119 1120 bzero(&pfs, sizeof(pfs)); 1121 bzero(&pfsd, sizeof(pfsd)); 1122 pfs.pfs_id = pfs_id; 1123 pfs.ondisk = &pfsd; 1124 pfs.bytes = sizeof(pfsd); 1125 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1126 perror("update_pfs_snapshot (read)"); 1127 exit(1); 1128 } 1129 if (pfsd.sync_end_tid != snapshot_tid) { 1130 pfsd.sync_end_tid = snapshot_tid; 1131 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1132 perror("update_pfs_snapshot (rewrite)"); 1133 exit(1); 1134 } 1135 if (VerboseOpt >= 2) { 1136 fprintf(stderr, 1137 "Mirror-write: Completed, updated snapshot " 1138 "to %016llx\n", 1139 snapshot_tid); 1140 } 1141 } 1142 } 1143 1144 /* 1145 * Bandwidth-limited write in chunks 1146 */ 1147 static 1148 ssize_t 1149 writebw(int fd, const void *buf, size_t nbytes, 1150 u_int64_t *bwcount, struct timeval *tv1) 1151 { 1152 struct timeval tv2; 1153 size_t n; 1154 ssize_t r; 1155 ssize_t a; 1156 int usec; 1157 1158 a = 0; 1159 r = 0; 1160 while (nbytes) { 1161 if (*bwcount + nbytes > BandwidthOpt) 1162 n = BandwidthOpt - *bwcount; 1163 else 1164 n = nbytes; 1165 if (n) 1166 r = write(fd, buf, n); 1167 if (r >= 0) { 1168 a += r; 1169 nbytes -= r; 1170 buf = (const char *)buf + r; 1171 } 1172 if ((size_t)r != n) 1173 break; 1174 
*bwcount += n; 1175 if (*bwcount >= BandwidthOpt) { 1176 gettimeofday(&tv2, NULL); 1177 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1178 (int)(tv2.tv_usec - tv1->tv_usec); 1179 if (usec >= 0 && usec < 1000000) 1180 usleep(1000000 - usec); 1181 gettimeofday(tv1, NULL); 1182 *bwcount -= BandwidthOpt; 1183 } 1184 } 1185 return(a ? a : r); 1186 } 1187 1188 /* 1189 * Get a yes or no answer from the terminal. The program may be run as 1190 * part of a two-way pipe so we cannot use stdin for this operation. 1191 */ 1192 static int 1193 getyn(void) 1194 { 1195 char buf[256]; 1196 FILE *fp; 1197 int result; 1198 1199 fp = fopen("/dev/tty", "r"); 1200 if (fp == NULL) { 1201 fprintf(stderr, "No terminal for response\n"); 1202 return(-1); 1203 } 1204 result = -1; 1205 while (fgets(buf, sizeof(buf), fp) != NULL) { 1206 if (buf[0] == 'y' || buf[0] == 'Y') { 1207 result = 1; 1208 break; 1209 } 1210 if (buf[0] == 'n' || buf[0] == 'N') { 1211 result = 0; 1212 break; 1213 } 1214 fprintf(stderr, "Response not understood\n"); 1215 break; 1216 } 1217 fclose(fp); 1218 return(result); 1219 } 1220 1221 static void 1222 mirror_usage(int code) 1223 { 1224 fprintf(stderr, 1225 "hammer mirror-read <filesystem> [begin-tid]\n" 1226 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1227 "hammer mirror-write <filesystem>\n" 1228 "hammer mirror-dump\n" 1229 "hammer mirror-copy [[user@]host:]<filesystem>" 1230 " [[user@]host:]<filesystem>\n" 1231 "hammer mirror-stream [[user@]host:]<filesystem>" 1232 " [[user@]host:]<filesystem>\n" 1233 ); 1234 exit(code); 1235 } 1236 1237