1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 
 */

#include "hammer.h"

/*
 * Position arguments for the three score_printf() status lines used in
 * this file: each macro expands to the first two arguments of a
 * score_printf() call.  NOTE(review): the exact meaning of each number
 * pair is defined by score_printf(), which is not visible here -- confirm.
 */
#define LINE1	0,20
#define LINE2	20,78
#define LINE3	90,70

/*
 * Size of the buffer handed to HAMMERIOC_MIRROR_READ and used to batch
 * mrecords read back from the input stream.
 */
#define SERIALBUF_SIZE	(512 * 1024)

/*
 * One slice of a bulk mirror-read as produced by generate_histogram().
 * tid is the first TID of the slice, which is also the ending TID of the
 * previous slice.  bytes is the estimated amount of data the slice will
 * send.  The final entry holds the overall ending TID with bytes set to
 * (uint64_t)-1.
 */
typedef struct histogram {
	hammer_tid_t	tid;
	uint64_t	bytes;
} *histogram_t;

/*
 * Forward declarations for the file-local helpers.
 */
static int read_mrecords(int fd, char *buf, u_int size,
			 hammer_ioc_mrecord_head_t pickup);
static int generate_histogram(int fd, const char *filesystem,
			 histogram_t *histogram_ary,
			 struct hammer_ioc_mirror_rw *mirror_base,
			 int *repeatp);
static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
			 hammer_ioc_mrecord_head_t pickup);
static void write_mrecord(int fdout, uint32_t type,
			 hammer_ioc_mrecord_any_t mrec, int bytes);
static void generate_mrec_header(int fd, int pfs_id,
			 union hammer_ioc_mrecord_any *mrec_tmp);
static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
			 struct hammer_ioc_mrecord_head *pickup,
			 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
static ssize_t writebw(int fd, const void *buf, size_t nbytes,
			uint64_t *bwcount, struct timeval *tv1);
static int getyn(void);
static void mirror_usage(int code);

