1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.14 2008/08/21 23:28:43 thomas Exp $ 35 */ 36 37 #include "hammer.h" 38 39 #define SERIALBUF_SIZE (512 * 1024) 40 41 static int read_mrecords(int fd, char *buf, u_int size, 42 hammer_ioc_mrecord_head_t pickup); 43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 44 hammer_ioc_mrecord_head_t pickup); 45 static void write_mrecord(int fdout, u_int32_t type, 46 hammer_ioc_mrecord_any_t mrec, int bytes); 47 static void generate_mrec_header(int fd, int fdout, int pfs_id, 48 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 49 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 50 struct hammer_ioc_mrecord_head *pickup, 51 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 52 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 53 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 54 u_int64_t *bwcount, struct timeval *tv1); 55 static void mirror_usage(int code); 56 57 /* 58 * Generate a mirroring data stream from the specific source over the 59 * entire key range, but restricted to the specified transaction range. 60 * 61 * The HAMMER VFS does most of the work, we add a few new mrecord 62 * types to negotiate the TID ranges and verify that the entire 63 * stream made it to the destination. 64 */ 65 void 66 hammer_cmd_mirror_read(char **av, int ac, int streaming) 67 { 68 struct hammer_ioc_mirror_rw mirror; 69 struct hammer_ioc_pseudofs_rw pfs; 70 union hammer_ioc_mrecord_any mrec_tmp; 71 struct hammer_ioc_mrecord_head pickup; 72 hammer_ioc_mrecord_any_t mrec; 73 hammer_tid_t sync_tid; 74 const char *filesystem; 75 char *buf = malloc(SERIALBUF_SIZE); 76 int interrupted = 0; 77 int error; 78 int fd; 79 int n; 80 int didwork; 81 int64_t total_bytes; 82 time_t base_t = time(NULL); 83 struct timeval bwtv; 84 u_int64_t bwcount; 85 86 if (ac == 0 || ac > 2) 87 mirror_usage(1); 88 filesystem = av[0]; 89 90 pickup.signature = 0; 91 pickup.type = 0; 92 93 again: 94 bzero(&mirror, sizeof(mirror)); 95 hammer_key_beg_init(&mirror.key_beg); 96 hammer_key_end_init(&mirror.key_end); 97 98 fd = getpfs(&pfs, filesystem); 99 100 if (streaming && VerboseOpt) { 101 fprintf(stderr, "\nRunning"); 102 fflush(stderr); 103 } 104 total_bytes = 0; 105 gettimeofday(&bwtv, NULL); 106 bwcount = 0; 107 108 /* 109 * In 2-way mode the target will send us a PFS info packet 110 * first. Use the target's current snapshot TID as our default 111 * begin TID. 112 */ 113 mirror.tid_beg = 0; 114 if (TwoWayPipeOpt) { 115 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 116 NULL, &mirror.tid_beg); 117 if (n < 0) { /* got TERM record */ 118 relpfs(fd, &pfs); 119 return; 120 } 121 ++mirror.tid_beg; 122 } 123 124 /* 125 * Write out the PFS header, tid_beg will be updated if our PFS 126 * has a larger begin sync. tid_end is set to the latest source 127 * TID whos flush cycle has completed. 128 */ 129 generate_mrec_header(fd, 1, pfs.pfs_id, 130 &mirror.tid_beg, &mirror.tid_end); 131 132 /* XXX streaming mode support w/ cycle or command line arg */ 133 /* 134 * A cycle file overrides the beginning TID 135 */ 136 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 137 138 if (ac == 2) 139 mirror.tid_beg = strtoull(av[1], NULL, 0); 140 141 if (streaming == 0 || VerboseOpt >= 2) { 142 fprintf(stderr, 143 "Mirror-read: Mirror from %016llx to %016llx\n", 144 mirror.tid_beg, mirror.tid_end); 145 } 146 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 147 fprintf(stderr, "Mirror-read: Resuming at object %016llx\n", 148 mirror.key_beg.obj_id); 149 } 150 151 /* 152 * Nothing to do if begin equals end. 153 */ 154 if (mirror.tid_beg >= mirror.tid_end) { 155 if (streaming == 0 || VerboseOpt >= 2) 156 fprintf(stderr, "Mirror-read: No work to do\n"); 157 didwork = 0; 158 goto done; 159 } 160 didwork = 1; 161 162 /* 163 * Write out bulk records 164 */ 165 mirror.ubuf = buf; 166 mirror.size = SERIALBUF_SIZE; 167 168 do { 169 mirror.count = 0; 170 mirror.pfs_id = pfs.pfs_id; 171 mirror.shared_uuid = pfs.ondisk->shared_uuid; 172 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 173 fprintf(stderr, "Mirror-read %s failed: %s\n", 174 filesystem, strerror(errno)); 175 exit(1); 176 } 177 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 178 fprintf(stderr, 179 "Mirror-read %s fatal error %d\n", 180 filesystem, mirror.head.error); 181 exit(1); 182 } 183 if (mirror.count) { 184 if (BandwidthOpt) { 185 n = writebw(1, mirror.ubuf, mirror.count, 186 &bwcount, &bwtv); 187 } else { 188 n = write(1, mirror.ubuf, mirror.count); 189 } 190 if (n != mirror.count) { 191 fprintf(stderr, "Mirror-read %s failed: " 192 "short write\n", 193 filesystem); 194 exit(1); 195 } 196 } 197 total_bytes += mirror.count; 198 if (streaming && VerboseOpt) { 199 fprintf(stderr, "\r%016llx %11lld", 200 mirror.key_cur.obj_id, 201 total_bytes); 202 fflush(stderr); 203 } 204 mirror.key_beg = mirror.key_cur; 205 if (TimeoutOpt && 206 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 207 fprintf(stderr, 208 "Mirror-read %s interrupted by timer at" 209 " %016llx\n", 210 filesystem, 211 mirror.key_cur.obj_id); 212 interrupted = 1; 213 break; 214 } 215 } while (mirror.count != 0); 216 217 done: 218 /* 219 * Write out the termination sync record - only if not interrupted 220 */ 221 if (interrupted == 0) { 222 if (didwork) { 223 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 224 &mrec_tmp, sizeof(mrec_tmp.sync)); 225 } else { 226 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 227 &mrec_tmp, sizeof(mrec_tmp.sync)); 228 } 229 } 230 231 /* 232 * If the -2 option was given (automatic when doing mirror-copy), 233 * a two-way pipe is assumed and we expect a response mrec from 234 * the target. 235 */ 236 if (TwoWayPipeOpt) { 237 mrec = read_mrecord(0, &error, &pickup); 238 if (mrec == NULL || 239 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 240 mrec->head.rec_size != sizeof(mrec->update)) { 241 fprintf(stderr, "mirror_read: Did not get final " 242 "acknowledgement packet from target\n"); 243 exit(1); 244 } 245 if (interrupted) { 246 if (CyclePath) { 247 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg); 248 fprintf(stderr, "Cyclefile %s updated for " 249 "continuation\n", CyclePath); 250 } 251 } else { 252 sync_tid = mrec->update.tid; 253 if (CyclePath) { 254 hammer_key_beg_init(&mirror.key_beg); 255 hammer_set_cycle(&mirror.key_beg, sync_tid); 256 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n", 257 CyclePath, sync_tid); 258 } 259 } 260 } else if (CyclePath) { 261 /* NOTE! mirror.tid_beg cannot be updated */ 262 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 263 "fully updated unless you use mirror-copy\n"); 264 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 265 } 266 if (streaming && interrupted == 0) { 267 time_t t1 = time(NULL); 268 time_t t2; 269 270 if (VerboseOpt) { 271 fprintf(stderr, " W"); 272 fflush(stderr); 273 } 274 pfs.ondisk->sync_end_tid = mirror.tid_end; 275 if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 276 fprintf(stderr, "Mirror-read %s: cannot stream: %s\n", 277 filesystem, strerror(errno)); 278 } else { 279 t2 = time(NULL) - t1; 280 if (t2 >= 0 && t2 < DelayOpt) { 281 if (VerboseOpt) { 282 fprintf(stderr, "\bD"); 283 fflush(stderr); 284 } 285 sleep(DelayOpt - t2); 286 } 287 if (VerboseOpt) { 288 fprintf(stderr, "\b "); 289 fflush(stderr); 290 } 291 relpfs(fd, &pfs); 292 goto again; 293 } 294 } 295 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 296 &mrec_tmp, sizeof(mrec_tmp.sync)); 297 relpfs(fd, &pfs); 298 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 299 } 300 301 /* 302 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 303 * some additional packet types to negotiate TID ranges and to verify 304 * completion. The HAMMER VFS does most of the work. 305 * 306 * It is important to note that the mirror.key_{beg,end} range must 307 * match the ranged used by the original. For now both sides use 308 * range the entire key space. 309 * 310 * It is even more important that the records in the stream conform 311 * to the TID range also supplied in the stream. The HAMMER VFS will 312 * use the REC, PASS, and SKIP record types to track the portions of 313 * the B-Tree being scanned in order to be able to proactively delete 314 * records on the target within those active areas that are not mentioned 315 * by the source. 316 * 317 * The mirror.key_cur field is used by the VFS to do this tracking. It 318 * must be initialized to key_beg but then is persistently updated by 319 * the HAMMER VFS on each successive ioctl() call. If you blow up this 320 * field you will blow up the mirror target, possibly to the point of 321 * deleting everything. As a safety measure the HAMMER VFS simply marks 322 * the records that the source has destroyed as deleted on the target, 323 * and normal pruning operations will deal with their final disposition 324 * at some later time. 325 */ 326 void 327 hammer_cmd_mirror_write(char **av, int ac) 328 { 329 struct hammer_ioc_mirror_rw mirror; 330 const char *filesystem; 331 char *buf = malloc(SERIALBUF_SIZE); 332 struct hammer_ioc_pseudofs_rw pfs; 333 struct hammer_ioc_mrecord_head pickup; 334 struct hammer_ioc_synctid synctid; 335 union hammer_ioc_mrecord_any mrec_tmp; 336 hammer_ioc_mrecord_any_t mrec; 337 int error; 338 int fd; 339 int n; 340 341 if (ac != 1) 342 mirror_usage(1); 343 filesystem = av[0]; 344 345 pickup.signature = 0; 346 pickup.type = 0; 347 348 again: 349 bzero(&mirror, sizeof(mirror)); 350 hammer_key_beg_init(&mirror.key_beg); 351 hammer_key_end_init(&mirror.key_end); 352 mirror.key_end = mirror.key_beg; 353 354 fd = getpfs(&pfs, filesystem); 355 356 /* 357 * In two-way mode the target writes out a PFS packet first. 358 * The source uses our tid_end as its tid_beg by default, 359 * picking up where it left off. 360 */ 361 mirror.tid_beg = 0; 362 if (TwoWayPipeOpt) { 363 generate_mrec_header(fd, 1, pfs.pfs_id, 364 &mirror.tid_beg, &mirror.tid_end); 365 } 366 367 /* 368 * Read and process the PFS header. The source informs us of 369 * the TID range the stream represents. 370 */ 371 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 372 &mirror.tid_beg, &mirror.tid_end); 373 if (n < 0) { /* got TERM record */ 374 relpfs(fd, &pfs); 375 return; 376 } 377 378 mirror.ubuf = buf; 379 mirror.size = SERIALBUF_SIZE; 380 381 /* 382 * Read and process bulk records (REC, PASS, and SKIP types). 383 * 384 * On your life, do NOT mess with mirror.key_cur or your mirror 385 * target may become history. 386 */ 387 for (;;) { 388 mirror.count = 0; 389 mirror.pfs_id = pfs.pfs_id; 390 mirror.shared_uuid = pfs.ondisk->shared_uuid; 391 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 392 if (mirror.size <= 0) 393 break; 394 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 395 fprintf(stderr, "Mirror-write %s failed: %s\n", 396 filesystem, strerror(errno)); 397 exit(1); 398 } 399 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 400 fprintf(stderr, 401 "Mirror-write %s fatal error %d\n", 402 filesystem, mirror.head.error); 403 exit(1); 404 } 405 #if 0 406 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 407 fprintf(stderr, 408 "Mirror-write %s interrupted by timer at" 409 " %016llx\n", 410 filesystem, 411 mirror.key_cur.obj_id); 412 exit(0); 413 } 414 #endif 415 } 416 417 /* 418 * Read and process the termination sync record. 419 */ 420 mrec = read_mrecord(0, &error, &pickup); 421 422 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 423 fprintf(stderr, "Mirror-write: received termination request\n"); 424 free(mrec); 425 return; 426 } 427 428 if (mrec == NULL || 429 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 430 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 431 mrec->head.rec_size != sizeof(mrec->sync)) { 432 fprintf(stderr, "Mirror-write %s: Did not get termination " 433 "sync record, or rec_size is wrong rt=%d\n", 434 filesystem, mrec->head.type); 435 exit(1); 436 } 437 438 /* 439 * Update the PFS info on the target so the user has visibility 440 * into the new snapshot, and sync the target filesystem. 441 */ 442 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 443 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 444 445 bzero(&synctid, sizeof(synctid)); 446 synctid.op = HAMMER_SYNCTID_SYNC2; 447 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 448 449 if (VerboseOpt >= 2) { 450 fprintf(stderr, "Mirror-write %s: succeeded\n", 451 filesystem); 452 } 453 } 454 455 free(mrec); 456 mrec = NULL; 457 458 /* 459 * Report back to the originator. 460 */ 461 if (TwoWayPipeOpt) { 462 mrec_tmp.update.tid = mirror.tid_end; 463 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 464 &mrec_tmp, sizeof(mrec_tmp.update)); 465 } else { 466 printf("Source can update synctid to 0x%016llx\n", 467 mirror.tid_end); 468 } 469 relpfs(fd, &pfs); 470 goto again; 471 } 472 473 void 474 hammer_cmd_mirror_dump(void) 475 { 476 char *buf = malloc(SERIALBUF_SIZE); 477 struct hammer_ioc_mrecord_head pickup; 478 hammer_ioc_mrecord_any_t mrec; 479 int error; 480 int size; 481 int offset; 482 int bytes; 483 484 /* 485 * Read and process the PFS header 486 */ 487 pickup.signature = 0; 488 pickup.type = 0; 489 490 mrec = read_mrecord(0, &error, &pickup); 491 492 /* 493 * Read and process bulk records 494 */ 495 for (;;) { 496 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 497 if (size <= 0) 498 break; 499 offset = 0; 500 while (offset < size) { 501 mrec = (void *)((char *)buf + offset); 502 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 503 if (offset + bytes > size) { 504 fprintf(stderr, "Misaligned record\n"); 505 exit(1); 506 } 507 508 switch(mrec->head.type) { 509 case HAMMER_MREC_TYPE_REC: 510 printf("Record obj=%016llx key=%016llx " 511 "rt=%02x ot=%02x\n", 512 mrec->rec.leaf.base.obj_id, 513 mrec->rec.leaf.base.key, 514 mrec->rec.leaf.base.rec_type, 515 mrec->rec.leaf.base.obj_type); 516 printf(" tids %016llx:%016llx data=%d\n", 517 mrec->rec.leaf.base.create_tid, 518 mrec->rec.leaf.base.delete_tid, 519 mrec->rec.leaf.data_len); 520 break; 521 case HAMMER_MREC_TYPE_PASS: 522 printf("Pass obj=%016llx key=%016llx " 523 "rt=%02x ot=%02x\n", 524 mrec->rec.leaf.base.obj_id, 525 mrec->rec.leaf.base.key, 526 mrec->rec.leaf.base.rec_type, 527 mrec->rec.leaf.base.obj_type); 528 printf(" tids %016llx:%016llx data=%d\n", 529 mrec->rec.leaf.base.create_tid, 530 mrec->rec.leaf.base.delete_tid, 531 mrec->rec.leaf.data_len); 532 break; 533 case HAMMER_MREC_TYPE_SKIP: 534 printf("Skip obj=%016llx key=%016llx rt=%02x to\n" 535 " obj=%016llx key=%016llx rt=%02x\n", 536 mrec->skip.skip_beg.obj_id, 537 mrec->skip.skip_beg.key, 538 mrec->skip.skip_beg.rec_type, 539 mrec->skip.skip_end.obj_id, 540 mrec->skip.skip_end.key, 541 mrec->skip.skip_end.rec_type); 542 default: 543 break; 544 } 545 offset += bytes; 546 } 547 } 548 549 /* 550 * Read and process the termination sync record. 551 */ 552 mrec = read_mrecord(0, &error, &pickup); 553 if (mrec == NULL || 554 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 555 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 556 ) { 557 fprintf(stderr, "Mirror-dump: Did not get termination " 558 "sync record\n"); 559 } 560 } 561 562 void 563 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 564 { 565 pid_t pid1; 566 pid_t pid2; 567 int fds[2]; 568 const char *xav[16]; 569 char tbuf[16]; 570 char *ptr; 571 int xac; 572 573 if (ac != 2) 574 mirror_usage(1); 575 576 if (pipe(fds) < 0) { 577 perror("pipe"); 578 exit(1); 579 } 580 581 TwoWayPipeOpt = 1; 582 583 /* 584 * Source 585 */ 586 if ((pid1 = fork()) == 0) { 587 dup2(fds[0], 0); 588 dup2(fds[0], 1); 589 close(fds[0]); 590 close(fds[1]); 591 if ((ptr = strchr(av[0], ':')) != NULL) { 592 *ptr++ = 0; 593 xac = 0; 594 xav[xac++] = "ssh"; 595 xav[xac++] = av[0]; 596 xav[xac++] = "hammer"; 597 598 switch(VerboseOpt) { 599 case 0: 600 break; 601 case 1: 602 xav[xac++] = "-v"; 603 break; 604 case 2: 605 xav[xac++] = "-vv"; 606 break; 607 default: 608 xav[xac++] = "-vvv"; 609 break; 610 } 611 xav[xac++] = "-2"; 612 if (TimeoutOpt) { 613 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 614 xav[xac++] = "-t"; 615 xav[xac++] = tbuf; 616 } 617 if (streaming) 618 xav[xac++] = "mirror-read-streaming"; 619 else 620 xav[xac++] = "mirror-read"; 621 xav[xac++] = ptr; 622 xav[xac++] = NULL; 623 execv("/usr/bin/ssh", (void *)xav); 624 } else { 625 hammer_cmd_mirror_read(av, 1, streaming); 626 fflush(stdout); 627 fflush(stderr); 628 } 629 _exit(1); 630 } 631 632 /* 633 * Target 634 */ 635 if ((pid2 = fork()) == 0) { 636 dup2(fds[1], 0); 637 dup2(fds[1], 1); 638 close(fds[0]); 639 close(fds[1]); 640 if ((ptr = strchr(av[1], ':')) != NULL) { 641 *ptr++ = 0; 642 xac = 0; 643 xav[xac++] = "ssh"; 644 xav[xac++] = av[1]; 645 xav[xac++] = "hammer"; 646 647 switch(VerboseOpt) { 648 case 0: 649 break; 650 case 1: 651 xav[xac++] = "-v"; 652 break; 653 case 2: 654 xav[xac++] = "-vv"; 655 break; 656 default: 657 xav[xac++] = "-vvv"; 658 break; 659 } 660 661 xav[xac++] = "-2"; 662 xav[xac++] = "mirror-write"; 663 xav[xac++] = ptr; 664 xav[xac++] = NULL; 665 execv("/usr/bin/ssh", (void *)xav); 666 } else { 667 hammer_cmd_mirror_write(av + 1, 1); 668 fflush(stdout); 669 fflush(stderr); 670 } 671 _exit(1); 672 } 673 close(fds[0]); 674 close(fds[1]); 675 676 while (waitpid(pid1, NULL, 0) <= 0) 677 ; 678 while (waitpid(pid2, NULL, 0) <= 0) 679 ; 680 } 681 682 /* 683 * Read and return multiple mrecords 684 */ 685 static int 686 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 687 { 688 hammer_ioc_mrecord_any_t mrec; 689 u_int count; 690 size_t n; 691 size_t i; 692 size_t bytes; 693 694 count = 0; 695 while (size - count >= HAMMER_MREC_HEADSIZE) { 696 /* 697 * Cached the record header in case we run out of buffer 698 * space. 699 */ 700 fflush(stdout); 701 if (pickup->signature == 0) { 702 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 703 i = read(fd, (char *)pickup + n, 704 HAMMER_MREC_HEADSIZE - n); 705 if (i <= 0) 706 break; 707 } 708 if (n == 0) 709 break; 710 if (n != HAMMER_MREC_HEADSIZE) { 711 fprintf(stderr, "read_mrecords: short read on pipe\n"); 712 exit(1); 713 } 714 715 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 716 fprintf(stderr, "read_mrecords: malformed record on pipe, " 717 "bad signature\n"); 718 exit(1); 719 } 720 } 721 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 722 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 723 fprintf(stderr, "read_mrecords: malformed record on pipe, " 724 "illegal rec_size\n"); 725 exit(1); 726 } 727 728 /* 729 * Stop if we have insufficient space for the record and data. 730 */ 731 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 732 if (size - count < bytes) 733 break; 734 735 /* 736 * Stop if the record type is not a REC or a SKIP (the only 737 * two types the ioctl supports. Other types are used only 738 * by the userland protocol). 739 */ 740 if (pickup->type != HAMMER_MREC_TYPE_REC && 741 pickup->type != HAMMER_MREC_TYPE_SKIP && 742 pickup->type != HAMMER_MREC_TYPE_PASS) { 743 break; 744 } 745 746 /* 747 * Read the remainder and clear the pickup signature. 748 */ 749 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 750 i = read(fd, buf + count + n, bytes - n); 751 if (i <= 0) 752 break; 753 } 754 if (n != bytes) { 755 fprintf(stderr, "read_mrecords: short read on pipe\n"); 756 exit(1); 757 } 758 759 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 760 pickup->signature = 0; 761 pickup->type = 0; 762 mrec = (void *)(buf + count); 763 764 /* 765 * Validate the completed record 766 */ 767 if (mrec->head.rec_crc != 768 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 769 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 770 fprintf(stderr, "read_mrecords: malformed record " 771 "on pipe, bad crc\n"); 772 exit(1); 773 } 774 775 /* 776 * If its a B-Tree record validate the data crc 777 */ 778 if (mrec->head.type == HAMMER_MREC_TYPE_REC) { 779 if (mrec->head.rec_size < 780 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 781 fprintf(stderr, 782 "read_mrecords: malformed record on " 783 "pipe, illegal element data_len\n"); 784 exit(1); 785 } 786 if (mrec->rec.leaf.data_len && 787 mrec->rec.leaf.data_offset && 788 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 789 fprintf(stderr, 790 "read_mrecords: data_crc did not " 791 "match data! obj=%016llx key=%016llx\n", 792 mrec->rec.leaf.base.obj_id, 793 mrec->rec.leaf.base.key); 794 fprintf(stderr, 795 "continuing, but there are problems\n"); 796 } 797 } 798 count += bytes; 799 } 800 return(count); 801 } 802 803 /* 804 * Read and return a single mrecord. 805 */ 806 static 807 hammer_ioc_mrecord_any_t 808 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 809 { 810 hammer_ioc_mrecord_any_t mrec; 811 struct hammer_ioc_mrecord_head mrechd; 812 size_t bytes; 813 size_t n; 814 size_t i; 815 816 if (pickup && pickup->type != 0) { 817 mrechd = *pickup; 818 pickup->signature = 0; 819 pickup->type = 0; 820 n = HAMMER_MREC_HEADSIZE; 821 } else { 822 /* 823 * Read in the PFSD header from the sender. 824 */ 825 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 826 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 827 if (i <= 0) 828 break; 829 } 830 if (n == 0) { 831 *errorp = 0; /* EOF */ 832 return(NULL); 833 } 834 if (n != HAMMER_MREC_HEADSIZE) { 835 fprintf(stderr, "short read of mrecord header\n"); 836 *errorp = EPIPE; 837 return(NULL); 838 } 839 } 840 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 841 fprintf(stderr, "read_mrecord: bad signature\n"); 842 *errorp = EINVAL; 843 return(NULL); 844 } 845 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 846 assert(bytes >= sizeof(mrechd)); 847 mrec = malloc(bytes); 848 mrec->head = mrechd; 849 850 while (n < bytes) { 851 i = read(fdin, (char *)mrec + n, bytes - n); 852 if (i <= 0) 853 break; 854 n += i; 855 } 856 if (n != bytes) { 857 fprintf(stderr, "read_mrecord: short read on payload\n"); 858 *errorp = EPIPE; 859 return(NULL); 860 } 861 if (mrec->head.rec_crc != 862 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 863 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 864 fprintf(stderr, "read_mrecord: bad CRC\n"); 865 *errorp = EINVAL; 866 return(NULL); 867 } 868 *errorp = 0; 869 return(mrec); 870 } 871 872 static 873 void 874 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 875 int bytes) 876 { 877 char zbuf[HAMMER_HEAD_ALIGN]; 878 int pad; 879 880 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 881 882 assert(bytes >= (int)sizeof(mrec->head)); 883 bzero(&mrec->head, sizeof(mrec->head)); 884 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 885 mrec->head.type = type; 886 mrec->head.rec_size = bytes; 887 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 888 bytes - HAMMER_MREC_CRCOFF); 889 if (write(fdout, mrec, bytes) != bytes) { 890 fprintf(stderr, "write_mrecord: error %d (%s)\n", 891 errno, strerror(errno)); 892 exit(1); 893 } 894 if (pad) { 895 bzero(zbuf, pad); 896 if (write(fdout, zbuf, pad) != pad) { 897 fprintf(stderr, "write_mrecord: error %d (%s)\n", 898 errno, strerror(errno)); 899 exit(1); 900 } 901 } 902 } 903 904 /* 905 * Generate a mirroring header with the pfs information of the 906 * originating filesytem. 907 */ 908 static void 909 generate_mrec_header(int fd, int fdout, int pfs_id, 910 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 911 { 912 struct hammer_ioc_pseudofs_rw pfs; 913 union hammer_ioc_mrecord_any mrec_tmp; 914 915 bzero(&pfs, sizeof(pfs)); 916 bzero(&mrec_tmp, sizeof(mrec_tmp)); 917 pfs.pfs_id = pfs_id; 918 pfs.ondisk = &mrec_tmp.pfs.pfsd; 919 pfs.bytes = sizeof(mrec_tmp.pfs.pfsd); 920 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 921 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 922 exit(1); 923 } 924 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 925 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 926 exit(1); 927 } 928 929 /* 930 * sync_beg_tid - lowest TID on source after which a full history 931 * is available. 932 * 933 * sync_end_tid - highest fully synchronized TID from source. 934 */ 935 if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid) 936 *tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid; 937 if (tid_endp) 938 *tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid; 939 mrec_tmp.pfs.version = pfs.version; 940 write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD, 941 &mrec_tmp, sizeof(mrec_tmp.pfs)); 942 } 943 944 /* 945 * Validate the pfs information from the originating filesystem 946 * against the target filesystem. shared_uuid must match. 947 * 948 * return -1 if we got a TERM record 949 */ 950 static int 951 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 952 struct hammer_ioc_mrecord_head *pickup, 953 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 954 { 955 struct hammer_ioc_pseudofs_rw pfs; 956 struct hammer_pseudofs_data pfsd; 957 hammer_ioc_mrecord_any_t mrec; 958 int error; 959 960 /* 961 * Get the PFSD info from the target filesystem. 962 */ 963 bzero(&pfs, sizeof(pfs)); 964 bzero(&pfsd, sizeof(pfsd)); 965 pfs.pfs_id = pfs_id; 966 pfs.ondisk = &pfsd; 967 pfs.bytes = sizeof(pfsd); 968 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 969 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 970 exit(1); 971 } 972 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 973 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 974 exit(1); 975 } 976 977 mrec = read_mrecord(fdin, &error, pickup); 978 if (mrec == NULL) { 979 if (error == 0) 980 fprintf(stderr, "validate_mrec_header: short read\n"); 981 exit(1); 982 } 983 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 984 free(mrec); 985 return(-1); 986 } 987 988 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 989 fprintf(stderr, "validate_mrec_header: did not get expected " 990 "PFSD record type\n"); 991 exit(1); 992 } 993 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 994 fprintf(stderr, "validate_mrec_header: unexpected payload " 995 "size\n"); 996 exit(1); 997 } 998 if (mrec->pfs.version != pfs.version) { 999 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1000 exit(1); 1001 } 1002 1003 /* 1004 * Whew. Ok, is the read PFS info compatible with the target? 1005 */ 1006 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1007 sizeof(pfsd.shared_uuid)) != 0) { 1008 fprintf(stderr, 1009 "mirror-write: source and target have " 1010 "different shared-uuid's!\n"); 1011 exit(1); 1012 } 1013 if (is_target && 1014 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1015 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1016 exit(1); 1017 } 1018 if (tid_begp) 1019 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1020 if (tid_endp) 1021 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1022 free(mrec); 1023 return(0); 1024 } 1025 1026 static void 1027 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1028 { 1029 struct hammer_ioc_pseudofs_rw pfs; 1030 struct hammer_pseudofs_data pfsd; 1031 1032 bzero(&pfs, sizeof(pfs)); 1033 bzero(&pfsd, sizeof(pfsd)); 1034 pfs.pfs_id = pfs_id; 1035 pfs.ondisk = &pfsd; 1036 pfs.bytes = sizeof(pfsd); 1037 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1038 perror("update_pfs_snapshot (read)"); 1039 exit(1); 1040 } 1041 if (pfsd.sync_end_tid != snapshot_tid) { 1042 pfsd.sync_end_tid = snapshot_tid; 1043 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1044 perror("update_pfs_snapshot (rewrite)"); 1045 exit(1); 1046 } 1047 if (VerboseOpt >= 2) { 1048 fprintf(stderr, 1049 "Mirror-write: Completed, updated snapshot " 1050 "to %016llx\n", 1051 snapshot_tid); 1052 } 1053 } 1054 } 1055 1056 /* 1057 * Bandwidth-limited write in chunks 1058 */ 1059 static 1060 ssize_t 1061 writebw(int fd, const void *buf, size_t nbytes, 1062 u_int64_t *bwcount, struct timeval *tv1) 1063 { 1064 struct timeval tv2; 1065 size_t n; 1066 ssize_t r; 1067 ssize_t a; 1068 int usec; 1069 1070 a = 0; 1071 r = 0; 1072 while (nbytes) { 1073 if (*bwcount + nbytes > BandwidthOpt) 1074 n = BandwidthOpt - *bwcount; 1075 else 1076 n = nbytes; 1077 if (n) 1078 r = write(fd, buf, n); 1079 if (r >= 0) { 1080 a += r; 1081 nbytes -= r; 1082 buf = (const char *)buf + r; 1083 } 1084 if ((size_t)r != n) 1085 break; 1086 *bwcount += n; 1087 if (*bwcount >= BandwidthOpt) { 1088 gettimeofday(&tv2, NULL); 1089 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1090 (int)(tv2.tv_usec - tv1->tv_usec); 1091 if (usec >= 0 && usec < 1000000) 1092 usleep(1000000 - usec); 1093 gettimeofday(tv1, NULL); 1094 *bwcount -= BandwidthOpt; 1095 } 1096 } 1097 return(a ? a : r); 1098 } 1099 1100 static void 1101 mirror_usage(int code) 1102 { 1103 fprintf(stderr, 1104 "hammer mirror-read <filesystem> [begin-tid]\n" 1105 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1106 "hammer mirror-write <filesystem>\n" 1107 "hammer mirror-dump\n" 1108 "hammer mirror-copy [[user@]host:]<filesystem>" 1109 " [[user@]host:]<filesystem>\n" 1110 "hammer mirror-stream [[user@]host:]<filesystem>" 1111 " [[user@]host:]<filesystem>\n" 1112 ); 1113 exit(code); 1114 } 1115