1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 */ 34 35 #include "hammer.h" 36 37 #define LINE1 0,20 38 #define LINE2 20,78 39 #define LINE3 90,70 40 41 #define SERIALBUF_SIZE (512 * 1024) 42 43 typedef struct histogram { 44 hammer_tid_t tid; 45 u_int64_t bytes; 46 } *histogram_t; 47 48 static int read_mrecords(int fd, char *buf, u_int size, 49 hammer_ioc_mrecord_head_t pickup); 50 static int generate_histogram(int fd, const char *filesystem, 51 histogram_t *histogram_ary, 52 struct hammer_ioc_mirror_rw *mirror_base, 53 int *repeatp); 54 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 55 hammer_ioc_mrecord_head_t pickup); 56 static void write_mrecord(int fdout, u_int32_t type, 57 hammer_ioc_mrecord_any_t mrec, int bytes); 58 static void generate_mrec_header(int fd, int pfs_id, 59 union hammer_ioc_mrecord_any *mrec_tmp); 60 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 61 struct hammer_ioc_mrecord_head *pickup, 62 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 63 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 64 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 65 u_int64_t *bwcount, struct timeval *tv1); 66 static int getyn(void); 67 static void mirror_usage(int code); 68 69 /* 70 * Generate a mirroring data stream from the specific source over the 71 * entire key range, but restricted to the specified transaction range. 72 * 73 * The HAMMER VFS does most of the work, we add a few new mrecord 74 * types to negotiate the TID ranges and verify that the entire 75 * stream made it to the destination. 76 * 77 * streaming will be 0 for mirror-read, 1 for mirror-stream. The code will 78 * set up a fake value of -1 when running the histogram for mirror-read. 79 */ 80 void 81 hammer_cmd_mirror_read(char **av, int ac, int streaming) 82 { 83 struct hammer_ioc_mirror_rw mirror; 84 struct hammer_ioc_pseudofs_rw pfs; 85 union hammer_ioc_mrecord_any mrec_tmp; 86 struct hammer_ioc_mrecord_head pickup; 87 hammer_ioc_mrecord_any_t mrec; 88 hammer_tid_t sync_tid; 89 histogram_t histogram_ary; 90 char *filesystem; 91 char *buf = malloc(SERIALBUF_SIZE); 92 int interrupted = 0; 93 int error; 94 int fd; 95 int n; 96 int didwork; 97 int histogram; 98 int histindex; 99 int histmax; 100 int repeat = 0; 101 int sameline; 102 int64_t total_bytes; 103 time_t base_t = time(NULL); 104 struct timeval bwtv; 105 u_int64_t bwcount; 106 u_int64_t estbytes; 107 108 if (ac == 0 || ac > 2) 109 mirror_usage(1); 110 filesystem = av[0]; 111 hammer_check_restrict(filesystem); 112 113 pickup.signature = 0; 114 pickup.type = 0; 115 histogram = 0; 116 histindex = 0; 117 histmax = 0; 118 histogram_ary = NULL; 119 sameline = 0; 120 121 again: 122 bzero(&mirror, sizeof(mirror)); 123 hammer_key_beg_init(&mirror.key_beg); 124 hammer_key_end_init(&mirror.key_end); 125 126 fd = getpfs(&pfs, filesystem); 127 128 if (streaming >= 0) 129 score_printf(LINE1, "Running"); 130 131 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 132 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 133 fflush(stderr); 134 sameline = 1; 135 } 136 sameline = 1; 137 total_bytes = 0; 138 gettimeofday(&bwtv, NULL); 139 bwcount = 0; 140 141 /* 142 * Send initial header for the purpose of determining the 143 * shared-uuid. 144 */ 145 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 146 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 147 &mrec_tmp, sizeof(mrec_tmp.pfs)); 148 149 /* 150 * In 2-way mode the target will send us a PFS info packet 151 * first. Use the target's current snapshot TID as our default 152 * begin TID. 153 */ 154 if (TwoWayPipeOpt) { 155 mirror.tid_beg = 0; 156 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 157 NULL, &mirror.tid_beg); 158 if (n < 0) { /* got TERM record */ 159 relpfs(fd, &pfs); 160 free(buf); 161 free(histogram_ary); 162 return; 163 } 164 ++mirror.tid_beg; 165 } else if (streaming && histogram) { 166 mirror.tid_beg = histogram_ary[histindex].tid + 1; 167 } else { 168 mirror.tid_beg = 0; 169 } 170 171 /* 172 * Write out the PFS header, tid_beg will be updated if our PFS 173 * has a larger begin sync. tid_end is set to the latest source 174 * TID whos flush cycle has completed. 175 */ 176 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 177 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 178 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 179 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 180 mirror.ubuf = buf; 181 mirror.size = SERIALBUF_SIZE; 182 mirror.pfs_id = pfs.pfs_id; 183 mirror.shared_uuid = pfs.ondisk->shared_uuid; 184 185 /* 186 * XXX If the histogram is exhausted and the TID delta is large 187 * the stream might have been offline for a while and is 188 * now picking it up again. Do another histogram. 189 */ 190 #if 0 191 if (streaming && histogram && histindex == histend) { 192 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 193 histogram = 0; 194 } 195 #endif 196 197 /* 198 * Initial bulk startup control, try to do some incremental 199 * mirroring in order to allow the stream to be killed and 200 * restarted without having to start over. 201 */ 202 if (histogram == 0 && BulkOpt == 0) { 203 if (VerboseOpt && repeat == 0) { 204 fprintf(stderr, "\n"); 205 sameline = 0; 206 } 207 histmax = generate_histogram(fd, filesystem, 208 &histogram_ary, &mirror, 209 &repeat); 210 histindex = 0; 211 histogram = 1; 212 213 /* 214 * Just stream the histogram, then stop 215 */ 216 if (streaming == 0) 217 streaming = -1; 218 } 219 220 if (streaming && histogram) { 221 ++histindex; 222 mirror.tid_end = histogram_ary[histindex].tid; 223 estbytes = histogram_ary[histindex-1].bytes; 224 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 225 } else { 226 estbytes = 0; 227 } 228 229 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 230 &mrec_tmp, sizeof(mrec_tmp.pfs)); 231 232 /* 233 * A cycle file overrides the beginning TID only if we are 234 * not operating in two-way or histogram mode. 235 */ 236 if (TwoWayPipeOpt == 0 && histogram == 0) { 237 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 238 } 239 240 /* 241 * An additional argument overrides the beginning TID regardless 242 * of what mode we are in. This is not recommending if operating 243 * in two-way mode. 244 */ 245 if (ac == 2) 246 mirror.tid_beg = strtoull(av[1], NULL, 0); 247 248 if (streaming == 0 || VerboseOpt >= 2) { 249 fprintf(stderr, 250 "Mirror-read: Mirror %016jx to %016jx", 251 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 252 if (histogram) 253 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 254 fprintf(stderr, "\n"); 255 fflush(stderr); 256 } 257 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 258 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 259 (uintmax_t)mirror.key_beg.obj_id); 260 } 261 262 /* 263 * Nothing to do if begin equals end. 264 */ 265 if (mirror.tid_beg >= mirror.tid_end) { 266 if (streaming == 0 || VerboseOpt >= 2) 267 fprintf(stderr, "Mirror-read: No work to do\n"); 268 sleep(DelayOpt); 269 didwork = 0; 270 histogram = 0; 271 goto done; 272 } 273 didwork = 1; 274 275 /* 276 * Write out bulk records 277 */ 278 mirror.ubuf = buf; 279 mirror.size = SERIALBUF_SIZE; 280 281 do { 282 mirror.count = 0; 283 mirror.pfs_id = pfs.pfs_id; 284 mirror.shared_uuid = pfs.ondisk->shared_uuid; 285 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 286 score_printf(LINE3, "Mirror-read %s failed: %s", 287 filesystem, strerror(errno)); 288 fprintf(stderr, "Mirror-read %s failed: %s\n", 289 filesystem, strerror(errno)); 290 exit(1); 291 } 292 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 293 score_printf(LINE3, "Mirror-read %s fatal error %d", 294 filesystem, mirror.head.error); 295 fprintf(stderr, 296 "Mirror-read %s fatal error %d\n", 297 filesystem, mirror.head.error); 298 exit(1); 299 } 300 if (mirror.count) { 301 if (BandwidthOpt) { 302 n = writebw(1, mirror.ubuf, mirror.count, 303 &bwcount, &bwtv); 304 } else { 305 n = write(1, mirror.ubuf, mirror.count); 306 } 307 if (n != mirror.count) { 308 score_printf(LINE3, 309 "Mirror-read %s failed: " 310 "short write", 311 filesystem); 312 fprintf(stderr, 313 "Mirror-read %s failed: " 314 "short write\n", 315 filesystem); 316 exit(1); 317 } 318 } 319 total_bytes += mirror.count; 320 if (streaming && VerboseOpt) { 321 fprintf(stderr, 322 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 323 (uintmax_t)mirror.key_cur.obj_id, 324 (uintmax_t)mirror.tid_beg, 325 (uintmax_t)mirror.tid_end, 326 (intmax_t)total_bytes); 327 fflush(stderr); 328 sameline = 0; 329 } else if (streaming) { 330 score_printf(LINE2, 331 "obj=%016jx tids=%016jx:%016jx %11jd", 332 (uintmax_t)mirror.key_cur.obj_id, 333 (uintmax_t)mirror.tid_beg, 334 (uintmax_t)mirror.tid_end, 335 (intmax_t)total_bytes); 336 } 337 mirror.key_beg = mirror.key_cur; 338 339 /* 340 * Deal with time limit option 341 */ 342 if (TimeoutOpt && 343 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 344 score_printf(LINE3, 345 "Mirror-read %s interrupted by timer at" 346 " %016jx", 347 filesystem, 348 (uintmax_t)mirror.key_cur.obj_id); 349 fprintf(stderr, 350 "Mirror-read %s interrupted by timer at" 351 " %016jx\n", 352 filesystem, 353 (uintmax_t)mirror.key_cur.obj_id); 354 interrupted = 1; 355 break; 356 } 357 } while (mirror.count != 0); 358 359 done: 360 if (streaming && VerboseOpt && sameline == 0) { 361 fprintf(stderr, "\n"); 362 fflush(stderr); 363 sameline = 1; 364 } 365 366 /* 367 * Write out the termination sync record - only if not interrupted 368 */ 369 if (interrupted == 0) { 370 if (didwork) { 371 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 372 &mrec_tmp, sizeof(mrec_tmp.sync)); 373 } else { 374 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 375 &mrec_tmp, sizeof(mrec_tmp.sync)); 376 } 377 } 378 379 /* 380 * If the -2 option was given (automatic when doing mirror-copy), 381 * a two-way pipe is assumed and we expect a response mrec from 382 * the target. 383 */ 384 if (TwoWayPipeOpt) { 385 mrec = read_mrecord(0, &error, &pickup); 386 if (mrec == NULL || 387 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 388 mrec->head.rec_size != sizeof(mrec->update)) { 389 fprintf(stderr, "mirror_read: Did not get final " 390 "acknowledgement packet from target\n"); 391 exit(1); 392 } 393 if (interrupted) { 394 if (CyclePath) { 395 hammer_set_cycle(&mirror.key_cur, 396 mirror.tid_beg); 397 fprintf(stderr, "Cyclefile %s updated for " 398 "continuation\n", CyclePath); 399 } 400 } else { 401 sync_tid = mrec->update.tid; 402 if (CyclePath) { 403 hammer_key_beg_init(&mirror.key_beg); 404 hammer_set_cycle(&mirror.key_beg, sync_tid); 405 fprintf(stderr, 406 "Cyclefile %s updated to 0x%016jx\n", 407 CyclePath, (uintmax_t)sync_tid); 408 } 409 } 410 free(mrec); 411 } else if (CyclePath) { 412 /* NOTE! mirror.tid_beg cannot be updated */ 413 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 414 "fully updated unless you use mirror-copy\n"); 415 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 416 } 417 if (streaming && interrupted == 0) { 418 time_t t1 = time(NULL); 419 time_t t2; 420 421 /* 422 * Try to break down large bulk transfers into smaller ones 423 * so it can sync the transaction id on the slave. This 424 * way if we get interrupted a restart doesn't have to 425 * start from scratch. 426 */ 427 if (streaming && histogram) { 428 if (histindex != histmax) { 429 if (VerboseOpt && VerboseOpt < 2 && 430 streaming >= 0) { 431 fprintf(stderr, " (bulk incremental)"); 432 } 433 relpfs(fd, &pfs); 434 goto again; 435 } 436 } 437 438 if (VerboseOpt && streaming >= 0) { 439 fprintf(stderr, " W"); 440 fflush(stderr); 441 } else if (streaming >= 0) { 442 score_printf(LINE1, "Waiting"); 443 } 444 pfs.ondisk->sync_end_tid = mirror.tid_end; 445 if (streaming < 0) { 446 /* 447 * Fake streaming mode when using a histogram to 448 * break up a mirror-read, do not wait on source. 449 */ 450 streaming = 0; 451 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 452 score_printf(LINE3, 453 "Mirror-read %s: cannot stream: %s\n", 454 filesystem, strerror(errno)); 455 fprintf(stderr, 456 "Mirror-read %s: cannot stream: %s\n", 457 filesystem, strerror(errno)); 458 } else { 459 t2 = time(NULL) - t1; 460 if (t2 >= 0 && t2 < DelayOpt) { 461 if (VerboseOpt) { 462 fprintf(stderr, "\bD"); 463 fflush(stderr); 464 } 465 sleep(DelayOpt - t2); 466 } 467 if (VerboseOpt) { 468 fprintf(stderr, "\b "); 469 fflush(stderr); 470 } 471 relpfs(fd, &pfs); 472 goto again; 473 } 474 } 475 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 476 &mrec_tmp, sizeof(mrec_tmp.sync)); 477 relpfs(fd, &pfs); 478 free(buf); 479 free(histogram_ary); 480 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 481 } 482 483 /* 484 * What we are trying to do here is figure out how much data is 485 * going to be sent for the TID range and to break the TID range 486 * down into reasonably-sized slices (from the point of view of 487 * data sent) so a lost connection can restart at a reasonable 488 * place and not all the way back at the beginning. 489 * 490 * An entry's TID serves as the end_tid for the prior entry 491 * So we have to offset the calculation by 1 so that TID falls into 492 * the previous entry when populating entries. 493 * 494 * Because the transaction id space is bursty we need a relatively 495 * large number of buckets (like a million) to do a reasonable job 496 * for things like an initial bulk mirrors on a very large filesystem. 497 */ 498 #define HIST_COUNT (1024 * 1024) 499 500 static int 501 generate_histogram(int fd, const char *filesystem, 502 histogram_t *histogram_ary, 503 struct hammer_ioc_mirror_rw *mirror_base, 504 int *repeatp) 505 { 506 struct hammer_ioc_mirror_rw mirror; 507 union hammer_ioc_mrecord_any *mrec; 508 hammer_tid_t tid_beg; 509 hammer_tid_t tid_end; 510 hammer_tid_t tid; 511 hammer_tid_t tidx; 512 u_int64_t *tid_bytes; 513 u_int64_t total; 514 u_int64_t accum; 515 int chunkno; 516 int i; 517 int res; 518 int off; 519 int len; 520 521 mirror = *mirror_base; 522 tid_beg = mirror.tid_beg; 523 tid_end = mirror.tid_end; 524 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 525 526 if (*histogram_ary == NULL) { 527 *histogram_ary = malloc(sizeof(struct histogram) * 528 (HIST_COUNT + 2)); 529 } 530 if (tid_beg >= tid_end) 531 return(0); 532 533 /* needs 2 extra */ 534 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 535 bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2)); 536 537 if (*repeatp == 0) { 538 fprintf(stderr, "Prescan to break up bulk transfer"); 539 if (VerboseOpt > 1) 540 fprintf(stderr, " (%juMB chunks)", 541 (uintmax_t)(SplitupOpt / (1024 * 1024))); 542 fprintf(stderr, "\n"); 543 } 544 545 /* 546 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 547 * 548 * Note: Estimates can be off when the mirror is way behind due 549 * to skips. 550 */ 551 total = 0; 552 accum = 0; 553 chunkno = 0; 554 for (;;) { 555 mirror.count = 0; 556 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 557 fprintf(stderr, "Mirror-read %s failed: %s\n", 558 filesystem, strerror(errno)); 559 exit(1); 560 } 561 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 562 fprintf(stderr, 563 "Mirror-read %s fatal error %d\n", 564 filesystem, mirror.head.error); 565 exit(1); 566 } 567 for (off = 0; 568 off < mirror.count; 569 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size) 570 ) { 571 mrec = (void *)((char *)mirror.ubuf + off); 572 573 /* 574 * We only care about general RECs and PASS 575 * records. We ignore SKIPs. 576 */ 577 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 578 case HAMMER_MREC_TYPE_REC: 579 case HAMMER_MREC_TYPE_PASS: 580 break; 581 default: 582 continue; 583 } 584 585 /* 586 * Calculate for two indices, create_tid and 587 * delete_tid. Record data only applies to 588 * the create_tid. 589 * 590 * When tid is exactly on the boundary it really 591 * belongs to the previous entry because scans 592 * are inclusive of the ending entry. 593 */ 594 tid = mrec->rec.leaf.base.delete_tid; 595 if (tid && tid >= tid_beg && tid <= tid_end) { 596 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 597 if (mrec->head.type == 598 HAMMER_MREC_TYPE_REC) { 599 len -= HAMMER_HEAD_DOALIGN( 600 mrec->rec.leaf.data_len); 601 assert(len > 0); 602 } 603 i = (tid - tid_beg) * HIST_COUNT / 604 (tid_end - tid_beg); 605 tidx = tid_beg + i * (tid_end - tid_beg) / 606 HIST_COUNT; 607 if (tid == tidx && i) 608 --i; 609 assert(i >= 0 && i < HIST_COUNT); 610 tid_bytes[i] += len; 611 total += len; 612 accum += len; 613 } 614 615 tid = mrec->rec.leaf.base.create_tid; 616 if (tid && tid >= tid_beg && tid <= tid_end) { 617 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 618 if (mrec->head.type == 619 HAMMER_MREC_TYPE_REC_NODATA) { 620 len += HAMMER_HEAD_DOALIGN( 621 mrec->rec.leaf.data_len); 622 } 623 i = (tid - tid_beg) * HIST_COUNT / 624 (tid_end - tid_beg); 625 tidx = tid_beg + i * (tid_end - tid_beg) / 626 HIST_COUNT; 627 if (tid == tidx && i) 628 --i; 629 assert(i >= 0 && i < HIST_COUNT); 630 tid_bytes[i] += len; 631 total += len; 632 accum += len; 633 } 634 } 635 if (*repeatp == 0 && accum > SplitupOpt) { 636 if (VerboseOpt > 1) { 637 fprintf(stderr, "."); 638 fflush(stderr); 639 } 640 ++chunkno; 641 score_printf(LINE2, "Prescan chunk %d", chunkno); 642 accum = 0; 643 } 644 if (mirror.count == 0) 645 break; 646 mirror.key_beg = mirror.key_cur; 647 } 648 649 /* 650 * Reduce to SplitupOpt (default 4GB) chunks. This code may 651 * use up to two additional elements. Do the array in-place. 652 * 653 * Inefficient degenerate cases can occur if we do not accumulate 654 * at least the requested split amount, so error on the side of 655 * going over a bit. 656 */ 657 res = 0; 658 (*histogram_ary)[res].tid = tid_beg; 659 (*histogram_ary)[res].bytes = tid_bytes[0]; 660 for (i = 1; i < HIST_COUNT; ++i) { 661 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 662 ++res; 663 (*histogram_ary)[res].tid = tid_beg + 664 i * (tid_end - tid_beg) / 665 HIST_COUNT; 666 (*histogram_ary)[res].bytes = 0; 667 668 } 669 (*histogram_ary)[res].bytes += tid_bytes[i]; 670 } 671 ++res; 672 (*histogram_ary)[res].tid = tid_end; 673 (*histogram_ary)[res].bytes = -1; 674 675 if (*repeatp == 0) { 676 if (VerboseOpt > 1) 677 fprintf(stderr, "\n"); /* newline after ... */ 678 score_printf(LINE3, "Prescan %d chunks, total %ju MBytes", 679 res, (uintmax_t)total / (1024 * 1024)); 680 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 681 res, (uintmax_t)total / (1024 * 1024)); 682 for (i = 0; i < res && i < 3; ++i) { 683 if (i) 684 fprintf(stderr, ", "); 685 fprintf(stderr, "%ju", 686 (uintmax_t)(*histogram_ary)[i].bytes); 687 } 688 if (i < res) 689 fprintf(stderr, ", ..."); 690 fprintf(stderr, ")\n"); 691 } 692 assert(res <= HIST_COUNT); 693 *repeatp = 1; 694 695 free(tid_bytes); 696 return(res); 697 } 698 699 static void 700 create_pfs(const char *filesystem, uuid_t *s_uuid) 701 { 702 if (ForceYesOpt == 1) { 703 fprintf(stderr, "PFS slave %s does not exist. " 704 "Auto create new slave PFS!\n", filesystem); 705 706 } else { 707 fprintf(stderr, "PFS slave %s does not exist.\n" 708 "Do you want to create a new slave PFS? (yes|no) ", 709 filesystem); 710 fflush(stderr); 711 if (getyn() != 1) { 712 fprintf(stderr, "Aborting operation\n"); 713 exit(1); 714 } 715 } 716 717 u_int32_t status; 718 char *shared_uuid = NULL; 719 uuid_to_string(s_uuid, &shared_uuid, &status); 720 721 char *cmd = NULL; 722 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 723 filesystem, shared_uuid); 724 free(shared_uuid); 725 726 if (cmd == NULL) { 727 fprintf(stderr, "Failed to alloc memory\n"); 728 exit(1); 729 } 730 if (system(cmd) != 0) { 731 fprintf(stderr, "Failed to create PFS\n"); 732 } 733 free(cmd); 734 } 735 736 /* 737 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 738 * some additional packet types to negotiate TID ranges and to verify 739 * completion. The HAMMER VFS does most of the work. 740 * 741 * It is important to note that the mirror.key_{beg,end} range must 742 * match the ranged used by the original. For now both sides use 743 * range the entire key space. 744 * 745 * It is even more important that the records in the stream conform 746 * to the TID range also supplied in the stream. The HAMMER VFS will 747 * use the REC, PASS, and SKIP record types to track the portions of 748 * the B-Tree being scanned in order to be able to proactively delete 749 * records on the target within those active areas that are not mentioned 750 * by the source. 751 * 752 * The mirror.key_cur field is used by the VFS to do this tracking. It 753 * must be initialized to key_beg but then is persistently updated by 754 * the HAMMER VFS on each successive ioctl() call. If you blow up this 755 * field you will blow up the mirror target, possibly to the point of 756 * deleting everything. As a safety measure the HAMMER VFS simply marks 757 * the records that the source has destroyed as deleted on the target, 758 * and normal pruning operations will deal with their final disposition 759 * at some later time. 760 */ 761 void 762 hammer_cmd_mirror_write(char **av, int ac) 763 { 764 struct hammer_ioc_mirror_rw mirror; 765 char *filesystem; 766 char *buf = malloc(SERIALBUF_SIZE); 767 struct hammer_ioc_pseudofs_rw pfs; 768 struct hammer_ioc_mrecord_head pickup; 769 struct hammer_ioc_synctid synctid; 770 union hammer_ioc_mrecord_any mrec_tmp; 771 hammer_ioc_mrecord_any_t mrec; 772 struct stat st; 773 int error; 774 int fd; 775 int n; 776 777 if (ac != 1) 778 mirror_usage(1); 779 filesystem = av[0]; 780 hammer_check_restrict(filesystem); 781 782 pickup.signature = 0; 783 pickup.type = 0; 784 785 again: 786 bzero(&mirror, sizeof(mirror)); 787 hammer_key_beg_init(&mirror.key_beg); 788 hammer_key_end_init(&mirror.key_end); 789 mirror.key_end = mirror.key_beg; 790 791 /* 792 * Read initial packet 793 */ 794 mrec = read_mrecord(0, &error, &pickup); 795 if (mrec == NULL) { 796 if (error == 0) 797 fprintf(stderr, "validate_mrec_header: short read\n"); 798 exit(1); 799 } 800 /* 801 * Validate packet 802 */ 803 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 804 free(buf); 805 return; 806 } 807 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 808 fprintf(stderr, "validate_mrec_header: did not get expected " 809 "PFSD record type\n"); 810 exit(1); 811 } 812 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 813 fprintf(stderr, "validate_mrec_header: unexpected payload " 814 "size\n"); 815 exit(1); 816 } 817 818 /* 819 * Create slave PFS if it doesn't yet exist 820 */ 821 if (lstat(filesystem, &st) != 0) { 822 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 823 } 824 free(mrec); 825 mrec = NULL; 826 827 fd = getpfs(&pfs, filesystem); 828 829 /* 830 * In two-way mode the target writes out a PFS packet first. 831 * The source uses our tid_end as its tid_beg by default, 832 * picking up where it left off. 833 */ 834 mirror.tid_beg = 0; 835 if (TwoWayPipeOpt) { 836 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 837 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 838 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 839 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 840 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 841 &mrec_tmp, sizeof(mrec_tmp.pfs)); 842 } 843 844 /* 845 * Read and process the PFS header. The source informs us of 846 * the TID range the stream represents. 847 */ 848 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 849 &mirror.tid_beg, &mirror.tid_end); 850 if (n < 0) { /* got TERM record */ 851 relpfs(fd, &pfs); 852 free(buf); 853 return; 854 } 855 856 mirror.ubuf = buf; 857 mirror.size = SERIALBUF_SIZE; 858 859 /* 860 * Read and process bulk records (REC, PASS, and SKIP types). 861 * 862 * On your life, do NOT mess with mirror.key_cur or your mirror 863 * target may become history. 864 */ 865 for (;;) { 866 mirror.count = 0; 867 mirror.pfs_id = pfs.pfs_id; 868 mirror.shared_uuid = pfs.ondisk->shared_uuid; 869 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 870 if (mirror.size <= 0) 871 break; 872 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 873 fprintf(stderr, "Mirror-write %s failed: %s\n", 874 filesystem, strerror(errno)); 875 exit(1); 876 } 877 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 878 fprintf(stderr, 879 "Mirror-write %s fatal error %d\n", 880 filesystem, mirror.head.error); 881 exit(1); 882 } 883 #if 0 884 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 885 fprintf(stderr, 886 "Mirror-write %s interrupted by timer at" 887 " %016llx\n", 888 filesystem, 889 mirror.key_cur.obj_id); 890 exit(0); 891 } 892 #endif 893 } 894 895 /* 896 * Read and process the termination sync record. 897 */ 898 mrec = read_mrecord(0, &error, &pickup); 899 900 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 901 fprintf(stderr, "Mirror-write: received termination request\n"); 902 relpfs(fd, &pfs); 903 free(mrec); 904 free(buf); 905 return; 906 } 907 908 if (mrec == NULL || 909 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 910 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 911 mrec->head.rec_size != sizeof(mrec->sync)) { 912 fprintf(stderr, "Mirror-write %s: Did not get termination " 913 "sync record, or rec_size is wrong rt=%d\n", 914 filesystem, 915 (mrec ? (int)mrec->head.type : -1)); 916 exit(1); 917 } 918 919 /* 920 * Update the PFS info on the target so the user has visibility 921 * into the new snapshot, and sync the target filesystem. 922 */ 923 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 924 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 925 926 bzero(&synctid, sizeof(synctid)); 927 synctid.op = HAMMER_SYNCTID_SYNC2; 928 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 929 930 if (VerboseOpt >= 2) { 931 fprintf(stderr, "Mirror-write %s: succeeded\n", 932 filesystem); 933 } 934 } 935 936 free(mrec); 937 mrec = NULL; 938 939 /* 940 * Report back to the originator. 941 */ 942 if (TwoWayPipeOpt) { 943 mrec_tmp.update.tid = mirror.tid_end; 944 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 945 &mrec_tmp, sizeof(mrec_tmp.update)); 946 } else { 947 printf("Source can update synctid to 0x%016jx\n", 948 (uintmax_t)mirror.tid_end); 949 } 950 relpfs(fd, &pfs); 951 goto again; 952 } 953 954 void 955 hammer_cmd_mirror_dump(char **av, int ac) 956 { 957 char *buf = malloc(SERIALBUF_SIZE); 958 struct hammer_ioc_mrecord_head pickup; 959 hammer_ioc_mrecord_any_t mrec; 960 int error; 961 int size; 962 int offset; 963 int bytes; 964 int header_only = 0; 965 966 if (ac == 1 && strcmp(*av, "header") == 0) 967 header_only = 1; 968 else if (ac != 0) 969 mirror_usage(1); 970 971 /* 972 * Read and process the PFS header 973 */ 974 pickup.signature = 0; 975 pickup.type = 0; 976 977 mrec = read_mrecord(0, &error, &pickup); 978 979 /* 980 * Dump the PFS header. mirror-dump takes its input from the output 981 * of a mirror-read so getpfs() can't be used to get a fd to be passed 982 * to dump_pfsd(). 983 */ 984 if (header_only && mrec != NULL) { 985 dump_pfsd(&mrec->pfs.pfsd, -1); 986 free(mrec); 987 free(buf); 988 return; 989 } 990 free(mrec); 991 992 again: 993 /* 994 * Read and process bulk records 995 */ 996 for (;;) { 997 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 998 if (size <= 0) 999 break; 1000 offset = 0; 1001 while (offset < size) { 1002 mrec = (void *)((char *)buf + offset); 1003 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 1004 if (offset + bytes > size) { 1005 fprintf(stderr, "Misaligned record\n"); 1006 exit(1); 1007 } 1008 1009 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 1010 case HAMMER_MREC_TYPE_REC_BADCRC: 1011 case HAMMER_MREC_TYPE_REC: 1012 printf("Record lo=%08x obj=%016jx key=%016jx " 1013 "rt=%02x ot=%02x", 1014 mrec->rec.leaf.base.localization, 1015 (uintmax_t)mrec->rec.leaf.base.obj_id, 1016 (uintmax_t)mrec->rec.leaf.base.key, 1017 mrec->rec.leaf.base.rec_type, 1018 mrec->rec.leaf.base.obj_type); 1019 if (mrec->head.type == 1020 HAMMER_MREC_TYPE_REC_BADCRC) { 1021 printf(" (BAD CRC)"); 1022 } 1023 printf("\n"); 1024 printf(" tids %016jx:%016jx data=%d\n", 1025 (uintmax_t)mrec->rec.leaf.base.create_tid, 1026 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1027 mrec->rec.leaf.data_len); 1028 break; 1029 case HAMMER_MREC_TYPE_PASS: 1030 printf("Pass lo=%08x obj=%016jx key=%016jx " 1031 "rt=%02x ot=%02x\n", 1032 mrec->rec.leaf.base.localization, 1033 (uintmax_t)mrec->rec.leaf.base.obj_id, 1034 (uintmax_t)mrec->rec.leaf.base.key, 1035 mrec->rec.leaf.base.rec_type, 1036 mrec->rec.leaf.base.obj_type); 1037 printf(" tids %016jx:%016jx data=%d\n", 1038 (uintmax_t)mrec->rec.leaf.base.create_tid, 1039 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1040 mrec->rec.leaf.data_len); 1041 break; 1042 case HAMMER_MREC_TYPE_SKIP: 1043 printf("Skip lo=%08x obj=%016jx key=%016jx rt=%02x to\n" 1044 " lo=%08x obj=%016jx key=%016jx rt=%02x\n", 1045 mrec->skip.skip_beg.localization, 1046 (uintmax_t)mrec->skip.skip_beg.obj_id, 1047 (uintmax_t)mrec->skip.skip_beg.key, 1048 mrec->skip.skip_beg.rec_type, 1049 mrec->skip.skip_end.localization, 1050 (uintmax_t)mrec->skip.skip_end.obj_id, 1051 (uintmax_t)mrec->skip.skip_end.key, 1052 mrec->skip.skip_end.rec_type); 1053 default: 1054 break; 1055 } 1056 offset += bytes; 1057 } 1058 } 1059 1060 /* 1061 * Read and process the termination sync record. 1062 */ 1063 mrec = read_mrecord(0, &error, &pickup); 1064 if (mrec == NULL || 1065 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 1066 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 1067 ) { 1068 fprintf(stderr, "Mirror-dump: Did not get termination " 1069 "sync record\n"); 1070 } 1071 free(mrec); 1072 1073 /* 1074 * Continue with more batches until EOF. 1075 */ 1076 mrec = read_mrecord(0, &error, &pickup); 1077 if (mrec) { 1078 free(mrec); 1079 goto again; 1080 } 1081 free(buf); 1082 } 1083 1084 void 1085 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1086 { 1087 pid_t pid1; 1088 pid_t pid2; 1089 int fds[2]; 1090 const char *xav[32]; 1091 char tbuf[16]; 1092 char *sh, *user, *host, *rfs; 1093 int xac; 1094 1095 if (ac != 2) 1096 mirror_usage(1); 1097 1098 TwoWayPipeOpt = 1; 1099 signal(SIGPIPE, SIG_IGN); 1100 1101 again: 1102 if (pipe(fds) < 0) { 1103 perror("pipe"); 1104 exit(1); 1105 } 1106 1107 /* 1108 * Source 1109 */ 1110 if ((pid1 = fork()) == 0) { 1111 signal(SIGPIPE, SIG_DFL); 1112 dup2(fds[0], 0); 1113 dup2(fds[0], 1); 1114 close(fds[0]); 1115 close(fds[1]); 1116 if ((rfs = strchr(av[0], ':')) != NULL) { 1117 xac = 0; 1118 1119 if((sh = getenv("HAMMER_RSH")) == NULL) 1120 xav[xac++] = "ssh"; 1121 else 1122 xav[xac++] = sh; 1123 1124 if (CompressOpt) 1125 xav[xac++] = "-C"; 1126 1127 if ((host = strchr(av[0], '@')) != NULL) { 1128 user = strndup( av[0], (host++ - av[0])); 1129 host = strndup( host, (rfs++ - host)); 1130 xav[xac++] = "-l"; 1131 xav[xac++] = user; 1132 xav[xac++] = host; 1133 } 1134 else { 1135 host = strndup( av[0], (rfs++ - av[0])); 1136 user = NULL; 1137 xav[xac++] = host; 1138 } 1139 1140 1141 if (SshPort) { 1142 xav[xac++] = "-p"; 1143 xav[xac++] = SshPort; 1144 } 1145 1146 xav[xac++] = "hammer"; 1147 1148 switch(VerboseOpt) { 1149 case 0: 1150 break; 1151 case 1: 1152 xav[xac++] = "-v"; 1153 break; 1154 case 2: 1155 xav[xac++] = "-vv"; 1156 break; 1157 default: 1158 xav[xac++] = "-vvv"; 1159 break; 1160 } 1161 if (ForceYesOpt) { 1162 xav[xac++] = "-y"; 1163 } 1164 xav[xac++] = "-2"; 1165 if (TimeoutOpt) { 1166 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1167 xav[xac++] = "-t"; 1168 xav[xac++] = tbuf; 1169 } 1170 if (SplitupOptStr) { 1171 xav[xac++] = "-S"; 1172 xav[xac++] = SplitupOptStr; 1173 } 1174 if (streaming) 1175 xav[xac++] = "mirror-read-stream"; 1176 else 1177 xav[xac++] = "mirror-read"; 1178 xav[xac++] = rfs; 1179 xav[xac++] = NULL; 1180 execvp(*xav, (void *)xav); 1181 } else { 1182 hammer_cmd_mirror_read(av, 1, streaming); 1183 fflush(stdout); 1184 fflush(stderr); 1185 } 1186 _exit(1); 1187 } 1188 1189 /* 1190 * Target 1191 */ 1192 if ((pid2 = fork()) == 0) { 1193 signal(SIGPIPE, SIG_DFL); 1194 dup2(fds[1], 0); 1195 dup2(fds[1], 1); 1196 close(fds[0]); 1197 close(fds[1]); 1198 if ((rfs = strchr(av[1], ':')) != NULL) { 1199 xac = 0; 1200 1201 if((sh = getenv("HAMMER_RSH")) == NULL) 1202 xav[xac++] = "ssh"; 1203 else 1204 xav[xac++] = sh; 1205 1206 if (CompressOpt) 1207 xav[xac++] = "-C"; 1208 1209 if ((host = strchr(av[1], '@')) != NULL) { 1210 user = strndup( av[1], (host++ - av[1])); 1211 host = strndup( host, (rfs++ - host)); 1212 xav[xac++] = "-l"; 1213 xav[xac++] = user; 1214 xav[xac++] = host; 1215 } 1216 else { 1217 host = strndup( av[1], (rfs++ - av[1])); 1218 user = NULL; 1219 xav[xac++] = host; 1220 } 1221 1222 if (SshPort) { 1223 xav[xac++] = "-p"; 1224 xav[xac++] = SshPort; 1225 } 1226 1227 xav[xac++] = "hammer"; 1228 1229 switch(VerboseOpt) { 1230 case 0: 1231 break; 1232 case 1: 1233 xav[xac++] = "-v"; 1234 break; 1235 case 2: 1236 xav[xac++] = "-vv"; 1237 break; 1238 default: 1239 xav[xac++] = "-vvv"; 1240 break; 1241 } 1242 if (ForceYesOpt) { 1243 xav[xac++] = "-y"; 1244 } 1245 xav[xac++] = "-2"; 1246 xav[xac++] = "mirror-write"; 1247 xav[xac++] = rfs; 1248 xav[xac++] = NULL; 1249 execvp(*xav, (void *)xav); 1250 } else { 1251 hammer_cmd_mirror_write(av + 1, 1); 1252 fflush(stdout); 1253 fflush(stderr); 1254 } 1255 _exit(1); 1256 } 1257 close(fds[0]); 1258 close(fds[1]); 1259 1260 while (waitpid(pid1, NULL, 0) <= 0) 1261 ; 1262 while (waitpid(pid2, NULL, 0) <= 0) 1263 ; 1264 1265 /* 1266 * If the link is lost restart 1267 */ 1268 if (streaming) { 1269 if (VerboseOpt) { 1270 fprintf(stderr, "\nLost Link\n"); 1271 fflush(stderr); 1272 } 1273 sleep(15 + DelayOpt); 1274 goto again; 1275 } 1276 1277 } 1278 1279 /* 1280 * Read and return multiple mrecords 1281 */ 1282 static int 1283 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 1284 { 1285 hammer_ioc_mrecord_any_t mrec; 1286 u_int count; 1287 size_t n; 1288 size_t i; 1289 size_t bytes; 1290 int type; 1291 1292 count = 0; 1293 while (size - count >= HAMMER_MREC_HEADSIZE) { 1294 /* 1295 * Cached the record header in case we run out of buffer 1296 * space. 1297 */ 1298 fflush(stdout); 1299 if (pickup->signature == 0) { 1300 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1301 i = read(fd, (char *)pickup + n, 1302 HAMMER_MREC_HEADSIZE - n); 1303 if (i <= 0) 1304 break; 1305 } 1306 if (n == 0) 1307 break; 1308 if (n != HAMMER_MREC_HEADSIZE) { 1309 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1310 exit(1); 1311 } 1312 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1313 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1314 "bad signature\n"); 1315 exit(1); 1316 } 1317 } 1318 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1319 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 1320 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1321 "illegal rec_size\n"); 1322 exit(1); 1323 } 1324 1325 /* 1326 * Stop if we have insufficient space for the record and data. 1327 */ 1328 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1329 if (size - count < bytes) 1330 break; 1331 1332 /* 1333 * Stop if the record type is not a REC, SKIP, or PASS, 1334 * which are the only types the ioctl supports. Other types 1335 * are used only by the userland protocol. 1336 * 1337 * Ignore all flags. 1338 */ 1339 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1340 if (type != HAMMER_MREC_TYPE_PFSD && 1341 type != HAMMER_MREC_TYPE_REC && 1342 type != HAMMER_MREC_TYPE_SKIP && 1343 type != HAMMER_MREC_TYPE_PASS) { 1344 break; 1345 } 1346 1347 /* 1348 * Read the remainder and clear the pickup signature. 1349 */ 1350 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1351 i = read(fd, buf + count + n, bytes - n); 1352 if (i <= 0) 1353 break; 1354 } 1355 if (n != bytes) { 1356 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1357 exit(1); 1358 } 1359 1360 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1361 pickup->signature = 0; 1362 pickup->type = 0; 1363 mrec = (void *)(buf + count); 1364 1365 /* 1366 * Validate the completed record 1367 */ 1368 if (mrec->head.rec_crc != 1369 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1370 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1371 fprintf(stderr, "read_mrecords: malformed record " 1372 "on pipe, bad crc\n"); 1373 exit(1); 1374 } 1375 1376 /* 1377 * If its a B-Tree record validate the data crc. 1378 * 1379 * NOTE: If the VFS passes us an explicitly errorde mrec 1380 * we just pass it through. 1381 */ 1382 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1383 1384 if (type == HAMMER_MREC_TYPE_REC) { 1385 if (mrec->head.rec_size < 1386 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1387 fprintf(stderr, 1388 "read_mrecords: malformed record on " 1389 "pipe, illegal element data_len\n"); 1390 exit(1); 1391 } 1392 if (mrec->rec.leaf.data_len && 1393 mrec->rec.leaf.data_offset && 1394 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 1395 fprintf(stderr, 1396 "read_mrecords: data_crc did not " 1397 "match data! obj=%016jx key=%016jx\n", 1398 (uintmax_t)mrec->rec.leaf.base.obj_id, 1399 (uintmax_t)mrec->rec.leaf.base.key); 1400 fprintf(stderr, 1401 "continuing, but there are problems\n"); 1402 } 1403 } 1404 count += bytes; 1405 } 1406 return(count); 1407 } 1408 1409 /* 1410 * Read and return a single mrecord. 1411 */ 1412 static 1413 hammer_ioc_mrecord_any_t 1414 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 1415 { 1416 hammer_ioc_mrecord_any_t mrec; 1417 struct hammer_ioc_mrecord_head mrechd; 1418 size_t bytes; 1419 size_t n; 1420 size_t i; 1421 1422 if (pickup && pickup->type != 0) { 1423 mrechd = *pickup; 1424 pickup->signature = 0; 1425 pickup->type = 0; 1426 n = HAMMER_MREC_HEADSIZE; 1427 } else { 1428 /* 1429 * Read in the PFSD header from the sender. 1430 */ 1431 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1432 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1433 if (i <= 0) 1434 break; 1435 } 1436 if (n == 0) { 1437 *errorp = 0; /* EOF */ 1438 return(NULL); 1439 } 1440 if (n != HAMMER_MREC_HEADSIZE) { 1441 fprintf(stderr, "short read of mrecord header\n"); 1442 *errorp = EPIPE; 1443 return(NULL); 1444 } 1445 } 1446 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1447 fprintf(stderr, "read_mrecord: bad signature\n"); 1448 *errorp = EINVAL; 1449 return(NULL); 1450 } 1451 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1452 assert(bytes >= sizeof(mrechd)); 1453 mrec = malloc(bytes); 1454 mrec->head = mrechd; 1455 1456 while (n < bytes) { 1457 i = read(fdin, (char *)mrec + n, bytes - n); 1458 if (i <= 0) 1459 break; 1460 n += i; 1461 } 1462 if (n != bytes) { 1463 fprintf(stderr, "read_mrecord: short read on payload\n"); 1464 *errorp = EPIPE; 1465 return(NULL); 1466 } 1467 if (mrec->head.rec_crc != 1468 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1469 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1470 fprintf(stderr, "read_mrecord: bad CRC\n"); 1471 *errorp = EINVAL; 1472 return(NULL); 1473 } 1474 *errorp = 0; 1475 return(mrec); 1476 } 1477 1478 static 1479 void 1480 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 1481 int bytes) 1482 { 1483 char zbuf[HAMMER_HEAD_ALIGN]; 1484 int pad; 1485 1486 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1487 1488 assert(bytes >= (int)sizeof(mrec->head)); 1489 bzero(&mrec->head, sizeof(mrec->head)); 1490 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1491 mrec->head.type = type; 1492 mrec->head.rec_size = bytes; 1493 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1494 bytes - HAMMER_MREC_CRCOFF); 1495 if (write(fdout, mrec, bytes) != bytes) { 1496 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1497 errno, strerror(errno)); 1498 exit(1); 1499 } 1500 if (pad) { 1501 bzero(zbuf, pad); 1502 if (write(fdout, zbuf, pad) != pad) { 1503 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1504 errno, strerror(errno)); 1505 exit(1); 1506 } 1507 } 1508 } 1509 1510 /* 1511 * Generate a mirroring header with the pfs information of the 1512 * originating filesytem. 1513 */ 1514 static void 1515 generate_mrec_header(int fd, int pfs_id, 1516 union hammer_ioc_mrecord_any *mrec_tmp) 1517 { 1518 struct hammer_ioc_pseudofs_rw pfs; 1519 1520 bzero(&pfs, sizeof(pfs)); 1521 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1522 pfs.pfs_id = pfs_id; 1523 pfs.ondisk = &mrec_tmp->pfs.pfsd; 1524 pfs.bytes = sizeof(mrec_tmp->pfs.pfsd); 1525 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1526 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1527 exit(1); 1528 } 1529 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1530 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 1531 exit(1); 1532 } 1533 mrec_tmp->pfs.version = pfs.version; 1534 } 1535 1536 /* 1537 * Validate the pfs information from the originating filesystem 1538 * against the target filesystem. shared_uuid must match. 1539 * 1540 * return -1 if we got a TERM record 1541 */ 1542 static int 1543 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1544 struct hammer_ioc_mrecord_head *pickup, 1545 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1546 { 1547 struct hammer_ioc_pseudofs_rw pfs; 1548 struct hammer_pseudofs_data pfsd; 1549 hammer_ioc_mrecord_any_t mrec; 1550 int error; 1551 1552 /* 1553 * Get the PFSD info from the target filesystem. 1554 */ 1555 bzero(&pfs, sizeof(pfs)); 1556 bzero(&pfsd, sizeof(pfsd)); 1557 pfs.pfs_id = pfs_id; 1558 pfs.ondisk = &pfsd; 1559 pfs.bytes = sizeof(pfsd); 1560 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1561 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1562 exit(1); 1563 } 1564 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1565 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 1566 exit(1); 1567 } 1568 1569 mrec = read_mrecord(fdin, &error, pickup); 1570 if (mrec == NULL) { 1571 if (error == 0) 1572 fprintf(stderr, "validate_mrec_header: short read\n"); 1573 exit(1); 1574 } 1575 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1576 free(mrec); 1577 return(-1); 1578 } 1579 1580 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1581 fprintf(stderr, "validate_mrec_header: did not get expected " 1582 "PFSD record type\n"); 1583 exit(1); 1584 } 1585 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1586 fprintf(stderr, "validate_mrec_header: unexpected payload " 1587 "size\n"); 1588 exit(1); 1589 } 1590 if (mrec->pfs.version != pfs.version) { 1591 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1592 exit(1); 1593 } 1594 1595 /* 1596 * Whew. Ok, is the read PFS info compatible with the target? 1597 */ 1598 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1599 sizeof(pfsd.shared_uuid)) != 0) { 1600 fprintf(stderr, 1601 "mirror-write: source and target have " 1602 "different shared-uuid's!\n"); 1603 exit(1); 1604 } 1605 if (is_target && 1606 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1607 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1608 exit(1); 1609 } 1610 if (tid_begp) 1611 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1612 if (tid_endp) 1613 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1614 free(mrec); 1615 return(0); 1616 } 1617 1618 static void 1619 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1620 { 1621 struct hammer_ioc_pseudofs_rw pfs; 1622 struct hammer_pseudofs_data pfsd; 1623 1624 bzero(&pfs, sizeof(pfs)); 1625 bzero(&pfsd, sizeof(pfsd)); 1626 pfs.pfs_id = pfs_id; 1627 pfs.ondisk = &pfsd; 1628 pfs.bytes = sizeof(pfsd); 1629 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1630 perror("update_pfs_snapshot (read)"); 1631 exit(1); 1632 } 1633 if (pfsd.sync_end_tid != snapshot_tid) { 1634 pfsd.sync_end_tid = snapshot_tid; 1635 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1636 perror("update_pfs_snapshot (rewrite)"); 1637 exit(1); 1638 } 1639 if (VerboseOpt >= 2) { 1640 fprintf(stderr, 1641 "Mirror-write: Completed, updated snapshot " 1642 "to %016jx\n", 1643 (uintmax_t)snapshot_tid); 1644 fflush(stderr); 1645 } 1646 } 1647 } 1648 1649 /* 1650 * Bandwidth-limited write in chunks 1651 */ 1652 static 1653 ssize_t 1654 writebw(int fd, const void *buf, size_t nbytes, 1655 u_int64_t *bwcount, struct timeval *tv1) 1656 { 1657 struct timeval tv2; 1658 size_t n; 1659 ssize_t r; 1660 ssize_t a; 1661 int usec; 1662 1663 a = 0; 1664 r = 0; 1665 while (nbytes) { 1666 if (*bwcount + nbytes > BandwidthOpt) 1667 n = BandwidthOpt - *bwcount; 1668 else 1669 n = nbytes; 1670 if (n) 1671 r = write(fd, buf, n); 1672 if (r >= 0) { 1673 a += r; 1674 nbytes -= r; 1675 buf = (const char *)buf + r; 1676 } 1677 if ((size_t)r != n) 1678 break; 1679 *bwcount += n; 1680 if (*bwcount >= BandwidthOpt) { 1681 gettimeofday(&tv2, NULL); 1682 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1683 (int)(tv2.tv_usec - tv1->tv_usec); 1684 if (usec >= 0 && usec < 1000000) 1685 usleep(1000000 - usec); 1686 gettimeofday(tv1, NULL); 1687 *bwcount -= BandwidthOpt; 1688 } 1689 } 1690 return(a ? a : r); 1691 } 1692 1693 /* 1694 * Get a yes or no answer from the terminal. The program may be run as 1695 * part of a two-way pipe so we cannot use stdin for this operation. 1696 */ 1697 static int 1698 getyn(void) 1699 { 1700 char buf[256]; 1701 FILE *fp; 1702 int result; 1703 1704 fp = fopen("/dev/tty", "r"); 1705 if (fp == NULL) { 1706 fprintf(stderr, "No terminal for response\n"); 1707 return(-1); 1708 } 1709 result = -1; 1710 while (fgets(buf, sizeof(buf), fp) != NULL) { 1711 if (buf[0] == 'y' || buf[0] == 'Y') { 1712 result = 1; 1713 break; 1714 } 1715 if (buf[0] == 'n' || buf[0] == 'N') { 1716 result = 0; 1717 break; 1718 } 1719 fprintf(stderr, "Response not understood\n"); 1720 break; 1721 } 1722 fclose(fp); 1723 return(result); 1724 } 1725 1726 static void 1727 mirror_usage(int code) 1728 { 1729 fprintf(stderr, 1730 "hammer mirror-read <filesystem> [begin-tid]\n" 1731 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1732 "hammer mirror-write <filesystem>\n" 1733 "hammer mirror-dump [header]\n" 1734 "hammer mirror-copy [[user@]host:]<filesystem>" 1735 " [[user@]host:]<filesystem>\n" 1736 "hammer mirror-stream [[user@]host:]<filesystem>" 1737 " [[user@]host:]<filesystem>\n" 1738 ); 1739 exit(code); 1740 } 1741 1742