1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 
33 * 34 * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.16 2008/11/09 05:22:56 dillon Exp $ 35 */ 36 37 #include "hammer.h" 38 39 #define SERIALBUF_SIZE (512 * 1024) 40 41 static int read_mrecords(int fd, char *buf, u_int size, 42 hammer_ioc_mrecord_head_t pickup); 43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 44 hammer_ioc_mrecord_head_t pickup); 45 static void write_mrecord(int fdout, u_int32_t type, 46 hammer_ioc_mrecord_any_t mrec, int bytes); 47 static void generate_mrec_header(int fd, int fdout, int pfs_id, 48 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 49 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 50 struct hammer_ioc_mrecord_head *pickup, 51 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 52 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 53 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 54 u_int64_t *bwcount, struct timeval *tv1); 55 static int getyn(void); 56 static void mirror_usage(int code); 57 58 /* 59 * Generate a mirroring data stream from the specific source over the 60 * entire key range, but restricted to the specified transaction range. 61 * 62 * The HAMMER VFS does most of the work, we add a few new mrecord 63 * types to negotiate the TID ranges and verify that the entire 64 * stream made it to the destination. 
65 */ 66 void 67 hammer_cmd_mirror_read(char **av, int ac, int streaming) 68 { 69 struct hammer_ioc_mirror_rw mirror; 70 struct hammer_ioc_pseudofs_rw pfs; 71 union hammer_ioc_mrecord_any mrec_tmp; 72 struct hammer_ioc_mrecord_head pickup; 73 hammer_ioc_mrecord_any_t mrec; 74 hammer_tid_t sync_tid; 75 const char *filesystem; 76 char *buf = malloc(SERIALBUF_SIZE); 77 int interrupted = 0; 78 int error; 79 int fd; 80 int n; 81 int didwork; 82 int64_t total_bytes; 83 time_t base_t = time(NULL); 84 struct timeval bwtv; 85 u_int64_t bwcount; 86 87 if (ac == 0 || ac > 2) 88 mirror_usage(1); 89 filesystem = av[0]; 90 91 pickup.signature = 0; 92 pickup.type = 0; 93 94 again: 95 bzero(&mirror, sizeof(mirror)); 96 hammer_key_beg_init(&mirror.key_beg); 97 hammer_key_end_init(&mirror.key_end); 98 99 fd = getpfs(&pfs, filesystem); 100 101 if (streaming && VerboseOpt) { 102 fprintf(stderr, "\nRunning"); 103 fflush(stderr); 104 } 105 total_bytes = 0; 106 gettimeofday(&bwtv, NULL); 107 bwcount = 0; 108 109 /* 110 * Send initial header for the purpose of determining shared-uuid. 111 */ 112 generate_mrec_header(fd, 1, pfs.pfs_id, NULL, NULL); 113 114 /* 115 * In 2-way mode the target will send us a PFS info packet 116 * first. Use the target's current snapshot TID as our default 117 * begin TID. 118 */ 119 mirror.tid_beg = 0; 120 if (TwoWayPipeOpt) { 121 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 122 NULL, &mirror.tid_beg); 123 if (n < 0) { /* got TERM record */ 124 relpfs(fd, &pfs); 125 return; 126 } 127 ++mirror.tid_beg; 128 } 129 130 /* 131 * Write out the PFS header, tid_beg will be updated if our PFS 132 * has a larger begin sync. tid_end is set to the latest source 133 * TID whos flush cycle has completed. 
134 */ 135 generate_mrec_header(fd, 1, pfs.pfs_id, 136 &mirror.tid_beg, &mirror.tid_end); 137 138 /* XXX streaming mode support w/ cycle or command line arg */ 139 /* 140 * A cycle file overrides the beginning TID 141 */ 142 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 143 144 if (ac == 2) 145 mirror.tid_beg = strtoull(av[1], NULL, 0); 146 147 if (streaming == 0 || VerboseOpt >= 2) { 148 fprintf(stderr, 149 "Mirror-read: Mirror from %016llx to %016llx\n", 150 mirror.tid_beg, mirror.tid_end); 151 } 152 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 153 fprintf(stderr, "Mirror-read: Resuming at object %016llx\n", 154 mirror.key_beg.obj_id); 155 } 156 157 /* 158 * Nothing to do if begin equals end. 159 */ 160 if (mirror.tid_beg >= mirror.tid_end) { 161 if (streaming == 0 || VerboseOpt >= 2) 162 fprintf(stderr, "Mirror-read: No work to do\n"); 163 didwork = 0; 164 goto done; 165 } 166 didwork = 1; 167 168 /* 169 * Write out bulk records 170 */ 171 mirror.ubuf = buf; 172 mirror.size = SERIALBUF_SIZE; 173 174 do { 175 mirror.count = 0; 176 mirror.pfs_id = pfs.pfs_id; 177 mirror.shared_uuid = pfs.ondisk->shared_uuid; 178 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 179 fprintf(stderr, "Mirror-read %s failed: %s\n", 180 filesystem, strerror(errno)); 181 exit(1); 182 } 183 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 184 fprintf(stderr, 185 "Mirror-read %s fatal error %d\n", 186 filesystem, mirror.head.error); 187 exit(1); 188 } 189 if (mirror.count) { 190 if (BandwidthOpt) { 191 n = writebw(1, mirror.ubuf, mirror.count, 192 &bwcount, &bwtv); 193 } else { 194 n = write(1, mirror.ubuf, mirror.count); 195 } 196 if (n != mirror.count) { 197 fprintf(stderr, "Mirror-read %s failed: " 198 "short write\n", 199 filesystem); 200 exit(1); 201 } 202 } 203 total_bytes += mirror.count; 204 if (streaming && VerboseOpt) { 205 fprintf(stderr, "\r%016llx %11lld", 206 mirror.key_cur.obj_id, 207 total_bytes); 208 fflush(stderr); 209 } 210 mirror.key_beg = 
mirror.key_cur; 211 if (TimeoutOpt && 212 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 213 fprintf(stderr, 214 "Mirror-read %s interrupted by timer at" 215 " %016llx\n", 216 filesystem, 217 mirror.key_cur.obj_id); 218 interrupted = 1; 219 break; 220 } 221 } while (mirror.count != 0); 222 223 done: 224 /* 225 * Write out the termination sync record - only if not interrupted 226 */ 227 if (interrupted == 0) { 228 if (didwork) { 229 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 230 &mrec_tmp, sizeof(mrec_tmp.sync)); 231 } else { 232 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 233 &mrec_tmp, sizeof(mrec_tmp.sync)); 234 } 235 } 236 237 /* 238 * If the -2 option was given (automatic when doing mirror-copy), 239 * a two-way pipe is assumed and we expect a response mrec from 240 * the target. 241 */ 242 if (TwoWayPipeOpt) { 243 mrec = read_mrecord(0, &error, &pickup); 244 if (mrec == NULL || 245 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 246 mrec->head.rec_size != sizeof(mrec->update)) { 247 fprintf(stderr, "mirror_read: Did not get final " 248 "acknowledgement packet from target\n"); 249 exit(1); 250 } 251 if (interrupted) { 252 if (CyclePath) { 253 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg); 254 fprintf(stderr, "Cyclefile %s updated for " 255 "continuation\n", CyclePath); 256 } 257 } else { 258 sync_tid = mrec->update.tid; 259 if (CyclePath) { 260 hammer_key_beg_init(&mirror.key_beg); 261 hammer_set_cycle(&mirror.key_beg, sync_tid); 262 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n", 263 CyclePath, sync_tid); 264 } 265 } 266 } else if (CyclePath) { 267 /* NOTE! 
mirror.tid_beg cannot be updated */ 268 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 269 "fully updated unless you use mirror-copy\n"); 270 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 271 } 272 if (streaming && interrupted == 0) { 273 time_t t1 = time(NULL); 274 time_t t2; 275 276 if (VerboseOpt) { 277 fprintf(stderr, " W"); 278 fflush(stderr); 279 } 280 pfs.ondisk->sync_end_tid = mirror.tid_end; 281 if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 282 fprintf(stderr, "Mirror-read %s: cannot stream: %s\n", 283 filesystem, strerror(errno)); 284 } else { 285 t2 = time(NULL) - t1; 286 if (t2 >= 0 && t2 < DelayOpt) { 287 if (VerboseOpt) { 288 fprintf(stderr, "\bD"); 289 fflush(stderr); 290 } 291 sleep(DelayOpt - t2); 292 } 293 if (VerboseOpt) { 294 fprintf(stderr, "\b "); 295 fflush(stderr); 296 } 297 relpfs(fd, &pfs); 298 goto again; 299 } 300 } 301 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 302 &mrec_tmp, sizeof(mrec_tmp.sync)); 303 relpfs(fd, &pfs); 304 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 305 } 306 307 static void 308 create_pfs(const char *filesystem, uuid_t *s_uuid) 309 { 310 fprintf(stderr, "PFS slave %s does not exist.\n" 311 "Do you want to create a new slave PFS? (yes|no) ", 312 filesystem); 313 fflush(stderr); 314 if (getyn() != 1) { 315 fprintf(stderr, "Aborting operation\n"); 316 exit(1); 317 } 318 319 u_int32_t status; 320 char *shared_uuid = NULL; 321 uuid_to_string(s_uuid, &shared_uuid, &status); 322 323 char *cmd = NULL; 324 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 325 filesystem, shared_uuid); 326 free(shared_uuid); 327 328 if (cmd == NULL) { 329 fprintf(stderr, "Failed to alloc memory\n"); 330 exit(1); 331 } 332 if (system(cmd) != 0) { 333 fprintf(stderr, "Failed to create PFS\n"); 334 } 335 free(cmd); 336 } 337 338 /* 339 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 340 * some additional packet types to negotiate TID ranges and to verify 341 * completion. 
 * The HAMMER VFS does most of the work.
 *
 * It is important to note that the mirror.key_{beg,end} range must
 * match the range used by the original.  For now both sides use
 * the entire key space.
 *
 * It is even more important that the records in the stream conform
 * to the TID range also supplied in the stream.  The HAMMER VFS will
 * use the REC, PASS, and SKIP record types to track the portions of
 * the B-Tree being scanned in order to be able to proactively delete
 * records on the target within those active areas that are not mentioned
 * by the source.
 *
 * The mirror.key_cur field is used by the VFS to do this tracking.  It
 * must be initialized to key_beg but then is persistently updated by
 * the HAMMER VFS on each successive ioctl() call.  If you blow up this
 * field you will blow up the mirror target, possibly to the point of
 * deleting everything.  As a safety measure the HAMMER VFS simply marks
 * the records that the source has destroyed as deleted on the target,
 * and normal pruning operations will deal with their final disposition
 * at some later time.
362 */ 363 void 364 hammer_cmd_mirror_write(char **av, int ac) 365 { 366 struct hammer_ioc_mirror_rw mirror; 367 const char *filesystem; 368 char *buf = malloc(SERIALBUF_SIZE); 369 struct hammer_ioc_pseudofs_rw pfs; 370 struct hammer_ioc_mrecord_head pickup; 371 struct hammer_ioc_synctid synctid; 372 union hammer_ioc_mrecord_any mrec_tmp; 373 hammer_ioc_mrecord_any_t mrec; 374 struct stat st; 375 int error; 376 int fd; 377 int n; 378 379 if (ac != 1) 380 mirror_usage(1); 381 filesystem = av[0]; 382 383 pickup.signature = 0; 384 pickup.type = 0; 385 386 again: 387 bzero(&mirror, sizeof(mirror)); 388 hammer_key_beg_init(&mirror.key_beg); 389 hammer_key_end_init(&mirror.key_end); 390 mirror.key_end = mirror.key_beg; 391 392 /* 393 * Read initial packet 394 */ 395 mrec = read_mrecord(0, &error, &pickup); 396 if (mrec == NULL) { 397 if (error == 0) 398 fprintf(stderr, "validate_mrec_header: short read\n"); 399 exit(1); 400 } 401 /* 402 * Validate packet 403 */ 404 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 405 return; 406 } 407 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 408 fprintf(stderr, "validate_mrec_header: did not get expected " 409 "PFSD record type\n"); 410 exit(1); 411 } 412 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 413 fprintf(stderr, "validate_mrec_header: unexpected payload " 414 "size\n"); 415 exit(1); 416 } 417 418 /* 419 * Create slave PFS if it doesn't yet exist 420 */ 421 if (lstat(filesystem, &st) != 0) { 422 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 423 } 424 free(mrec); 425 mrec = NULL; 426 427 fd = getpfs(&pfs, filesystem); 428 429 /* 430 * In two-way mode the target writes out a PFS packet first. 431 * The source uses our tid_end as its tid_beg by default, 432 * picking up where it left off. 433 */ 434 mirror.tid_beg = 0; 435 if (TwoWayPipeOpt) { 436 generate_mrec_header(fd, 1, pfs.pfs_id, 437 &mirror.tid_beg, &mirror.tid_end); 438 } 439 440 /* 441 * Read and process the PFS header. 
The source informs us of 442 * the TID range the stream represents. 443 */ 444 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 445 &mirror.tid_beg, &mirror.tid_end); 446 if (n < 0) { /* got TERM record */ 447 relpfs(fd, &pfs); 448 return; 449 } 450 451 mirror.ubuf = buf; 452 mirror.size = SERIALBUF_SIZE; 453 454 /* 455 * Read and process bulk records (REC, PASS, and SKIP types). 456 * 457 * On your life, do NOT mess with mirror.key_cur or your mirror 458 * target may become history. 459 */ 460 for (;;) { 461 mirror.count = 0; 462 mirror.pfs_id = pfs.pfs_id; 463 mirror.shared_uuid = pfs.ondisk->shared_uuid; 464 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 465 if (mirror.size <= 0) 466 break; 467 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 468 fprintf(stderr, "Mirror-write %s failed: %s\n", 469 filesystem, strerror(errno)); 470 exit(1); 471 } 472 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 473 fprintf(stderr, 474 "Mirror-write %s fatal error %d\n", 475 filesystem, mirror.head.error); 476 exit(1); 477 } 478 #if 0 479 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 480 fprintf(stderr, 481 "Mirror-write %s interrupted by timer at" 482 " %016llx\n", 483 filesystem, 484 mirror.key_cur.obj_id); 485 exit(0); 486 } 487 #endif 488 } 489 490 /* 491 * Read and process the termination sync record. 
492 */ 493 mrec = read_mrecord(0, &error, &pickup); 494 495 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 496 fprintf(stderr, "Mirror-write: received termination request\n"); 497 free(mrec); 498 return; 499 } 500 501 if (mrec == NULL || 502 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 503 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 504 mrec->head.rec_size != sizeof(mrec->sync)) { 505 fprintf(stderr, "Mirror-write %s: Did not get termination " 506 "sync record, or rec_size is wrong rt=%d\n", 507 filesystem, mrec->head.type); 508 exit(1); 509 } 510 511 /* 512 * Update the PFS info on the target so the user has visibility 513 * into the new snapshot, and sync the target filesystem. 514 */ 515 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 516 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 517 518 bzero(&synctid, sizeof(synctid)); 519 synctid.op = HAMMER_SYNCTID_SYNC2; 520 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 521 522 if (VerboseOpt >= 2) { 523 fprintf(stderr, "Mirror-write %s: succeeded\n", 524 filesystem); 525 } 526 } 527 528 free(mrec); 529 mrec = NULL; 530 531 /* 532 * Report back to the originator. 
533 */ 534 if (TwoWayPipeOpt) { 535 mrec_tmp.update.tid = mirror.tid_end; 536 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 537 &mrec_tmp, sizeof(mrec_tmp.update)); 538 } else { 539 printf("Source can update synctid to 0x%016llx\n", 540 mirror.tid_end); 541 } 542 relpfs(fd, &pfs); 543 goto again; 544 } 545 546 void 547 hammer_cmd_mirror_dump(void) 548 { 549 char *buf = malloc(SERIALBUF_SIZE); 550 struct hammer_ioc_mrecord_head pickup; 551 hammer_ioc_mrecord_any_t mrec; 552 int error; 553 int size; 554 int offset; 555 int bytes; 556 557 /* 558 * Read and process the PFS header 559 */ 560 pickup.signature = 0; 561 pickup.type = 0; 562 563 mrec = read_mrecord(0, &error, &pickup); 564 565 /* 566 * Read and process bulk records 567 */ 568 for (;;) { 569 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 570 if (size <= 0) 571 break; 572 offset = 0; 573 while (offset < size) { 574 mrec = (void *)((char *)buf + offset); 575 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 576 if (offset + bytes > size) { 577 fprintf(stderr, "Misaligned record\n"); 578 exit(1); 579 } 580 581 switch(mrec->head.type) { 582 case HAMMER_MREC_TYPE_REC: 583 printf("Record obj=%016llx key=%016llx " 584 "rt=%02x ot=%02x\n", 585 mrec->rec.leaf.base.obj_id, 586 mrec->rec.leaf.base.key, 587 mrec->rec.leaf.base.rec_type, 588 mrec->rec.leaf.base.obj_type); 589 printf(" tids %016llx:%016llx data=%d\n", 590 mrec->rec.leaf.base.create_tid, 591 mrec->rec.leaf.base.delete_tid, 592 mrec->rec.leaf.data_len); 593 break; 594 case HAMMER_MREC_TYPE_PASS: 595 printf("Pass obj=%016llx key=%016llx " 596 "rt=%02x ot=%02x\n", 597 mrec->rec.leaf.base.obj_id, 598 mrec->rec.leaf.base.key, 599 mrec->rec.leaf.base.rec_type, 600 mrec->rec.leaf.base.obj_type); 601 printf(" tids %016llx:%016llx data=%d\n", 602 mrec->rec.leaf.base.create_tid, 603 mrec->rec.leaf.base.delete_tid, 604 mrec->rec.leaf.data_len); 605 break; 606 case HAMMER_MREC_TYPE_SKIP: 607 printf("Skip obj=%016llx key=%016llx rt=%02x to\n" 608 " obj=%016llx 
key=%016llx rt=%02x\n", 609 mrec->skip.skip_beg.obj_id, 610 mrec->skip.skip_beg.key, 611 mrec->skip.skip_beg.rec_type, 612 mrec->skip.skip_end.obj_id, 613 mrec->skip.skip_end.key, 614 mrec->skip.skip_end.rec_type); 615 default: 616 break; 617 } 618 offset += bytes; 619 } 620 } 621 622 /* 623 * Read and process the termination sync record. 624 */ 625 mrec = read_mrecord(0, &error, &pickup); 626 if (mrec == NULL || 627 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 628 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 629 ) { 630 fprintf(stderr, "Mirror-dump: Did not get termination " 631 "sync record\n"); 632 } 633 } 634 635 void 636 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 637 { 638 pid_t pid1; 639 pid_t pid2; 640 int fds[2]; 641 const char *xav[16]; 642 char tbuf[16]; 643 char *ptr; 644 int xac; 645 646 if (ac != 2) 647 mirror_usage(1); 648 649 if (pipe(fds) < 0) { 650 perror("pipe"); 651 exit(1); 652 } 653 654 TwoWayPipeOpt = 1; 655 656 /* 657 * Source 658 */ 659 if ((pid1 = fork()) == 0) { 660 dup2(fds[0], 0); 661 dup2(fds[0], 1); 662 close(fds[0]); 663 close(fds[1]); 664 if ((ptr = strchr(av[0], ':')) != NULL) { 665 *ptr++ = 0; 666 xac = 0; 667 xav[xac++] = "ssh"; 668 xav[xac++] = av[0]; 669 xav[xac++] = "hammer"; 670 671 switch(VerboseOpt) { 672 case 0: 673 break; 674 case 1: 675 xav[xac++] = "-v"; 676 break; 677 case 2: 678 xav[xac++] = "-vv"; 679 break; 680 default: 681 xav[xac++] = "-vvv"; 682 break; 683 } 684 xav[xac++] = "-2"; 685 if (TimeoutOpt) { 686 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 687 xav[xac++] = "-t"; 688 xav[xac++] = tbuf; 689 } 690 if (streaming) 691 xav[xac++] = "mirror-read-stream"; 692 else 693 xav[xac++] = "mirror-read"; 694 xav[xac++] = ptr; 695 xav[xac++] = NULL; 696 execv("/usr/bin/ssh", (void *)xav); 697 } else { 698 hammer_cmd_mirror_read(av, 1, streaming); 699 fflush(stdout); 700 fflush(stderr); 701 } 702 _exit(1); 703 } 704 705 /* 706 * Target 707 */ 708 if ((pid2 = fork()) == 0) { 709 dup2(fds[1], 0); 710 
dup2(fds[1], 1); 711 close(fds[0]); 712 close(fds[1]); 713 if ((ptr = strchr(av[1], ':')) != NULL) { 714 *ptr++ = 0; 715 xac = 0; 716 xav[xac++] = "ssh"; 717 xav[xac++] = av[1]; 718 xav[xac++] = "hammer"; 719 720 switch(VerboseOpt) { 721 case 0: 722 break; 723 case 1: 724 xav[xac++] = "-v"; 725 break; 726 case 2: 727 xav[xac++] = "-vv"; 728 break; 729 default: 730 xav[xac++] = "-vvv"; 731 break; 732 } 733 734 xav[xac++] = "-2"; 735 xav[xac++] = "mirror-write"; 736 xav[xac++] = ptr; 737 xav[xac++] = NULL; 738 execv("/usr/bin/ssh", (void *)xav); 739 } else { 740 hammer_cmd_mirror_write(av + 1, 1); 741 fflush(stdout); 742 fflush(stderr); 743 } 744 _exit(1); 745 } 746 close(fds[0]); 747 close(fds[1]); 748 749 while (waitpid(pid1, NULL, 0) <= 0) 750 ; 751 while (waitpid(pid2, NULL, 0) <= 0) 752 ; 753 } 754 755 /* 756 * Read and return multiple mrecords 757 */ 758 static int 759 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 760 { 761 hammer_ioc_mrecord_any_t mrec; 762 u_int count; 763 size_t n; 764 size_t i; 765 size_t bytes; 766 767 count = 0; 768 while (size - count >= HAMMER_MREC_HEADSIZE) { 769 /* 770 * Cached the record header in case we run out of buffer 771 * space. 
772 */ 773 fflush(stdout); 774 if (pickup->signature == 0) { 775 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 776 i = read(fd, (char *)pickup + n, 777 HAMMER_MREC_HEADSIZE - n); 778 if (i <= 0) 779 break; 780 } 781 if (n == 0) 782 break; 783 if (n != HAMMER_MREC_HEADSIZE) { 784 fprintf(stderr, "read_mrecords: short read on pipe\n"); 785 exit(1); 786 } 787 788 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 789 fprintf(stderr, "read_mrecords: malformed record on pipe, " 790 "bad signature\n"); 791 exit(1); 792 } 793 } 794 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 795 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 796 fprintf(stderr, "read_mrecords: malformed record on pipe, " 797 "illegal rec_size\n"); 798 exit(1); 799 } 800 801 /* 802 * Stop if we have insufficient space for the record and data. 803 */ 804 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 805 if (size - count < bytes) 806 break; 807 808 /* 809 * Stop if the record type is not a REC or a SKIP (the only 810 * two types the ioctl supports. Other types are used only 811 * by the userland protocol). 812 */ 813 if (pickup->type != HAMMER_MREC_TYPE_REC && 814 pickup->type != HAMMER_MREC_TYPE_SKIP && 815 pickup->type != HAMMER_MREC_TYPE_PASS) { 816 break; 817 } 818 819 /* 820 * Read the remainder and clear the pickup signature. 
821 */ 822 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 823 i = read(fd, buf + count + n, bytes - n); 824 if (i <= 0) 825 break; 826 } 827 if (n != bytes) { 828 fprintf(stderr, "read_mrecords: short read on pipe\n"); 829 exit(1); 830 } 831 832 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 833 pickup->signature = 0; 834 pickup->type = 0; 835 mrec = (void *)(buf + count); 836 837 /* 838 * Validate the completed record 839 */ 840 if (mrec->head.rec_crc != 841 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 842 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 843 fprintf(stderr, "read_mrecords: malformed record " 844 "on pipe, bad crc\n"); 845 exit(1); 846 } 847 848 /* 849 * If its a B-Tree record validate the data crc 850 */ 851 if (mrec->head.type == HAMMER_MREC_TYPE_REC) { 852 if (mrec->head.rec_size < 853 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 854 fprintf(stderr, 855 "read_mrecords: malformed record on " 856 "pipe, illegal element data_len\n"); 857 exit(1); 858 } 859 if (mrec->rec.leaf.data_len && 860 mrec->rec.leaf.data_offset && 861 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 862 fprintf(stderr, 863 "read_mrecords: data_crc did not " 864 "match data! obj=%016llx key=%016llx\n", 865 mrec->rec.leaf.base.obj_id, 866 mrec->rec.leaf.base.key); 867 fprintf(stderr, 868 "continuing, but there are problems\n"); 869 } 870 } 871 count += bytes; 872 } 873 return(count); 874 } 875 876 /* 877 * Read and return a single mrecord. 878 */ 879 static 880 hammer_ioc_mrecord_any_t 881 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 882 { 883 hammer_ioc_mrecord_any_t mrec; 884 struct hammer_ioc_mrecord_head mrechd; 885 size_t bytes; 886 size_t n; 887 size_t i; 888 889 if (pickup && pickup->type != 0) { 890 mrechd = *pickup; 891 pickup->signature = 0; 892 pickup->type = 0; 893 n = HAMMER_MREC_HEADSIZE; 894 } else { 895 /* 896 * Read in the PFSD header from the sender. 
897 */ 898 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 899 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 900 if (i <= 0) 901 break; 902 } 903 if (n == 0) { 904 *errorp = 0; /* EOF */ 905 return(NULL); 906 } 907 if (n != HAMMER_MREC_HEADSIZE) { 908 fprintf(stderr, "short read of mrecord header\n"); 909 *errorp = EPIPE; 910 return(NULL); 911 } 912 } 913 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 914 fprintf(stderr, "read_mrecord: bad signature\n"); 915 *errorp = EINVAL; 916 return(NULL); 917 } 918 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 919 assert(bytes >= sizeof(mrechd)); 920 mrec = malloc(bytes); 921 mrec->head = mrechd; 922 923 while (n < bytes) { 924 i = read(fdin, (char *)mrec + n, bytes - n); 925 if (i <= 0) 926 break; 927 n += i; 928 } 929 if (n != bytes) { 930 fprintf(stderr, "read_mrecord: short read on payload\n"); 931 *errorp = EPIPE; 932 return(NULL); 933 } 934 if (mrec->head.rec_crc != 935 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 936 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 937 fprintf(stderr, "read_mrecord: bad CRC\n"); 938 *errorp = EINVAL; 939 return(NULL); 940 } 941 *errorp = 0; 942 return(mrec); 943 } 944 945 static 946 void 947 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 948 int bytes) 949 { 950 char zbuf[HAMMER_HEAD_ALIGN]; 951 int pad; 952 953 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 954 955 assert(bytes >= (int)sizeof(mrec->head)); 956 bzero(&mrec->head, sizeof(mrec->head)); 957 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 958 mrec->head.type = type; 959 mrec->head.rec_size = bytes; 960 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 961 bytes - HAMMER_MREC_CRCOFF); 962 if (write(fdout, mrec, bytes) != bytes) { 963 fprintf(stderr, "write_mrecord: error %d (%s)\n", 964 errno, strerror(errno)); 965 exit(1); 966 } 967 if (pad) { 968 bzero(zbuf, pad); 969 if (write(fdout, zbuf, pad) != pad) { 970 fprintf(stderr, "write_mrecord: error %d (%s)\n", 971 errno, 
strerror(errno)); 972 exit(1); 973 } 974 } 975 } 976 977 /* 978 * Generate a mirroring header with the pfs information of the 979 * originating filesytem. 980 */ 981 static void 982 generate_mrec_header(int fd, int fdout, int pfs_id, 983 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 984 { 985 struct hammer_ioc_pseudofs_rw pfs; 986 union hammer_ioc_mrecord_any mrec_tmp; 987 988 bzero(&pfs, sizeof(pfs)); 989 bzero(&mrec_tmp, sizeof(mrec_tmp)); 990 pfs.pfs_id = pfs_id; 991 pfs.ondisk = &mrec_tmp.pfs.pfsd; 992 pfs.bytes = sizeof(mrec_tmp.pfs.pfsd); 993 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 994 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 995 exit(1); 996 } 997 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 998 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 999 exit(1); 1000 } 1001 1002 /* 1003 * sync_beg_tid - lowest TID on source after which a full history 1004 * is available. 1005 * 1006 * sync_end_tid - highest fully synchronized TID from source. 1007 */ 1008 if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid) 1009 *tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid; 1010 if (tid_endp) 1011 *tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid; 1012 mrec_tmp.pfs.version = pfs.version; 1013 write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD, 1014 &mrec_tmp, sizeof(mrec_tmp.pfs)); 1015 } 1016 1017 /* 1018 * Validate the pfs information from the originating filesystem 1019 * against the target filesystem. shared_uuid must match. 1020 * 1021 * return -1 if we got a TERM record 1022 */ 1023 static int 1024 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1025 struct hammer_ioc_mrecord_head *pickup, 1026 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1027 { 1028 struct hammer_ioc_pseudofs_rw pfs; 1029 struct hammer_pseudofs_data pfsd; 1030 hammer_ioc_mrecord_any_t mrec; 1031 int error; 1032 1033 /* 1034 * Get the PFSD info from the target filesystem. 
1035 */ 1036 bzero(&pfs, sizeof(pfs)); 1037 bzero(&pfsd, sizeof(pfsd)); 1038 pfs.pfs_id = pfs_id; 1039 pfs.ondisk = &pfsd; 1040 pfs.bytes = sizeof(pfsd); 1041 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1042 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1043 exit(1); 1044 } 1045 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1046 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 1047 exit(1); 1048 } 1049 1050 mrec = read_mrecord(fdin, &error, pickup); 1051 if (mrec == NULL) { 1052 if (error == 0) 1053 fprintf(stderr, "validate_mrec_header: short read\n"); 1054 exit(1); 1055 } 1056 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1057 free(mrec); 1058 return(-1); 1059 } 1060 1061 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1062 fprintf(stderr, "validate_mrec_header: did not get expected " 1063 "PFSD record type\n"); 1064 exit(1); 1065 } 1066 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1067 fprintf(stderr, "validate_mrec_header: unexpected payload " 1068 "size\n"); 1069 exit(1); 1070 } 1071 if (mrec->pfs.version != pfs.version) { 1072 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1073 exit(1); 1074 } 1075 1076 /* 1077 * Whew. Ok, is the read PFS info compatible with the target? 
1078 */ 1079 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1080 sizeof(pfsd.shared_uuid)) != 0) { 1081 fprintf(stderr, 1082 "mirror-write: source and target have " 1083 "different shared-uuid's!\n"); 1084 exit(1); 1085 } 1086 if (is_target && 1087 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1088 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1089 exit(1); 1090 } 1091 if (tid_begp) 1092 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1093 if (tid_endp) 1094 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1095 free(mrec); 1096 return(0); 1097 } 1098 1099 static void 1100 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1101 { 1102 struct hammer_ioc_pseudofs_rw pfs; 1103 struct hammer_pseudofs_data pfsd; 1104 1105 bzero(&pfs, sizeof(pfs)); 1106 bzero(&pfsd, sizeof(pfsd)); 1107 pfs.pfs_id = pfs_id; 1108 pfs.ondisk = &pfsd; 1109 pfs.bytes = sizeof(pfsd); 1110 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1111 perror("update_pfs_snapshot (read)"); 1112 exit(1); 1113 } 1114 if (pfsd.sync_end_tid != snapshot_tid) { 1115 pfsd.sync_end_tid = snapshot_tid; 1116 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1117 perror("update_pfs_snapshot (rewrite)"); 1118 exit(1); 1119 } 1120 if (VerboseOpt >= 2) { 1121 fprintf(stderr, 1122 "Mirror-write: Completed, updated snapshot " 1123 "to %016llx\n", 1124 snapshot_tid); 1125 } 1126 } 1127 } 1128 1129 /* 1130 * Bandwidth-limited write in chunks 1131 */ 1132 static 1133 ssize_t 1134 writebw(int fd, const void *buf, size_t nbytes, 1135 u_int64_t *bwcount, struct timeval *tv1) 1136 { 1137 struct timeval tv2; 1138 size_t n; 1139 ssize_t r; 1140 ssize_t a; 1141 int usec; 1142 1143 a = 0; 1144 r = 0; 1145 while (nbytes) { 1146 if (*bwcount + nbytes > BandwidthOpt) 1147 n = BandwidthOpt - *bwcount; 1148 else 1149 n = nbytes; 1150 if (n) 1151 r = write(fd, buf, n); 1152 if (r >= 0) { 1153 a += r; 1154 nbytes -= r; 1155 buf = (const char *)buf + r; 1156 } 1157 if ((size_t)r != n) 1158 break; 1159 
*bwcount += n; 1160 if (*bwcount >= BandwidthOpt) { 1161 gettimeofday(&tv2, NULL); 1162 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1163 (int)(tv2.tv_usec - tv1->tv_usec); 1164 if (usec >= 0 && usec < 1000000) 1165 usleep(1000000 - usec); 1166 gettimeofday(tv1, NULL); 1167 *bwcount -= BandwidthOpt; 1168 } 1169 } 1170 return(a ? a : r); 1171 } 1172 1173 /* 1174 * Get a yes or no answer from the terminal. The program may be run as 1175 * part of a two-way pipe so we cannot use stdin for this operation. 1176 */ 1177 static int 1178 getyn(void) 1179 { 1180 char buf[256]; 1181 FILE *fp; 1182 int result; 1183 1184 fp = fopen("/dev/tty", "r"); 1185 if (fp == NULL) { 1186 fprintf(stderr, "No terminal for response\n"); 1187 return(-1); 1188 } 1189 result = -1; 1190 while (fgets(buf, sizeof(buf), fp) != NULL) { 1191 if (buf[0] == 'y' || buf[0] == 'Y') { 1192 result = 1; 1193 break; 1194 } 1195 if (buf[0] == 'n' || buf[0] == 'N') { 1196 result = 0; 1197 break; 1198 } 1199 fprintf(stderr, "Response not understood\n"); 1200 break; 1201 } 1202 fclose(fp); 1203 return(result); 1204 } 1205 1206 static void 1207 mirror_usage(int code) 1208 { 1209 fprintf(stderr, 1210 "hammer mirror-read <filesystem> [begin-tid]\n" 1211 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1212 "hammer mirror-write <filesystem>\n" 1213 "hammer mirror-dump\n" 1214 "hammer mirror-copy [[user@]host:]<filesystem>" 1215 " [[user@]host:]<filesystem>\n" 1216 "hammer mirror-stream [[user@]host:]<filesystem>" 1217 " [[user@]host:]<filesystem>\n" 1218 ); 1219 exit(code); 1220 } 1221 1222