/*
 * Generate a mirroring data stream from the specified source over the
 * entire key range, but restricted to the specified transaction range.
 *
 * The HAMMER VFS does most of the work; we add a few new mrecord
 * types to negotiate the TID ranges and verify that the entire
 * stream made it to the destination.
 *
 * streaming will be 0 for mirror-read, 1 for mirror-read-stream.  The code
 * will set up a fake value of -1 when running the histogram for mirror-read.
79 */ 80 void 81 hammer_cmd_mirror_read(char **av, int ac, int streaming) 82 { 83 struct hammer_ioc_mirror_rw mirror; 84 struct hammer_ioc_pseudofs_rw pfs; 85 union hammer_ioc_mrecord_any mrec_tmp; 86 struct hammer_ioc_mrecord_head pickup; 87 hammer_ioc_mrecord_any_t mrec; 88 hammer_tid_t sync_tid; 89 histogram_t histogram_ary; 90 char *filesystem; 91 char *buf = malloc(SERIALBUF_SIZE); 92 int interrupted = 0; 93 int error; 94 int fd; 95 int n; 96 int didwork; 97 int histogram; 98 int histindex; 99 int histmax; 100 int repeat = 0; 101 int sameline; 102 int64_t total_bytes; 103 time_t base_t = time(NULL); 104 struct timeval bwtv; 105 uint64_t bwcount; 106 uint64_t estbytes; 107 108 if (ac == 0 || ac > 2) 109 mirror_usage(1); 110 filesystem = av[0]; 111 hammer_check_restrict(filesystem); 112 113 pickup.signature = 0; 114 pickup.type = 0; 115 histogram = 0; 116 histindex = 0; 117 histmax = 0; 118 histogram_ary = NULL; 119 sameline = 0; 120 121 again: 122 bzero(&mirror, sizeof(mirror)); 123 hammer_key_beg_init(&mirror.key_beg); 124 hammer_key_end_init(&mirror.key_end); 125 126 fd = getpfs(&pfs, filesystem); 127 128 if (streaming >= 0) 129 score_printf(LINE1, "Running"); 130 131 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 132 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 133 fflush(stderr); 134 sameline = 1; 135 } 136 sameline = 1; 137 total_bytes = 0; 138 gettimeofday(&bwtv, NULL); 139 bwcount = 0; 140 141 /* 142 * Send initial header for the purpose of determining the 143 * shared-uuid. 144 */ 145 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 146 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 147 &mrec_tmp, sizeof(mrec_tmp.pfs)); 148 149 /* 150 * In 2-way mode the target will send us a PFS info packet 151 * first. Use the target's current snapshot TID as our default 152 * begin TID. 
153 */ 154 if (TwoWayPipeOpt) { 155 mirror.tid_beg = 0; 156 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 157 NULL, &mirror.tid_beg); 158 if (n < 0) { /* got TERM record */ 159 relpfs(fd, &pfs); 160 free(buf); 161 free(histogram_ary); 162 return; 163 } 164 ++mirror.tid_beg; 165 } else if (streaming && histogram) { 166 mirror.tid_beg = histogram_ary[histindex].tid + 1; 167 } else { 168 mirror.tid_beg = 0; 169 } 170 171 /* 172 * Write out the PFS header, tid_beg will be updated if our PFS 173 * has a larger begin sync. tid_end is set to the latest source 174 * TID whos flush cycle has completed. 175 */ 176 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 177 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 178 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 179 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 180 mirror.ubuf = buf; 181 mirror.size = SERIALBUF_SIZE; 182 mirror.pfs_id = pfs.pfs_id; 183 mirror.shared_uuid = pfs.ondisk->shared_uuid; 184 185 /* 186 * XXX If the histogram is exhausted and the TID delta is large 187 * the stream might have been offline for a while and is 188 * now picking it up again. Do another histogram. 189 */ 190 #if 0 191 if (streaming && histogram && histindex == histend) { 192 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 193 histogram = 0; 194 } 195 #endif 196 197 /* 198 * Initial bulk startup control, try to do some incremental 199 * mirroring in order to allow the stream to be killed and 200 * restarted without having to start over. 
201 */ 202 if (histogram == 0 && BulkOpt == 0) { 203 if (VerboseOpt && repeat == 0) { 204 fprintf(stderr, "\n"); 205 sameline = 0; 206 } 207 histmax = generate_histogram(fd, filesystem, 208 &histogram_ary, &mirror, 209 &repeat); 210 histindex = 0; 211 histogram = 1; 212 213 /* 214 * Just stream the histogram, then stop 215 */ 216 if (streaming == 0) 217 streaming = -1; 218 } 219 220 if (streaming && histogram) { 221 ++histindex; 222 mirror.tid_end = histogram_ary[histindex].tid; 223 estbytes = histogram_ary[histindex-1].bytes; 224 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 225 } else { 226 estbytes = 0; 227 } 228 229 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 230 &mrec_tmp, sizeof(mrec_tmp.pfs)); 231 232 /* 233 * A cycle file overrides the beginning TID only if we are 234 * not operating in two-way or histogram mode. 235 */ 236 if (TwoWayPipeOpt == 0 && histogram == 0) { 237 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 238 } 239 240 /* 241 * An additional argument overrides the beginning TID regardless 242 * of what mode we are in. This is not recommending if operating 243 * in two-way mode. 244 */ 245 if (ac == 2) 246 mirror.tid_beg = strtoull(av[1], NULL, 0); 247 248 if (streaming == 0 || VerboseOpt >= 2) { 249 fprintf(stderr, 250 "Mirror-read: Mirror %016jx to %016jx", 251 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 252 if (histogram) 253 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 254 fprintf(stderr, "\n"); 255 fflush(stderr); 256 } 257 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 258 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 259 (uintmax_t)mirror.key_beg.obj_id); 260 } 261 262 /* 263 * Nothing to do if begin equals end. 
264 */ 265 if (mirror.tid_beg >= mirror.tid_end) { 266 if (streaming == 0 || VerboseOpt >= 2) 267 fprintf(stderr, "Mirror-read: No work to do\n"); 268 sleep(DelayOpt); 269 didwork = 0; 270 histogram = 0; 271 goto done; 272 } 273 didwork = 1; 274 275 /* 276 * Write out bulk records 277 */ 278 mirror.ubuf = buf; 279 mirror.size = SERIALBUF_SIZE; 280 281 do { 282 mirror.count = 0; 283 mirror.pfs_id = pfs.pfs_id; 284 mirror.shared_uuid = pfs.ondisk->shared_uuid; 285 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 286 score_printf(LINE3, "Mirror-read %s failed: %s", 287 filesystem, strerror(errno)); 288 fprintf(stderr, "Mirror-read %s failed: %s\n", 289 filesystem, strerror(errno)); 290 exit(1); 291 } 292 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 293 score_printf(LINE3, "Mirror-read %s fatal error %d", 294 filesystem, mirror.head.error); 295 fprintf(stderr, 296 "Mirror-read %s fatal error %d\n", 297 filesystem, mirror.head.error); 298 exit(1); 299 } 300 if (mirror.count) { 301 if (BandwidthOpt) { 302 n = writebw(1, mirror.ubuf, mirror.count, 303 &bwcount, &bwtv); 304 } else { 305 n = write(1, mirror.ubuf, mirror.count); 306 } 307 if (n != mirror.count) { 308 score_printf(LINE3, 309 "Mirror-read %s failed: " 310 "short write", 311 filesystem); 312 fprintf(stderr, 313 "Mirror-read %s failed: " 314 "short write\n", 315 filesystem); 316 exit(1); 317 } 318 } 319 total_bytes += mirror.count; 320 if (streaming && VerboseOpt) { 321 fprintf(stderr, 322 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 323 (uintmax_t)mirror.key_cur.obj_id, 324 (uintmax_t)mirror.tid_beg, 325 (uintmax_t)mirror.tid_end, 326 (intmax_t)total_bytes); 327 fflush(stderr); 328 sameline = 0; 329 } else if (streaming) { 330 score_printf(LINE2, 331 "obj=%016jx tids=%016jx:%016jx %11jd", 332 (uintmax_t)mirror.key_cur.obj_id, 333 (uintmax_t)mirror.tid_beg, 334 (uintmax_t)mirror.tid_end, 335 (intmax_t)total_bytes); 336 } 337 mirror.key_beg = mirror.key_cur; 338 339 /* 340 * Deal with time limit option 
341 */ 342 if (TimeoutOpt && 343 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 344 score_printf(LINE3, 345 "Mirror-read %s interrupted by timer at" 346 " %016jx", 347 filesystem, 348 (uintmax_t)mirror.key_cur.obj_id); 349 fprintf(stderr, 350 "Mirror-read %s interrupted by timer at" 351 " %016jx\n", 352 filesystem, 353 (uintmax_t)mirror.key_cur.obj_id); 354 interrupted = 1; 355 break; 356 } 357 } while (mirror.count != 0); 358 359 done: 360 if (streaming && VerboseOpt && sameline == 0) { 361 fprintf(stderr, "\n"); 362 fflush(stderr); 363 sameline = 1; 364 } 365 366 /* 367 * Write out the termination sync record - only if not interrupted 368 */ 369 if (interrupted == 0) { 370 if (didwork) { 371 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 372 &mrec_tmp, sizeof(mrec_tmp.sync)); 373 } else { 374 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 375 &mrec_tmp, sizeof(mrec_tmp.sync)); 376 } 377 } 378 379 /* 380 * If the -2 option was given (automatic when doing mirror-copy), 381 * a two-way pipe is assumed and we expect a response mrec from 382 * the target. 383 */ 384 if (TwoWayPipeOpt) { 385 mrec = read_mrecord(0, &error, &pickup); 386 if (mrec == NULL || 387 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 388 mrec->head.rec_size != sizeof(mrec->update)) { 389 fprintf(stderr, "mirror_read: Did not get final " 390 "acknowledgement packet from target\n"); 391 exit(1); 392 } 393 if (interrupted) { 394 if (CyclePath) { 395 hammer_set_cycle(&mirror.key_cur, 396 mirror.tid_beg); 397 fprintf(stderr, "Cyclefile %s updated for " 398 "continuation\n", CyclePath); 399 } 400 } else { 401 sync_tid = mrec->update.tid; 402 if (CyclePath) { 403 hammer_key_beg_init(&mirror.key_beg); 404 hammer_set_cycle(&mirror.key_beg, sync_tid); 405 fprintf(stderr, 406 "Cyclefile %s updated to 0x%016jx\n", 407 CyclePath, (uintmax_t)sync_tid); 408 } 409 } 410 free(mrec); 411 } else if (CyclePath) { 412 /* NOTE! 
mirror.tid_beg cannot be updated */ 413 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 414 "fully updated unless you use mirror-copy\n"); 415 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 416 } 417 if (streaming && interrupted == 0) { 418 time_t t1 = time(NULL); 419 time_t t2; 420 421 /* 422 * Try to break down large bulk transfers into smaller ones 423 * so it can sync the transaction id on the slave. This 424 * way if we get interrupted a restart doesn't have to 425 * start from scratch. 426 */ 427 if (streaming && histogram) { 428 if (histindex != histmax) { 429 if (VerboseOpt && VerboseOpt < 2 && 430 streaming >= 0) { 431 fprintf(stderr, " (bulk incremental)"); 432 } 433 relpfs(fd, &pfs); 434 goto again; 435 } 436 } 437 438 if (VerboseOpt && streaming >= 0) { 439 fprintf(stderr, " W"); 440 fflush(stderr); 441 } else if (streaming >= 0) { 442 score_printf(LINE1, "Waiting"); 443 } 444 pfs.ondisk->sync_end_tid = mirror.tid_end; 445 if (streaming < 0) { 446 /* 447 * Fake streaming mode when using a histogram to 448 * break up a mirror-read, do not wait on source. 
449 */ 450 streaming = 0; 451 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 452 score_printf(LINE3, 453 "Mirror-read %s: cannot stream: %s\n", 454 filesystem, strerror(errno)); 455 fprintf(stderr, 456 "Mirror-read %s: cannot stream: %s\n", 457 filesystem, strerror(errno)); 458 } else { 459 t2 = time(NULL) - t1; 460 if (t2 >= 0 && t2 < DelayOpt) { 461 if (VerboseOpt) { 462 fprintf(stderr, "\bD"); 463 fflush(stderr); 464 } 465 sleep(DelayOpt - t2); 466 } 467 if (VerboseOpt) { 468 fprintf(stderr, "\b "); 469 fflush(stderr); 470 } 471 relpfs(fd, &pfs); 472 goto again; 473 } 474 } 475 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 476 &mrec_tmp, sizeof(mrec_tmp.sync)); 477 relpfs(fd, &pfs); 478 free(buf); 479 free(histogram_ary); 480 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 481 } 482 483 /* 484 * What we are trying to do here is figure out how much data is 485 * going to be sent for the TID range and to break the TID range 486 * down into reasonably-sized slices (from the point of view of 487 * data sent) so a lost connection can restart at a reasonable 488 * place and not all the way back at the beginning. 489 * 490 * An entry's TID serves as the end_tid for the prior entry 491 * So we have to offset the calculation by 1 so that TID falls into 492 * the previous entry when populating entries. 493 * 494 * Because the transaction id space is bursty we need a relatively 495 * large number of buckets (like a million) to do a reasonable job 496 * for things like an initial bulk mirrors on a very large filesystem. 
497 */ 498 #define HIST_COUNT (1024 * 1024) 499 500 static int 501 generate_histogram(int fd, const char *filesystem, 502 histogram_t *histogram_ary, 503 struct hammer_ioc_mirror_rw *mirror_base, 504 int *repeatp) 505 { 506 struct hammer_ioc_mirror_rw mirror; 507 union hammer_ioc_mrecord_any *mrec; 508 hammer_tid_t tid_beg; 509 hammer_tid_t tid_end; 510 hammer_tid_t tid; 511 hammer_tid_t tidx; 512 uint64_t *tid_bytes; 513 uint64_t total; 514 uint64_t accum; 515 int chunkno; 516 int i; 517 int res; 518 int off; 519 int len; 520 521 mirror = *mirror_base; 522 tid_beg = mirror.tid_beg; 523 tid_end = mirror.tid_end; 524 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 525 526 if (*histogram_ary == NULL) { 527 *histogram_ary = malloc(sizeof(struct histogram) * 528 (HIST_COUNT + 2)); 529 } 530 if (tid_beg >= tid_end) 531 return(0); 532 533 /* needs 2 extra */ 534 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 535 bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2)); 536 537 if (*repeatp == 0) { 538 fprintf(stderr, "Prescan to break up bulk transfer"); 539 if (VerboseOpt > 1) 540 fprintf(stderr, " (%juMB chunks)", 541 (uintmax_t)(SplitupOpt / (1024 * 1024))); 542 fprintf(stderr, "\n"); 543 } 544 545 /* 546 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 547 * 548 * Note: Estimates can be off when the mirror is way behind due 549 * to skips. 
550 */ 551 total = 0; 552 accum = 0; 553 chunkno = 0; 554 for (;;) { 555 mirror.count = 0; 556 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 557 fprintf(stderr, "Mirror-read %s failed: %s\n", 558 filesystem, strerror(errno)); 559 exit(1); 560 } 561 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 562 fprintf(stderr, 563 "Mirror-read %s fatal error %d\n", 564 filesystem, mirror.head.error); 565 exit(1); 566 } 567 for (off = 0; 568 off < mirror.count; 569 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size)) { 570 mrec = (void *)((char *)mirror.ubuf + off); 571 572 /* 573 * We only care about general RECs and PASS 574 * records. We ignore SKIPs. 575 */ 576 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 577 case HAMMER_MREC_TYPE_REC: 578 case HAMMER_MREC_TYPE_PASS: 579 break; 580 default: 581 continue; 582 } 583 584 /* 585 * Calculate for two indices, create_tid and 586 * delete_tid. Record data only applies to 587 * the create_tid. 588 * 589 * When tid is exactly on the boundary it really 590 * belongs to the previous entry because scans 591 * are inclusive of the ending entry. 
592 */ 593 tid = mrec->rec.leaf.base.delete_tid; 594 if (tid && tid >= tid_beg && tid <= tid_end) { 595 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 596 if (mrec->head.type == 597 HAMMER_MREC_TYPE_REC) { 598 len -= HAMMER_HEAD_DOALIGN( 599 mrec->rec.leaf.data_len); 600 assert(len > 0); 601 } 602 i = (tid - tid_beg) * HIST_COUNT / 603 (tid_end - tid_beg); 604 tidx = tid_beg + i * (tid_end - tid_beg) / 605 HIST_COUNT; 606 if (tid == tidx && i) 607 --i; 608 assert(i >= 0 && i < HIST_COUNT); 609 tid_bytes[i] += len; 610 total += len; 611 accum += len; 612 } 613 614 tid = mrec->rec.leaf.base.create_tid; 615 if (tid && tid >= tid_beg && tid <= tid_end) { 616 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 617 if (mrec->head.type == 618 HAMMER_MREC_TYPE_REC_NODATA) { 619 len += HAMMER_HEAD_DOALIGN( 620 mrec->rec.leaf.data_len); 621 } 622 i = (tid - tid_beg) * HIST_COUNT / 623 (tid_end - tid_beg); 624 tidx = tid_beg + i * (tid_end - tid_beg) / 625 HIST_COUNT; 626 if (tid == tidx && i) 627 --i; 628 assert(i >= 0 && i < HIST_COUNT); 629 tid_bytes[i] += len; 630 total += len; 631 accum += len; 632 } 633 } 634 if (*repeatp == 0 && accum > SplitupOpt) { 635 if (VerboseOpt > 1) { 636 fprintf(stderr, "."); 637 fflush(stderr); 638 } 639 ++chunkno; 640 score_printf(LINE2, "Prescan chunk %d", chunkno); 641 accum = 0; 642 } 643 if (mirror.count == 0) 644 break; 645 mirror.key_beg = mirror.key_cur; 646 } 647 648 /* 649 * Reduce to SplitupOpt (default 4GB) chunks. This code may 650 * use up to two additional elements. Do the array in-place. 651 * 652 * Inefficient degenerate cases can occur if we do not accumulate 653 * at least the requested split amount, so error on the side of 654 * going over a bit. 
655 */ 656 res = 0; 657 (*histogram_ary)[res].tid = tid_beg; 658 (*histogram_ary)[res].bytes = tid_bytes[0]; 659 for (i = 1; i < HIST_COUNT; ++i) { 660 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 661 ++res; 662 (*histogram_ary)[res].tid = tid_beg + 663 i * (tid_end - tid_beg) / 664 HIST_COUNT; 665 (*histogram_ary)[res].bytes = 0; 666 667 } 668 (*histogram_ary)[res].bytes += tid_bytes[i]; 669 } 670 ++res; 671 (*histogram_ary)[res].tid = tid_end; 672 (*histogram_ary)[res].bytes = -1; 673 674 if (*repeatp == 0) { 675 if (VerboseOpt > 1) 676 fprintf(stderr, "\n"); /* newline after ... */ 677 score_printf(LINE3, "Prescan %d chunks, total %ju MBytes", 678 res, (uintmax_t)total / (1024 * 1024)); 679 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 680 res, (uintmax_t)total / (1024 * 1024)); 681 for (i = 0; i < res && i < 3; ++i) { 682 if (i) 683 fprintf(stderr, ", "); 684 fprintf(stderr, "%ju", 685 (uintmax_t)(*histogram_ary)[i].bytes); 686 } 687 if (i < res) 688 fprintf(stderr, ", ..."); 689 fprintf(stderr, ")\n"); 690 } 691 assert(res <= HIST_COUNT); 692 *repeatp = 1; 693 694 free(tid_bytes); 695 return(res); 696 } 697 698 static void 699 create_pfs(const char *filesystem, uuid_t *s_uuid) 700 { 701 if (ForceYesOpt == 1) { 702 fprintf(stderr, "PFS slave %s does not exist. " 703 "Auto create new slave PFS!\n", filesystem); 704 705 } else { 706 fprintf(stderr, "PFS slave %s does not exist.\n" 707 "Do you want to create a new slave PFS? 
(yes|no) ", 708 filesystem); 709 fflush(stderr); 710 if (getyn() != 1) { 711 fprintf(stderr, "Aborting operation\n"); 712 exit(1); 713 } 714 } 715 716 uint32_t status; 717 char *shared_uuid = NULL; 718 uuid_to_string(s_uuid, &shared_uuid, &status); 719 720 char *cmd = NULL; 721 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 722 filesystem, shared_uuid); 723 free(shared_uuid); 724 725 if (cmd == NULL) { 726 fprintf(stderr, "Failed to alloc memory\n"); 727 exit(1); 728 } 729 if (system(cmd) != 0) { 730 fprintf(stderr, "Failed to create PFS\n"); 731 } 732 free(cmd); 733 } 734 735 /* 736 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 737 * some additional packet types to negotiate TID ranges and to verify 738 * completion. The HAMMER VFS does most of the work. 739 * 740 * It is important to note that the mirror.key_{beg,end} range must 741 * match the ranged used by the original. For now both sides use 742 * range the entire key space. 743 * 744 * It is even more important that the records in the stream conform 745 * to the TID range also supplied in the stream. The HAMMER VFS will 746 * use the REC, PASS, and SKIP record types to track the portions of 747 * the B-Tree being scanned in order to be able to proactively delete 748 * records on the target within those active areas that are not mentioned 749 * by the source. 750 * 751 * The mirror.key_cur field is used by the VFS to do this tracking. It 752 * must be initialized to key_beg but then is persistently updated by 753 * the HAMMER VFS on each successive ioctl() call. If you blow up this 754 * field you will blow up the mirror target, possibly to the point of 755 * deleting everything. As a safety measure the HAMMER VFS simply marks 756 * the records that the source has destroyed as deleted on the target, 757 * and normal pruning operations will deal with their final disposition 758 * at some later time. 
759 */ 760 void 761 hammer_cmd_mirror_write(char **av, int ac) 762 { 763 struct hammer_ioc_mirror_rw mirror; 764 char *filesystem; 765 char *buf = malloc(SERIALBUF_SIZE); 766 struct hammer_ioc_pseudofs_rw pfs; 767 struct hammer_ioc_mrecord_head pickup; 768 struct hammer_ioc_synctid synctid; 769 union hammer_ioc_mrecord_any mrec_tmp; 770 hammer_ioc_mrecord_any_t mrec; 771 struct stat st; 772 int error; 773 int fd; 774 int n; 775 776 if (ac != 1) 777 mirror_usage(1); 778 filesystem = av[0]; 779 hammer_check_restrict(filesystem); 780 781 pickup.signature = 0; 782 pickup.type = 0; 783 784 again: 785 bzero(&mirror, sizeof(mirror)); 786 hammer_key_beg_init(&mirror.key_beg); 787 hammer_key_end_init(&mirror.key_end); 788 mirror.key_end = mirror.key_beg; 789 790 /* 791 * Read initial packet 792 */ 793 mrec = read_mrecord(0, &error, &pickup); 794 if (mrec == NULL) { 795 if (error == 0) 796 fprintf(stderr, "validate_mrec_header: short read\n"); 797 exit(1); 798 } 799 /* 800 * Validate packet 801 */ 802 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 803 free(buf); 804 return; 805 } 806 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 807 fprintf(stderr, "validate_mrec_header: did not get expected " 808 "PFSD record type\n"); 809 exit(1); 810 } 811 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 812 fprintf(stderr, "validate_mrec_header: unexpected payload " 813 "size\n"); 814 exit(1); 815 } 816 817 /* 818 * Create slave PFS if it doesn't yet exist 819 */ 820 if (lstat(filesystem, &st) != 0) { 821 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 822 } 823 free(mrec); 824 mrec = NULL; 825 826 fd = getpfs(&pfs, filesystem); 827 828 /* 829 * In two-way mode the target writes out a PFS packet first. 830 * The source uses our tid_end as its tid_beg by default, 831 * picking up where it left off. 
832 */ 833 mirror.tid_beg = 0; 834 if (TwoWayPipeOpt) { 835 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 836 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 837 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 838 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 839 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 840 &mrec_tmp, sizeof(mrec_tmp.pfs)); 841 } 842 843 /* 844 * Read and process the PFS header. The source informs us of 845 * the TID range the stream represents. 846 */ 847 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 848 &mirror.tid_beg, &mirror.tid_end); 849 if (n < 0) { /* got TERM record */ 850 relpfs(fd, &pfs); 851 free(buf); 852 return; 853 } 854 855 mirror.ubuf = buf; 856 mirror.size = SERIALBUF_SIZE; 857 858 /* 859 * Read and process bulk records (REC, PASS, and SKIP types). 860 * 861 * On your life, do NOT mess with mirror.key_cur or your mirror 862 * target may become history. 863 */ 864 for (;;) { 865 mirror.count = 0; 866 mirror.pfs_id = pfs.pfs_id; 867 mirror.shared_uuid = pfs.ondisk->shared_uuid; 868 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 869 if (mirror.size <= 0) 870 break; 871 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 872 fprintf(stderr, "Mirror-write %s failed: %s\n", 873 filesystem, strerror(errno)); 874 exit(1); 875 } 876 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 877 fprintf(stderr, 878 "Mirror-write %s fatal error %d\n", 879 filesystem, mirror.head.error); 880 exit(1); 881 } 882 #if 0 883 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 884 fprintf(stderr, 885 "Mirror-write %s interrupted by timer at" 886 " %016llx\n", 887 filesystem, 888 mirror.key_cur.obj_id); 889 exit(0); 890 } 891 #endif 892 } 893 894 /* 895 * Read and process the termination sync record. 
896 */ 897 mrec = read_mrecord(0, &error, &pickup); 898 899 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 900 fprintf(stderr, "Mirror-write: received termination request\n"); 901 relpfs(fd, &pfs); 902 free(mrec); 903 free(buf); 904 return; 905 } 906 907 if (mrec == NULL || 908 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 909 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 910 mrec->head.rec_size != sizeof(mrec->sync)) { 911 fprintf(stderr, "Mirror-write %s: Did not get termination " 912 "sync record, or rec_size is wrong rt=%d\n", 913 filesystem, 914 (mrec ? (int)mrec->head.type : -1)); 915 exit(1); 916 } 917 918 /* 919 * Update the PFS info on the target so the user has visibility 920 * into the new snapshot, and sync the target filesystem. 921 */ 922 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 923 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 924 925 bzero(&synctid, sizeof(synctid)); 926 synctid.op = HAMMER_SYNCTID_SYNC2; 927 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 928 929 if (VerboseOpt >= 2) { 930 fprintf(stderr, "Mirror-write %s: succeeded\n", 931 filesystem); 932 } 933 } 934 935 free(mrec); 936 mrec = NULL; 937 938 /* 939 * Report back to the originator. 
940 */ 941 if (TwoWayPipeOpt) { 942 mrec_tmp.update.tid = mirror.tid_end; 943 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 944 &mrec_tmp, sizeof(mrec_tmp.update)); 945 } else { 946 printf("Source can update synctid to 0x%016jx\n", 947 (uintmax_t)mirror.tid_end); 948 } 949 relpfs(fd, &pfs); 950 goto again; 951 } 952 953 void 954 hammer_cmd_mirror_dump(char **av, int ac) 955 { 956 char *buf = malloc(SERIALBUF_SIZE); 957 struct hammer_ioc_mrecord_head pickup; 958 hammer_ioc_mrecord_any_t mrec; 959 int error; 960 int size; 961 int offset; 962 int bytes; 963 int header_only = 0; 964 965 if (ac == 1 && strcmp(*av, "header") == 0) 966 header_only = 1; 967 else if (ac != 0) 968 mirror_usage(1); 969 970 /* 971 * Read and process the PFS header 972 */ 973 pickup.signature = 0; 974 pickup.type = 0; 975 976 mrec = read_mrecord(0, &error, &pickup); 977 978 /* 979 * Dump the PFS header. mirror-dump takes its input from the output 980 * of a mirror-read so getpfs() can't be used to get a fd to be passed 981 * to dump_pfsd(). 
982 */ 983 if (header_only && mrec != NULL) { 984 dump_pfsd(&mrec->pfs.pfsd, -1); 985 free(mrec); 986 free(buf); 987 return; 988 } 989 free(mrec); 990 991 again: 992 /* 993 * Read and process bulk records 994 */ 995 for (;;) { 996 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 997 if (size <= 0) 998 break; 999 offset = 0; 1000 while (offset < size) { 1001 mrec = (void *)((char *)buf + offset); 1002 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 1003 if (offset + bytes > size) { 1004 fprintf(stderr, "Misaligned record\n"); 1005 exit(1); 1006 } 1007 1008 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 1009 case HAMMER_MREC_TYPE_REC_BADCRC: 1010 case HAMMER_MREC_TYPE_REC: 1011 printf("Record lo=%08x obj=%016jx key=%016jx " 1012 "rt=%02x ot=%02x", 1013 mrec->rec.leaf.base.localization, 1014 (uintmax_t)mrec->rec.leaf.base.obj_id, 1015 (uintmax_t)mrec->rec.leaf.base.key, 1016 mrec->rec.leaf.base.rec_type, 1017 mrec->rec.leaf.base.obj_type); 1018 if (mrec->head.type == 1019 HAMMER_MREC_TYPE_REC_BADCRC) { 1020 printf(" (BAD CRC)"); 1021 } 1022 printf("\n"); 1023 printf(" tids %016jx:%016jx data=%d\n", 1024 (uintmax_t)mrec->rec.leaf.base.create_tid, 1025 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1026 mrec->rec.leaf.data_len); 1027 break; 1028 case HAMMER_MREC_TYPE_PASS: 1029 printf("Pass lo=%08x obj=%016jx key=%016jx " 1030 "rt=%02x ot=%02x\n", 1031 mrec->rec.leaf.base.localization, 1032 (uintmax_t)mrec->rec.leaf.base.obj_id, 1033 (uintmax_t)mrec->rec.leaf.base.key, 1034 mrec->rec.leaf.base.rec_type, 1035 mrec->rec.leaf.base.obj_type); 1036 printf(" tids %016jx:%016jx data=%d\n", 1037 (uintmax_t)mrec->rec.leaf.base.create_tid, 1038 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1039 mrec->rec.leaf.data_len); 1040 break; 1041 case HAMMER_MREC_TYPE_SKIP: 1042 printf("Skip lo=%08x obj=%016jx key=%016jx rt=%02x to\n" 1043 " lo=%08x obj=%016jx key=%016jx rt=%02x\n", 1044 mrec->skip.skip_beg.localization, 1045 (uintmax_t)mrec->skip.skip_beg.obj_id, 1046 
(uintmax_t)mrec->skip.skip_beg.key, 1047 mrec->skip.skip_beg.rec_type, 1048 mrec->skip.skip_end.localization, 1049 (uintmax_t)mrec->skip.skip_end.obj_id, 1050 (uintmax_t)mrec->skip.skip_end.key, 1051 mrec->skip.skip_end.rec_type); 1052 default: 1053 break; 1054 } 1055 offset += bytes; 1056 } 1057 } 1058 1059 /* 1060 * Read and process the termination sync record. 1061 */ 1062 mrec = read_mrecord(0, &error, &pickup); 1063 if (mrec == NULL || 1064 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 1065 mrec->head.type != HAMMER_MREC_TYPE_IDLE)) { 1066 fprintf(stderr, "Mirror-dump: Did not get termination " 1067 "sync record\n"); 1068 } 1069 free(mrec); 1070 1071 /* 1072 * Continue with more batches until EOF. 1073 */ 1074 mrec = read_mrecord(0, &error, &pickup); 1075 if (mrec) { 1076 free(mrec); 1077 goto again; 1078 } 1079 free(buf); 1080 } 1081 1082 void 1083 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1084 { 1085 pid_t pid1; 1086 pid_t pid2; 1087 int fds[2]; 1088 const char *xav[32]; 1089 char tbuf[16]; 1090 char *sh, *user, *host, *rfs; 1091 int xac; 1092 1093 if (ac != 2) 1094 mirror_usage(1); 1095 1096 TwoWayPipeOpt = 1; 1097 signal(SIGPIPE, SIG_IGN); 1098 1099 again: 1100 if (pipe(fds) < 0) { 1101 perror("pipe"); 1102 exit(1); 1103 } 1104 1105 /* 1106 * Source 1107 */ 1108 if ((pid1 = fork()) == 0) { 1109 signal(SIGPIPE, SIG_DFL); 1110 dup2(fds[0], 0); 1111 dup2(fds[0], 1); 1112 close(fds[0]); 1113 close(fds[1]); 1114 if ((rfs = strchr(av[0], ':')) != NULL) { 1115 xac = 0; 1116 1117 if((sh = getenv("HAMMER_RSH")) == NULL) 1118 xav[xac++] = "ssh"; 1119 else 1120 xav[xac++] = sh; 1121 1122 if (CompressOpt) 1123 xav[xac++] = "-C"; 1124 1125 if ((host = strchr(av[0], '@')) != NULL) { 1126 user = strndup( av[0], (host++ - av[0])); 1127 host = strndup( host, (rfs++ - host)); 1128 xav[xac++] = "-l"; 1129 xav[xac++] = user; 1130 xav[xac++] = host; 1131 } 1132 else { 1133 host = strndup( av[0], (rfs++ - av[0])); 1134 user = NULL; 1135 xav[xac++] = host; 1136 } 
1137 1138 1139 if (SshPort) { 1140 xav[xac++] = "-p"; 1141 xav[xac++] = SshPort; 1142 } 1143 1144 xav[xac++] = "hammer"; 1145 1146 switch(VerboseOpt) { 1147 case 0: 1148 break; 1149 case 1: 1150 xav[xac++] = "-v"; 1151 break; 1152 case 2: 1153 xav[xac++] = "-vv"; 1154 break; 1155 default: 1156 xav[xac++] = "-vvv"; 1157 break; 1158 } 1159 if (ForceYesOpt) { 1160 xav[xac++] = "-y"; 1161 } 1162 xav[xac++] = "-2"; 1163 if (TimeoutOpt) { 1164 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1165 xav[xac++] = "-t"; 1166 xav[xac++] = tbuf; 1167 } 1168 if (SplitupOptStr) { 1169 xav[xac++] = "-S"; 1170 xav[xac++] = SplitupOptStr; 1171 } 1172 if (streaming) 1173 xav[xac++] = "mirror-read-stream"; 1174 else 1175 xav[xac++] = "mirror-read"; 1176 xav[xac++] = rfs; 1177 xav[xac++] = NULL; 1178 execvp(*xav, (void *)xav); 1179 } else { 1180 hammer_cmd_mirror_read(av, 1, streaming); 1181 fflush(stdout); 1182 fflush(stderr); 1183 } 1184 _exit(1); 1185 } 1186 1187 /* 1188 * Target 1189 */ 1190 if ((pid2 = fork()) == 0) { 1191 signal(SIGPIPE, SIG_DFL); 1192 dup2(fds[1], 0); 1193 dup2(fds[1], 1); 1194 close(fds[0]); 1195 close(fds[1]); 1196 if ((rfs = strchr(av[1], ':')) != NULL) { 1197 xac = 0; 1198 1199 if((sh = getenv("HAMMER_RSH")) == NULL) 1200 xav[xac++] = "ssh"; 1201 else 1202 xav[xac++] = sh; 1203 1204 if (CompressOpt) 1205 xav[xac++] = "-C"; 1206 1207 if ((host = strchr(av[1], '@')) != NULL) { 1208 user = strndup( av[1], (host++ - av[1])); 1209 host = strndup( host, (rfs++ - host)); 1210 xav[xac++] = "-l"; 1211 xav[xac++] = user; 1212 xav[xac++] = host; 1213 } 1214 else { 1215 host = strndup( av[1], (rfs++ - av[1])); 1216 user = NULL; 1217 xav[xac++] = host; 1218 } 1219 1220 if (SshPort) { 1221 xav[xac++] = "-p"; 1222 xav[xac++] = SshPort; 1223 } 1224 1225 xav[xac++] = "hammer"; 1226 1227 switch(VerboseOpt) { 1228 case 0: 1229 break; 1230 case 1: 1231 xav[xac++] = "-v"; 1232 break; 1233 case 2: 1234 xav[xac++] = "-vv"; 1235 break; 1236 default: 1237 xav[xac++] = "-vvv"; 1238 
break; 1239 } 1240 if (ForceYesOpt) { 1241 xav[xac++] = "-y"; 1242 } 1243 xav[xac++] = "-2"; 1244 xav[xac++] = "mirror-write"; 1245 xav[xac++] = rfs; 1246 xav[xac++] = NULL; 1247 execvp(*xav, (void *)xav); 1248 } else { 1249 hammer_cmd_mirror_write(av + 1, 1); 1250 fflush(stdout); 1251 fflush(stderr); 1252 } 1253 _exit(1); 1254 } 1255 close(fds[0]); 1256 close(fds[1]); 1257 1258 while (waitpid(pid1, NULL, 0) <= 0) 1259 ; 1260 while (waitpid(pid2, NULL, 0) <= 0) 1261 ; 1262 1263 /* 1264 * If the link is lost restart 1265 */ 1266 if (streaming) { 1267 if (VerboseOpt) { 1268 fprintf(stderr, "\nLost Link\n"); 1269 fflush(stderr); 1270 } 1271 sleep(15 + DelayOpt); 1272 goto again; 1273 } 1274 1275 } 1276 1277 /* 1278 * Read and return multiple mrecords 1279 */ 1280 static int 1281 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 1282 { 1283 hammer_ioc_mrecord_any_t mrec; 1284 u_int count; 1285 size_t n; 1286 size_t i; 1287 size_t bytes; 1288 int type; 1289 1290 count = 0; 1291 while (size - count >= HAMMER_MREC_HEADSIZE) { 1292 /* 1293 * Cached the record header in case we run out of buffer 1294 * space. 
1295 */ 1296 fflush(stdout); 1297 if (pickup->signature == 0) { 1298 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1299 i = read(fd, (char *)pickup + n, 1300 HAMMER_MREC_HEADSIZE - n); 1301 if (i <= 0) 1302 break; 1303 } 1304 if (n == 0) 1305 break; 1306 if (n != HAMMER_MREC_HEADSIZE) { 1307 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1308 exit(1); 1309 } 1310 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1311 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1312 "bad signature\n"); 1313 exit(1); 1314 } 1315 } 1316 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1317 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 1318 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1319 "illegal rec_size\n"); 1320 exit(1); 1321 } 1322 1323 /* 1324 * Stop if we have insufficient space for the record and data. 1325 */ 1326 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1327 if (size - count < bytes) 1328 break; 1329 1330 /* 1331 * Stop if the record type is not a REC, SKIP, or PASS, 1332 * which are the only types the ioctl supports. Other types 1333 * are used only by the userland protocol. 1334 * 1335 * Ignore all flags. 1336 */ 1337 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1338 if (type != HAMMER_MREC_TYPE_PFSD && 1339 type != HAMMER_MREC_TYPE_REC && 1340 type != HAMMER_MREC_TYPE_SKIP && 1341 type != HAMMER_MREC_TYPE_PASS) { 1342 break; 1343 } 1344 1345 /* 1346 * Read the remainder and clear the pickup signature. 
1347 */ 1348 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1349 i = read(fd, buf + count + n, bytes - n); 1350 if (i <= 0) 1351 break; 1352 } 1353 if (n != bytes) { 1354 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1355 exit(1); 1356 } 1357 1358 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1359 pickup->signature = 0; 1360 pickup->type = 0; 1361 mrec = (void *)(buf + count); 1362 1363 /* 1364 * Validate the completed record 1365 */ 1366 if (mrec->head.rec_crc != 1367 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1368 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1369 fprintf(stderr, "read_mrecords: malformed record " 1370 "on pipe, bad crc\n"); 1371 exit(1); 1372 } 1373 1374 /* 1375 * If its a B-Tree record validate the data crc. 1376 * 1377 * NOTE: If the VFS passes us an explicitly errorde mrec 1378 * we just pass it through. 1379 */ 1380 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1381 1382 if (type == HAMMER_MREC_TYPE_REC) { 1383 if (mrec->head.rec_size < 1384 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1385 fprintf(stderr, 1386 "read_mrecords: malformed record on " 1387 "pipe, illegal element data_len\n"); 1388 exit(1); 1389 } 1390 if (mrec->rec.leaf.data_len && 1391 mrec->rec.leaf.data_offset && 1392 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 1393 fprintf(stderr, 1394 "read_mrecords: data_crc did not " 1395 "match data! obj=%016jx key=%016jx\n", 1396 (uintmax_t)mrec->rec.leaf.base.obj_id, 1397 (uintmax_t)mrec->rec.leaf.base.key); 1398 fprintf(stderr, 1399 "continuing, but there are problems\n"); 1400 } 1401 } 1402 count += bytes; 1403 } 1404 return(count); 1405 } 1406 1407 /* 1408 * Read and return a single mrecord. 
1409 */ 1410 static 1411 hammer_ioc_mrecord_any_t 1412 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 1413 { 1414 hammer_ioc_mrecord_any_t mrec; 1415 struct hammer_ioc_mrecord_head mrechd; 1416 size_t bytes; 1417 size_t n; 1418 size_t i; 1419 1420 if (pickup && pickup->type != 0) { 1421 mrechd = *pickup; 1422 pickup->signature = 0; 1423 pickup->type = 0; 1424 n = HAMMER_MREC_HEADSIZE; 1425 } else { 1426 /* 1427 * Read in the PFSD header from the sender. 1428 */ 1429 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1430 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1431 if (i <= 0) 1432 break; 1433 } 1434 if (n == 0) { 1435 *errorp = 0; /* EOF */ 1436 return(NULL); 1437 } 1438 if (n != HAMMER_MREC_HEADSIZE) { 1439 fprintf(stderr, "short read of mrecord header\n"); 1440 *errorp = EPIPE; 1441 return(NULL); 1442 } 1443 } 1444 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1445 fprintf(stderr, "read_mrecord: bad signature\n"); 1446 *errorp = EINVAL; 1447 return(NULL); 1448 } 1449 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1450 assert(bytes >= sizeof(mrechd)); 1451 mrec = malloc(bytes); 1452 mrec->head = mrechd; 1453 1454 while (n < bytes) { 1455 i = read(fdin, (char *)mrec + n, bytes - n); 1456 if (i <= 0) 1457 break; 1458 n += i; 1459 } 1460 if (n != bytes) { 1461 fprintf(stderr, "read_mrecord: short read on payload\n"); 1462 *errorp = EPIPE; 1463 return(NULL); 1464 } 1465 if (mrec->head.rec_crc != 1466 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1467 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1468 fprintf(stderr, "read_mrecord: bad CRC\n"); 1469 *errorp = EINVAL; 1470 return(NULL); 1471 } 1472 *errorp = 0; 1473 return(mrec); 1474 } 1475 1476 static 1477 void 1478 write_mrecord(int fdout, uint32_t type, hammer_ioc_mrecord_any_t mrec, 1479 int bytes) 1480 { 1481 char zbuf[HAMMER_HEAD_ALIGN]; 1482 int pad; 1483 1484 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1485 1486 assert(bytes >= (int)sizeof(mrec->head)); 1487 
bzero(&mrec->head, sizeof(mrec->head)); 1488 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1489 mrec->head.type = type; 1490 mrec->head.rec_size = bytes; 1491 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1492 bytes - HAMMER_MREC_CRCOFF); 1493 if (write(fdout, mrec, bytes) != bytes) { 1494 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1495 errno, strerror(errno)); 1496 exit(1); 1497 } 1498 if (pad) { 1499 bzero(zbuf, pad); 1500 if (write(fdout, zbuf, pad) != pad) { 1501 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1502 errno, strerror(errno)); 1503 exit(1); 1504 } 1505 } 1506 } 1507 1508 /* 1509 * Generate a mirroring header with the pfs information of the 1510 * originating filesytem. 1511 */ 1512 static void 1513 generate_mrec_header(int fd, int pfs_id, 1514 union hammer_ioc_mrecord_any *mrec_tmp) 1515 { 1516 struct hammer_ioc_pseudofs_rw pfs; 1517 1518 bzero(&pfs, sizeof(pfs)); 1519 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1520 pfs.pfs_id = pfs_id; 1521 pfs.ondisk = &mrec_tmp->pfs.pfsd; 1522 pfs.bytes = sizeof(mrec_tmp->pfs.pfsd); 1523 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1524 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1525 exit(1); 1526 } 1527 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1528 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 1529 exit(1); 1530 } 1531 mrec_tmp->pfs.version = pfs.version; 1532 } 1533 1534 /* 1535 * Validate the pfs information from the originating filesystem 1536 * against the target filesystem. shared_uuid must match. 
1537 * 1538 * return -1 if we got a TERM record 1539 */ 1540 static int 1541 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1542 struct hammer_ioc_mrecord_head *pickup, 1543 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1544 { 1545 struct hammer_ioc_pseudofs_rw pfs; 1546 struct hammer_pseudofs_data pfsd; 1547 hammer_ioc_mrecord_any_t mrec; 1548 int error; 1549 1550 /* 1551 * Get the PFSD info from the target filesystem. 1552 */ 1553 bzero(&pfs, sizeof(pfs)); 1554 bzero(&pfsd, sizeof(pfsd)); 1555 pfs.pfs_id = pfs_id; 1556 pfs.ondisk = &pfsd; 1557 pfs.bytes = sizeof(pfsd); 1558 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1559 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1560 exit(1); 1561 } 1562 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1563 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 1564 exit(1); 1565 } 1566 1567 mrec = read_mrecord(fdin, &error, pickup); 1568 if (mrec == NULL) { 1569 if (error == 0) 1570 fprintf(stderr, "validate_mrec_header: short read\n"); 1571 exit(1); 1572 } 1573 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1574 free(mrec); 1575 return(-1); 1576 } 1577 1578 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1579 fprintf(stderr, "validate_mrec_header: did not get expected " 1580 "PFSD record type\n"); 1581 exit(1); 1582 } 1583 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1584 fprintf(stderr, "validate_mrec_header: unexpected payload " 1585 "size\n"); 1586 exit(1); 1587 } 1588 if (mrec->pfs.version != pfs.version) { 1589 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1590 exit(1); 1591 } 1592 1593 /* 1594 * Whew. Ok, is the read PFS info compatible with the target? 
1595 */ 1596 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1597 sizeof(pfsd.shared_uuid)) != 0) { 1598 fprintf(stderr, 1599 "mirror-write: source and target have " 1600 "different shared-uuid's!\n"); 1601 exit(1); 1602 } 1603 if (is_target && 1604 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1605 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1606 exit(1); 1607 } 1608 if (tid_begp) 1609 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1610 if (tid_endp) 1611 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1612 free(mrec); 1613 return(0); 1614 } 1615 1616 static void 1617 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1618 { 1619 struct hammer_ioc_pseudofs_rw pfs; 1620 struct hammer_pseudofs_data pfsd; 1621 1622 bzero(&pfs, sizeof(pfs)); 1623 bzero(&pfsd, sizeof(pfsd)); 1624 pfs.pfs_id = pfs_id; 1625 pfs.ondisk = &pfsd; 1626 pfs.bytes = sizeof(pfsd); 1627 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1628 perror("update_pfs_snapshot (read)"); 1629 exit(1); 1630 } 1631 if (pfsd.sync_end_tid != snapshot_tid) { 1632 pfsd.sync_end_tid = snapshot_tid; 1633 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1634 perror("update_pfs_snapshot (rewrite)"); 1635 exit(1); 1636 } 1637 if (VerboseOpt >= 2) { 1638 fprintf(stderr, 1639 "Mirror-write: Completed, updated snapshot " 1640 "to %016jx\n", 1641 (uintmax_t)snapshot_tid); 1642 fflush(stderr); 1643 } 1644 } 1645 } 1646 1647 /* 1648 * Bandwidth-limited write in chunks 1649 */ 1650 static 1651 ssize_t 1652 writebw(int fd, const void *buf, size_t nbytes, 1653 uint64_t *bwcount, struct timeval *tv1) 1654 { 1655 struct timeval tv2; 1656 size_t n; 1657 ssize_t r; 1658 ssize_t a; 1659 int usec; 1660 1661 a = 0; 1662 r = 0; 1663 while (nbytes) { 1664 if (*bwcount + nbytes > BandwidthOpt) 1665 n = BandwidthOpt - *bwcount; 1666 else 1667 n = nbytes; 1668 if (n) 1669 r = write(fd, buf, n); 1670 if (r >= 0) { 1671 a += r; 1672 nbytes -= r; 1673 buf = (const char *)buf + r; 1674 } 1675 if 
((size_t)r != n) 1676 break; 1677 *bwcount += n; 1678 if (*bwcount >= BandwidthOpt) { 1679 gettimeofday(&tv2, NULL); 1680 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1681 (int)(tv2.tv_usec - tv1->tv_usec); 1682 if (usec >= 0 && usec < 1000000) 1683 usleep(1000000 - usec); 1684 gettimeofday(tv1, NULL); 1685 *bwcount -= BandwidthOpt; 1686 } 1687 } 1688 return(a ? a : r); 1689 } 1690 1691 /* 1692 * Get a yes or no answer from the terminal. The program may be run as 1693 * part of a two-way pipe so we cannot use stdin for this operation. 1694 */ 1695 static int 1696 getyn(void) 1697 { 1698 char buf[256]; 1699 FILE *fp; 1700 int result; 1701 1702 fp = fopen("/dev/tty", "r"); 1703 if (fp == NULL) { 1704 fprintf(stderr, "No terminal for response\n"); 1705 return(-1); 1706 } 1707 result = -1; 1708 while (fgets(buf, sizeof(buf), fp) != NULL) { 1709 if (buf[0] == 'y' || buf[0] == 'Y') { 1710 result = 1; 1711 break; 1712 } 1713 if (buf[0] == 'n' || buf[0] == 'N') { 1714 result = 0; 1715 break; 1716 } 1717 fprintf(stderr, "Response not understood\n"); 1718 break; 1719 } 1720 fclose(fp); 1721 return(result); 1722 } 1723 1724 static void 1725 mirror_usage(int code) 1726 { 1727 fprintf(stderr, 1728 "hammer mirror-read <filesystem> [begin-tid]\n" 1729 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1730 "hammer mirror-write <filesystem>\n" 1731 "hammer mirror-dump [header]\n" 1732 "hammer mirror-copy [[user@]host:]<filesystem>" 1733 " [[user@]host:]<filesystem>\n" 1734 "hammer mirror-stream [[user@]host:]<filesystem>" 1735 " [[user@]host:]<filesystem>\n" 1736 ); 1737 exit(code); 1738 } 1739 1740