1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 */ 34 35 #include "hammer.h" 36 37 #define LINE1 0,20 38 #define LINE2 20,78 39 #define LINE3 90,70 40 41 #define SERIALBUF_SIZE (512 * 1024) 42 43 typedef struct histogram { 44 hammer_tid_t tid; 45 u_int64_t bytes; 46 } *histogram_t; 47 48 static int read_mrecords(int fd, char *buf, u_int size, 49 hammer_ioc_mrecord_head_t pickup); 50 static int generate_histogram(int fd, const char *filesystem, 51 histogram_t *histogram_ary, 52 struct hammer_ioc_mirror_rw *mirror_base, 53 int *repeatp); 54 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 55 hammer_ioc_mrecord_head_t pickup); 56 static void write_mrecord(int fdout, u_int32_t type, 57 hammer_ioc_mrecord_any_t mrec, int bytes); 58 static void generate_mrec_header(int fd, int pfs_id, 59 union hammer_ioc_mrecord_any *mrec_tmp); 60 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 61 struct hammer_ioc_mrecord_head *pickup, 62 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 63 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 64 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 65 u_int64_t *bwcount, struct timeval *tv1); 66 static int getyn(void); 67 static void mirror_usage(int code); 68 69 /* 70 * Generate a mirroring data stream from the specific source over the 71 * entire key range, but restricted to the specified transaction range. 72 * 73 * The HAMMER VFS does most of the work, we add a few new mrecord 74 * types to negotiate the TID ranges and verify that the entire 75 * stream made it to the destination. 76 * 77 * streaming will be 0 for mirror-read, 1 for mirror-stream. The code will 78 * set up a fake value of -1 when running the histogram for mirror-read. 79 */ 80 void 81 hammer_cmd_mirror_read(char **av, int ac, int streaming) 82 { 83 struct hammer_ioc_mirror_rw mirror; 84 struct hammer_ioc_pseudofs_rw pfs; 85 union hammer_ioc_mrecord_any mrec_tmp; 86 struct hammer_ioc_mrecord_head pickup; 87 hammer_ioc_mrecord_any_t mrec; 88 hammer_tid_t sync_tid; 89 histogram_t histogram_ary; 90 const char *filesystem; 91 char *buf = malloc(SERIALBUF_SIZE); 92 int interrupted = 0; 93 int error; 94 int fd; 95 int n; 96 int didwork; 97 int histogram; 98 int histindex; 99 int histmax; 100 int repeat = 0; 101 int sameline; 102 int64_t total_bytes; 103 time_t base_t = time(NULL); 104 struct timeval bwtv; 105 u_int64_t bwcount; 106 u_int64_t estbytes; 107 108 if (ac == 0 || ac > 2) 109 mirror_usage(1); 110 filesystem = av[0]; 111 hammer_check_restrict(filesystem); 112 113 pickup.signature = 0; 114 pickup.type = 0; 115 histogram = 0; 116 histindex = 0; 117 histmax = 0; 118 histogram_ary = NULL; 119 sameline = 0; 120 121 again: 122 bzero(&mirror, sizeof(mirror)); 123 hammer_key_beg_init(&mirror.key_beg); 124 hammer_key_end_init(&mirror.key_end); 125 126 fd = getpfs(&pfs, filesystem); 127 128 if (streaming >= 0) 129 score_printf(LINE1, "Running"); 130 131 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 132 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 133 fflush(stderr); 134 sameline = 1; 135 } 136 sameline = 1; 137 total_bytes = 0; 138 gettimeofday(&bwtv, NULL); 139 bwcount = 0; 140 141 /* 142 * Send initial header for the purpose of determining the 143 * shared-uuid. 144 */ 145 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 146 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 147 &mrec_tmp, sizeof(mrec_tmp.pfs)); 148 149 /* 150 * In 2-way mode the target will send us a PFS info packet 151 * first. Use the target's current snapshot TID as our default 152 * begin TID. 153 */ 154 if (TwoWayPipeOpt) { 155 mirror.tid_beg = 0; 156 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 157 NULL, &mirror.tid_beg); 158 if (n < 0) { /* got TERM record */ 159 relpfs(fd, &pfs); 160 return; 161 } 162 ++mirror.tid_beg; 163 } else if (streaming && histogram) { 164 mirror.tid_beg = histogram_ary[histindex].tid + 1; 165 } else { 166 mirror.tid_beg = 0; 167 } 168 169 /* 170 * Write out the PFS header, tid_beg will be updated if our PFS 171 * has a larger begin sync. tid_end is set to the latest source 172 * TID whos flush cycle has completed. 173 */ 174 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 175 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 176 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 177 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 178 mirror.ubuf = buf; 179 mirror.size = SERIALBUF_SIZE; 180 mirror.pfs_id = pfs.pfs_id; 181 mirror.shared_uuid = pfs.ondisk->shared_uuid; 182 183 /* 184 * XXX If the histogram is exhausted and the TID delta is large 185 * the stream might have been offline for a while and is 186 * now picking it up again. Do another histogram. 187 */ 188 #if 0 189 if (streaming && histogram && histindex == histend) { 190 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 191 histogram = 0; 192 } 193 #endif 194 195 /* 196 * Initial bulk startup control, try to do some incremental 197 * mirroring in order to allow the stream to be killed and 198 * restarted without having to start over. 199 */ 200 if (histogram == 0 && BulkOpt == 0) { 201 if (VerboseOpt && repeat == 0) { 202 fprintf(stderr, "\n"); 203 sameline = 0; 204 } 205 histmax = generate_histogram(fd, filesystem, 206 &histogram_ary, &mirror, 207 &repeat); 208 histindex = 0; 209 histogram = 1; 210 211 /* 212 * Just stream the histogram, then stop 213 */ 214 if (streaming == 0) 215 streaming = -1; 216 } 217 218 if (streaming && histogram) { 219 ++histindex; 220 mirror.tid_end = histogram_ary[histindex].tid; 221 estbytes = histogram_ary[histindex-1].bytes; 222 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 223 } else { 224 estbytes = 0; 225 } 226 227 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 228 &mrec_tmp, sizeof(mrec_tmp.pfs)); 229 230 /* 231 * A cycle file overrides the beginning TID only if we are 232 * not operating in two-way or histogram mode. 233 */ 234 if (TwoWayPipeOpt == 0 && histogram == 0) { 235 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 236 } 237 238 /* 239 * An additional argument overrides the beginning TID regardless 240 * of what mode we are in. This is not recommending if operating 241 * in two-way mode. 242 */ 243 if (ac == 2) 244 mirror.tid_beg = strtoull(av[1], NULL, 0); 245 246 if (streaming == 0 || VerboseOpt >= 2) { 247 fprintf(stderr, 248 "Mirror-read: Mirror %016jx to %016jx", 249 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 250 if (histogram) 251 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 252 fprintf(stderr, "\n"); 253 fflush(stderr); 254 } 255 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 256 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 257 (uintmax_t)mirror.key_beg.obj_id); 258 } 259 260 /* 261 * Nothing to do if begin equals end. 262 */ 263 if (mirror.tid_beg >= mirror.tid_end) { 264 if (streaming == 0 || VerboseOpt >= 2) 265 fprintf(stderr, "Mirror-read: No work to do\n"); 266 sleep(DelayOpt); 267 didwork = 0; 268 histogram = 0; 269 goto done; 270 } 271 didwork = 1; 272 273 /* 274 * Write out bulk records 275 */ 276 mirror.ubuf = buf; 277 mirror.size = SERIALBUF_SIZE; 278 279 do { 280 mirror.count = 0; 281 mirror.pfs_id = pfs.pfs_id; 282 mirror.shared_uuid = pfs.ondisk->shared_uuid; 283 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 284 score_printf(LINE3, "Mirror-read %s failed: %s", 285 filesystem, strerror(errno)); 286 fprintf(stderr, "Mirror-read %s failed: %s\n", 287 filesystem, strerror(errno)); 288 exit(1); 289 } 290 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 291 score_printf(LINE3, "Mirror-read %s fatal error %d", 292 filesystem, mirror.head.error); 293 fprintf(stderr, 294 "Mirror-read %s fatal error %d\n", 295 filesystem, mirror.head.error); 296 exit(1); 297 } 298 if (mirror.count) { 299 if (BandwidthOpt) { 300 n = writebw(1, mirror.ubuf, mirror.count, 301 &bwcount, &bwtv); 302 } else { 303 n = write(1, mirror.ubuf, mirror.count); 304 } 305 if (n != mirror.count) { 306 score_printf(LINE3, 307 "Mirror-read %s failed: " 308 "short write", 309 filesystem); 310 fprintf(stderr, 311 "Mirror-read %s failed: " 312 "short write\n", 313 filesystem); 314 exit(1); 315 } 316 } 317 total_bytes += mirror.count; 318 if (streaming && VerboseOpt) { 319 fprintf(stderr, 320 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 321 (uintmax_t)mirror.key_cur.obj_id, 322 (uintmax_t)mirror.tid_beg, 323 (uintmax_t)mirror.tid_end, 324 (intmax_t)total_bytes); 325 fflush(stderr); 326 sameline = 0; 327 } else if (streaming) { 328 score_printf(LINE2, 329 "obj=%016jx tids=%016jx:%016jx %11jd", 330 (uintmax_t)mirror.key_cur.obj_id, 331 (uintmax_t)mirror.tid_beg, 332 (uintmax_t)mirror.tid_end, 333 (intmax_t)total_bytes); 334 } 335 mirror.key_beg = mirror.key_cur; 336 337 /* 338 * Deal with time limit option 339 */ 340 if (TimeoutOpt && 341 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 342 score_printf(LINE3, 343 "Mirror-read %s interrupted by timer at" 344 " %016jx", 345 filesystem, 346 (uintmax_t)mirror.key_cur.obj_id); 347 fprintf(stderr, 348 "Mirror-read %s interrupted by timer at" 349 " %016jx\n", 350 filesystem, 351 (uintmax_t)mirror.key_cur.obj_id); 352 interrupted = 1; 353 break; 354 } 355 } while (mirror.count != 0); 356 357 done: 358 if (streaming && VerboseOpt && sameline == 0) { 359 fprintf(stderr, "\n"); 360 fflush(stderr); 361 sameline = 1; 362 } 363 364 /* 365 * Write out the termination sync record - only if not interrupted 366 */ 367 if (interrupted == 0) { 368 if (didwork) { 369 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 370 &mrec_tmp, sizeof(mrec_tmp.sync)); 371 } else { 372 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 373 &mrec_tmp, sizeof(mrec_tmp.sync)); 374 } 375 } 376 377 /* 378 * If the -2 option was given (automatic when doing mirror-copy), 379 * a two-way pipe is assumed and we expect a response mrec from 380 * the target. 381 */ 382 if (TwoWayPipeOpt) { 383 mrec = read_mrecord(0, &error, &pickup); 384 if (mrec == NULL || 385 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 386 mrec->head.rec_size != sizeof(mrec->update)) { 387 fprintf(stderr, "mirror_read: Did not get final " 388 "acknowledgement packet from target\n"); 389 exit(1); 390 } 391 if (interrupted) { 392 if (CyclePath) { 393 hammer_set_cycle(&mirror.key_cur, 394 mirror.tid_beg); 395 fprintf(stderr, "Cyclefile %s updated for " 396 "continuation\n", CyclePath); 397 } 398 } else { 399 sync_tid = mrec->update.tid; 400 if (CyclePath) { 401 hammer_key_beg_init(&mirror.key_beg); 402 hammer_set_cycle(&mirror.key_beg, sync_tid); 403 fprintf(stderr, 404 "Cyclefile %s updated to 0x%016jx\n", 405 CyclePath, (uintmax_t)sync_tid); 406 } 407 } 408 } else if (CyclePath) { 409 /* NOTE! mirror.tid_beg cannot be updated */ 410 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 411 "fully updated unless you use mirror-copy\n"); 412 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 413 } 414 if (streaming && interrupted == 0) { 415 time_t t1 = time(NULL); 416 time_t t2; 417 418 /* 419 * Try to break down large bulk transfers into smaller ones 420 * so it can sync the transaction id on the slave. This 421 * way if we get interrupted a restart doesn't have to 422 * start from scratch. 423 */ 424 if (streaming && histogram) { 425 if (histindex != histmax) { 426 if (VerboseOpt && VerboseOpt < 2 && 427 streaming >= 0) { 428 fprintf(stderr, " (bulk incremental)"); 429 } 430 relpfs(fd, &pfs); 431 goto again; 432 } 433 } 434 435 if (VerboseOpt && streaming >= 0) { 436 fprintf(stderr, " W"); 437 fflush(stderr); 438 } else if (streaming >= 0) { 439 score_printf(LINE1, "Waiting"); 440 } 441 pfs.ondisk->sync_end_tid = mirror.tid_end; 442 if (streaming < 0) { 443 /* 444 * Fake streaming mode when using a histogram to 445 * break up a mirror-read, do not wait on source. 446 */ 447 streaming = 0; 448 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 449 score_printf(LINE3, 450 "Mirror-read %s: cannot stream: %s\n", 451 filesystem, strerror(errno)); 452 fprintf(stderr, 453 "Mirror-read %s: cannot stream: %s\n", 454 filesystem, strerror(errno)); 455 } else { 456 t2 = time(NULL) - t1; 457 if (t2 >= 0 && t2 < DelayOpt) { 458 if (VerboseOpt) { 459 fprintf(stderr, "\bD"); 460 fflush(stderr); 461 } 462 sleep(DelayOpt - t2); 463 } 464 if (VerboseOpt) { 465 fprintf(stderr, "\b "); 466 fflush(stderr); 467 } 468 relpfs(fd, &pfs); 469 goto again; 470 } 471 } 472 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 473 &mrec_tmp, sizeof(mrec_tmp.sync)); 474 relpfs(fd, &pfs); 475 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 476 } 477 478 /* 479 * What we are trying to do here is figure out how much data is 480 * going to be sent for the TID range and to break the TID range 481 * down into reasonably-sized slices (from the point of view of 482 * data sent) so a lost connection can restart at a reasonable 483 * place and not all the way back at the beginning. 484 * 485 * An entry's TID serves as the end_tid for the prior entry 486 * So we have to offset the calculation by 1 so that TID falls into 487 * the previous entry when populating entries. 488 * 489 * Because the transaction id space is bursty we need a relatively 490 * large number of buckets (like a million) to do a reasonable job 491 * for things like an initial bulk mirrors on a very large filesystem. 492 */ 493 #define HIST_COUNT (1024 * 1024) 494 495 static int 496 generate_histogram(int fd, const char *filesystem, 497 histogram_t *histogram_ary, 498 struct hammer_ioc_mirror_rw *mirror_base, 499 int *repeatp) 500 { 501 struct hammer_ioc_mirror_rw mirror; 502 union hammer_ioc_mrecord_any *mrec; 503 hammer_tid_t tid_beg; 504 hammer_tid_t tid_end; 505 hammer_tid_t tid; 506 hammer_tid_t tidx; 507 u_int64_t *tid_bytes; 508 u_int64_t total; 509 u_int64_t accum; 510 int chunkno; 511 int i; 512 int res; 513 int off; 514 int len; 515 516 mirror = *mirror_base; 517 tid_beg = mirror.tid_beg; 518 tid_end = mirror.tid_end; 519 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 520 521 if (*histogram_ary == NULL) { 522 *histogram_ary = malloc(sizeof(struct histogram) * 523 (HIST_COUNT + 2)); 524 } 525 if (tid_beg >= tid_end) 526 return(0); 527 528 /* needs 2 extra */ 529 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 530 bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2)); 531 532 if (*repeatp == 0) { 533 fprintf(stderr, "Prescan to break up bulk transfer"); 534 if (VerboseOpt > 1) 535 fprintf(stderr, " (%juMB chunks)", 536 (uintmax_t)(SplitupOpt / (1024 * 1024))); 537 fprintf(stderr, "\n"); 538 } 539 540 /* 541 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 542 * 543 * Note: Estimates can be off when the mirror is way behind due 544 * to skips. 545 */ 546 total = 0; 547 accum = 0; 548 chunkno = 0; 549 for (;;) { 550 mirror.count = 0; 551 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 552 fprintf(stderr, "Mirror-read %s failed: %s\n", 553 filesystem, strerror(errno)); 554 exit(1); 555 } 556 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 557 fprintf(stderr, 558 "Mirror-read %s fatal error %d\n", 559 filesystem, mirror.head.error); 560 exit(1); 561 } 562 for (off = 0; 563 off < mirror.count; 564 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size) 565 ) { 566 mrec = (void *)((char *)mirror.ubuf + off); 567 568 /* 569 * We only care about general RECs and PASS 570 * records. We ignore SKIPs. 571 */ 572 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 573 case HAMMER_MREC_TYPE_REC: 574 case HAMMER_MREC_TYPE_PASS: 575 break; 576 default: 577 continue; 578 } 579 580 /* 581 * Calculate for two indices, create_tid and 582 * delete_tid. Record data only applies to 583 * the create_tid. 584 * 585 * When tid is exactly on the boundary it really 586 * belongs to the previous entry because scans 587 * are inclusive of the ending entry. 588 */ 589 tid = mrec->rec.leaf.base.delete_tid; 590 if (tid && tid >= tid_beg && tid <= tid_end) { 591 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 592 if (mrec->head.type == 593 HAMMER_MREC_TYPE_REC) { 594 len -= HAMMER_HEAD_DOALIGN( 595 mrec->rec.leaf.data_len); 596 assert(len > 0); 597 } 598 i = (tid - tid_beg) * HIST_COUNT / 599 (tid_end - tid_beg); 600 tidx = tid_beg + i * (tid_end - tid_beg) / 601 HIST_COUNT; 602 if (tid == tidx && i) 603 --i; 604 assert(i >= 0 && i < HIST_COUNT); 605 tid_bytes[i] += len; 606 total += len; 607 accum += len; 608 } 609 610 tid = mrec->rec.leaf.base.create_tid; 611 if (tid && tid >= tid_beg && tid <= tid_end) { 612 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 613 if (mrec->head.type == 614 HAMMER_MREC_TYPE_REC_NODATA) { 615 len += HAMMER_HEAD_DOALIGN( 616 mrec->rec.leaf.data_len); 617 } 618 i = (tid - tid_beg) * HIST_COUNT / 619 (tid_end - tid_beg); 620 tidx = tid_beg + i * (tid_end - tid_beg) / 621 HIST_COUNT; 622 if (tid == tidx && i) 623 --i; 624 assert(i >= 0 && i < HIST_COUNT); 625 tid_bytes[i] += len; 626 total += len; 627 accum += len; 628 } 629 } 630 if (*repeatp == 0 && accum > SplitupOpt) { 631 if (VerboseOpt > 1) { 632 fprintf(stderr, "."); 633 fflush(stderr); 634 } 635 ++chunkno; 636 score_printf(LINE2, "Prescan chunk %d", chunkno); 637 accum = 0; 638 } 639 if (mirror.count == 0) 640 break; 641 mirror.key_beg = mirror.key_cur; 642 } 643 644 /* 645 * Reduce to SplitupOpt (default 4GB) chunks. This code may 646 * use up to two additional elements. Do the array in-place. 647 * 648 * Inefficient degenerate cases can occur if we do not accumulate 649 * at least the requested split amount, so error on the side of 650 * going over a bit. 651 */ 652 res = 0; 653 (*histogram_ary)[res].tid = tid_beg; 654 (*histogram_ary)[res].bytes = tid_bytes[0]; 655 for (i = 1; i < HIST_COUNT; ++i) { 656 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 657 ++res; 658 (*histogram_ary)[res].tid = tid_beg + 659 i * (tid_end - tid_beg) / 660 HIST_COUNT; 661 (*histogram_ary)[res].bytes = 0; 662 663 } 664 (*histogram_ary)[res].bytes += tid_bytes[i]; 665 } 666 ++res; 667 (*histogram_ary)[res].tid = tid_end; 668 (*histogram_ary)[res].bytes = -1; 669 670 if (*repeatp == 0) { 671 if (VerboseOpt > 1) 672 fprintf(stderr, "\n"); /* newline after ... */ 673 score_printf(LINE3, "Prescan %d chunks, total %ju MBytes", 674 res, (uintmax_t)total / (1024 * 1024)); 675 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 676 res, (uintmax_t)total / (1024 * 1024)); 677 for (i = 0; i < res && i < 3; ++i) { 678 if (i) 679 fprintf(stderr, ", "); 680 fprintf(stderr, "%ju", 681 (uintmax_t)(*histogram_ary)[i].bytes); 682 } 683 if (i < res) 684 fprintf(stderr, ", ..."); 685 fprintf(stderr, ")\n"); 686 } 687 assert(res <= HIST_COUNT); 688 *repeatp = 1; 689 690 free(tid_bytes); 691 return(res); 692 } 693 694 static void 695 create_pfs(const char *filesystem, uuid_t *s_uuid) 696 { 697 if (ForceYesOpt == 1) { 698 fprintf(stderr, "PFS slave %s does not exist. " 699 "Auto create new slave PFS!\n", filesystem); 700 701 } else { 702 fprintf(stderr, "PFS slave %s does not exist.\n" 703 "Do you want to create a new slave PFS? (yes|no) ", 704 filesystem); 705 fflush(stderr); 706 if (getyn() != 1) { 707 fprintf(stderr, "Aborting operation\n"); 708 exit(1); 709 } 710 } 711 712 u_int32_t status; 713 char *shared_uuid = NULL; 714 uuid_to_string(s_uuid, &shared_uuid, &status); 715 716 char *cmd = NULL; 717 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 718 filesystem, shared_uuid); 719 free(shared_uuid); 720 721 if (cmd == NULL) { 722 fprintf(stderr, "Failed to alloc memory\n"); 723 exit(1); 724 } 725 if (system(cmd) != 0) { 726 fprintf(stderr, "Failed to create PFS\n"); 727 } 728 free(cmd); 729 } 730 731 /* 732 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 733 * some additional packet types to negotiate TID ranges and to verify 734 * completion. The HAMMER VFS does most of the work. 735 * 736 * It is important to note that the mirror.key_{beg,end} range must 737 * match the ranged used by the original. For now both sides use 738 * range the entire key space. 739 * 740 * It is even more important that the records in the stream conform 741 * to the TID range also supplied in the stream. The HAMMER VFS will 742 * use the REC, PASS, and SKIP record types to track the portions of 743 * the B-Tree being scanned in order to be able to proactively delete 744 * records on the target within those active areas that are not mentioned 745 * by the source. 746 * 747 * The mirror.key_cur field is used by the VFS to do this tracking. It 748 * must be initialized to key_beg but then is persistently updated by 749 * the HAMMER VFS on each successive ioctl() call. If you blow up this 750 * field you will blow up the mirror target, possibly to the point of 751 * deleting everything. As a safety measure the HAMMER VFS simply marks 752 * the records that the source has destroyed as deleted on the target, 753 * and normal pruning operations will deal with their final disposition 754 * at some later time. 755 */ 756 void 757 hammer_cmd_mirror_write(char **av, int ac) 758 { 759 struct hammer_ioc_mirror_rw mirror; 760 const char *filesystem; 761 char *buf = malloc(SERIALBUF_SIZE); 762 struct hammer_ioc_pseudofs_rw pfs; 763 struct hammer_ioc_mrecord_head pickup; 764 struct hammer_ioc_synctid synctid; 765 union hammer_ioc_mrecord_any mrec_tmp; 766 hammer_ioc_mrecord_any_t mrec; 767 struct stat st; 768 int error; 769 int fd; 770 int n; 771 772 if (ac != 1) 773 mirror_usage(1); 774 filesystem = av[0]; 775 hammer_check_restrict(filesystem); 776 777 pickup.signature = 0; 778 pickup.type = 0; 779 780 again: 781 bzero(&mirror, sizeof(mirror)); 782 hammer_key_beg_init(&mirror.key_beg); 783 hammer_key_end_init(&mirror.key_end); 784 mirror.key_end = mirror.key_beg; 785 786 /* 787 * Read initial packet 788 */ 789 mrec = read_mrecord(0, &error, &pickup); 790 if (mrec == NULL) { 791 if (error == 0) 792 fprintf(stderr, "validate_mrec_header: short read\n"); 793 exit(1); 794 } 795 /* 796 * Validate packet 797 */ 798 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 799 return; 800 } 801 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 802 fprintf(stderr, "validate_mrec_header: did not get expected " 803 "PFSD record type\n"); 804 exit(1); 805 } 806 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 807 fprintf(stderr, "validate_mrec_header: unexpected payload " 808 "size\n"); 809 exit(1); 810 } 811 812 /* 813 * Create slave PFS if it doesn't yet exist 814 */ 815 if (lstat(filesystem, &st) != 0) { 816 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 817 } 818 free(mrec); 819 mrec = NULL; 820 821 fd = getpfs(&pfs, filesystem); 822 823 /* 824 * In two-way mode the target writes out a PFS packet first. 825 * The source uses our tid_end as its tid_beg by default, 826 * picking up where it left off. 827 */ 828 mirror.tid_beg = 0; 829 if (TwoWayPipeOpt) { 830 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 831 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 832 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 833 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 834 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 835 &mrec_tmp, sizeof(mrec_tmp.pfs)); 836 } 837 838 /* 839 * Read and process the PFS header. The source informs us of 840 * the TID range the stream represents. 841 */ 842 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 843 &mirror.tid_beg, &mirror.tid_end); 844 if (n < 0) { /* got TERM record */ 845 relpfs(fd, &pfs); 846 return; 847 } 848 849 mirror.ubuf = buf; 850 mirror.size = SERIALBUF_SIZE; 851 852 /* 853 * Read and process bulk records (REC, PASS, and SKIP types). 854 * 855 * On your life, do NOT mess with mirror.key_cur or your mirror 856 * target may become history. 857 */ 858 for (;;) { 859 mirror.count = 0; 860 mirror.pfs_id = pfs.pfs_id; 861 mirror.shared_uuid = pfs.ondisk->shared_uuid; 862 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 863 if (mirror.size <= 0) 864 break; 865 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 866 fprintf(stderr, "Mirror-write %s failed: %s\n", 867 filesystem, strerror(errno)); 868 exit(1); 869 } 870 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 871 fprintf(stderr, 872 "Mirror-write %s fatal error %d\n", 873 filesystem, mirror.head.error); 874 exit(1); 875 } 876 #if 0 877 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 878 fprintf(stderr, 879 "Mirror-write %s interrupted by timer at" 880 " %016llx\n", 881 filesystem, 882 mirror.key_cur.obj_id); 883 exit(0); 884 } 885 #endif 886 } 887 888 /* 889 * Read and process the termination sync record. 890 */ 891 mrec = read_mrecord(0, &error, &pickup); 892 893 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 894 fprintf(stderr, "Mirror-write: received termination request\n"); 895 free(mrec); 896 return; 897 } 898 899 if (mrec == NULL || 900 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 901 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 902 mrec->head.rec_size != sizeof(mrec->sync)) { 903 fprintf(stderr, "Mirror-write %s: Did not get termination " 904 "sync record, or rec_size is wrong rt=%d\n", 905 filesystem, 906 (mrec ? (int)mrec->head.type : -1)); 907 exit(1); 908 } 909 910 /* 911 * Update the PFS info on the target so the user has visibility 912 * into the new snapshot, and sync the target filesystem. 913 */ 914 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 915 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 916 917 bzero(&synctid, sizeof(synctid)); 918 synctid.op = HAMMER_SYNCTID_SYNC2; 919 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 920 921 if (VerboseOpt >= 2) { 922 fprintf(stderr, "Mirror-write %s: succeeded\n", 923 filesystem); 924 } 925 } 926 927 free(mrec); 928 mrec = NULL; 929 930 /* 931 * Report back to the originator. 932 */ 933 if (TwoWayPipeOpt) { 934 mrec_tmp.update.tid = mirror.tid_end; 935 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 936 &mrec_tmp, sizeof(mrec_tmp.update)); 937 } else { 938 printf("Source can update synctid to 0x%016jx\n", 939 (uintmax_t)mirror.tid_end); 940 } 941 relpfs(fd, &pfs); 942 goto again; 943 } 944 945 void 946 hammer_cmd_mirror_dump(void) 947 { 948 char *buf = malloc(SERIALBUF_SIZE); 949 struct hammer_ioc_mrecord_head pickup; 950 hammer_ioc_mrecord_any_t mrec; 951 int error; 952 int size; 953 int offset; 954 int bytes; 955 956 /* 957 * Read and process the PFS header 958 */ 959 pickup.signature = 0; 960 pickup.type = 0; 961 962 mrec = read_mrecord(0, &error, &pickup); 963 964 again: 965 /* 966 * Read and process bulk records 967 */ 968 for (;;) { 969 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 970 if (size <= 0) 971 break; 972 offset = 0; 973 while (offset < size) { 974 mrec = (void *)((char *)buf + offset); 975 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 976 if (offset + bytes > size) { 977 fprintf(stderr, "Misaligned record\n"); 978 exit(1); 979 } 980 981 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 982 case HAMMER_MREC_TYPE_REC_BADCRC: 983 case HAMMER_MREC_TYPE_REC: 984 printf("Record obj=%016jx key=%016jx " 985 "rt=%02x ot=%02x", 986 (uintmax_t)mrec->rec.leaf.base.obj_id, 987 (uintmax_t)mrec->rec.leaf.base.key, 988 mrec->rec.leaf.base.rec_type, 989 mrec->rec.leaf.base.obj_type); 990 if (mrec->head.type == 991 HAMMER_MREC_TYPE_REC_BADCRC) { 992 printf(" (BAD CRC)"); 993 } 994 printf("\n"); 995 printf(" tids %016jx:%016jx data=%d\n", 996 (uintmax_t)mrec->rec.leaf.base.create_tid, 997 (uintmax_t)mrec->rec.leaf.base.delete_tid, 998 mrec->rec.leaf.data_len); 999 break; 1000 case HAMMER_MREC_TYPE_PASS: 1001 printf("Pass obj=%016jx key=%016jx " 1002 "rt=%02x ot=%02x\n", 1003 (uintmax_t)mrec->rec.leaf.base.obj_id, 1004 (uintmax_t)mrec->rec.leaf.base.key, 1005 mrec->rec.leaf.base.rec_type, 1006 mrec->rec.leaf.base.obj_type); 1007 printf(" tids %016jx:%016jx data=%d\n", 1008 (uintmax_t)mrec->rec.leaf.base.create_tid, 1009 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1010 mrec->rec.leaf.data_len); 1011 break; 1012 case HAMMER_MREC_TYPE_SKIP: 1013 printf("Skip obj=%016jx key=%016jx rt=%02x to\n" 1014 " obj=%016jx key=%016jx rt=%02x\n", 1015 (uintmax_t)mrec->skip.skip_beg.obj_id, 1016 (uintmax_t)mrec->skip.skip_beg.key, 1017 mrec->skip.skip_beg.rec_type, 1018 (uintmax_t)mrec->skip.skip_end.obj_id, 1019 (uintmax_t)mrec->skip.skip_end.key, 1020 mrec->skip.skip_end.rec_type); 1021 default: 1022 break; 1023 } 1024 offset += bytes; 1025 } 1026 } 1027 1028 /* 1029 * Read and process the termination sync record. 1030 */ 1031 mrec = read_mrecord(0, &error, &pickup); 1032 if (mrec == NULL || 1033 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 1034 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 1035 ) { 1036 fprintf(stderr, "Mirror-dump: Did not get termination " 1037 "sync record\n"); 1038 } 1039 1040 /* 1041 * Continue with more batches until EOF. 1042 */ 1043 mrec = read_mrecord(0, &error, &pickup); 1044 if (mrec) 1045 goto again; 1046 } 1047 1048 void 1049 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1050 { 1051 pid_t pid1; 1052 pid_t pid2; 1053 int fds[2]; 1054 const char *xav[32]; 1055 char tbuf[16]; 1056 char *sh, *user, *host, *rfs; 1057 int xac; 1058 1059 if (ac != 2) 1060 mirror_usage(1); 1061 1062 TwoWayPipeOpt = 1; 1063 signal(SIGPIPE, SIG_IGN); 1064 1065 again: 1066 if (pipe(fds) < 0) { 1067 perror("pipe"); 1068 exit(1); 1069 } 1070 1071 /* 1072 * Source 1073 */ 1074 if ((pid1 = fork()) == 0) { 1075 signal(SIGPIPE, SIG_DFL); 1076 dup2(fds[0], 0); 1077 dup2(fds[0], 1); 1078 close(fds[0]); 1079 close(fds[1]); 1080 if ((rfs = strchr(av[0], ':')) != NULL) { 1081 xac = 0; 1082 1083 if((sh = getenv("HAMMER_RSH")) == NULL) 1084 xav[xac++] = "ssh"; 1085 else 1086 xav[xac++] = sh; 1087 1088 if (CompressOpt) 1089 xav[xac++] = "-C"; 1090 1091 user = strndup(av[0], (rfs - av[0])); 1092 1093 if ((host = strchr(av[0], '@')) != NULL) { 1094 user = strndup( av[0], (host++ - av[0])); 1095 host = strndup( host, (rfs++ - host)); 1096 xav[xac++] = "-l"; 1097 xav[xac++] = user; 1098 xav[xac++] = host; 1099 } 1100 else { 1101 host = strndup( av[0], (rfs++ - av[0])); 1102 user = NULL; 1103 xav[xac++] = host; 1104 } 1105 1106 1107 if (SshPort) { 1108 xav[xac++] = "-p"; 1109 xav[xac++] = SshPort; 1110 } 1111 1112 xav[xac++] = "hammer"; 1113 1114 switch(VerboseOpt) { 1115 case 0: 1116 break; 1117 case 1: 1118 xav[xac++] = "-v"; 1119 break; 1120 case 2: 1121 xav[xac++] = "-vv"; 1122 break; 1123 default: 1124 xav[xac++] = "-vvv"; 1125 break; 1126 } 1127 if (ForceYesOpt) { 1128 xav[xac++] = "-y"; 1129 } 1130 xav[xac++] = "-2"; 1131 if (TimeoutOpt) { 1132 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1133 xav[xac++] = "-t"; 1134 xav[xac++] = tbuf; 1135 } 1136 if (SplitupOptStr) { 1137 xav[xac++] = "-S"; 1138 xav[xac++] = SplitupOptStr; 1139 } 1140 if (streaming) 1141 xav[xac++] = "mirror-read-stream"; 1142 else 1143 xav[xac++] = "mirror-read"; 1144 xav[xac++] = rfs; 1145 xav[xac++] = NULL; 1146 execvp(*xav, (void *)xav); 1147 } else { 1148 hammer_cmd_mirror_read(av, 1, streaming); 1149 fflush(stdout); 1150 fflush(stderr); 1151 } 1152 _exit(1); 1153 } 1154 1155 /* 1156 * Target 1157 */ 1158 if ((pid2 = fork()) == 0) { 1159 signal(SIGPIPE, SIG_DFL); 1160 dup2(fds[1], 0); 1161 dup2(fds[1], 1); 1162 close(fds[0]); 1163 close(fds[1]); 1164 if ((rfs = strchr(av[1], ':')) != NULL) { 1165 xac = 0; 1166 1167 if((sh = getenv("HAMMER_RSH")) == NULL) 1168 xav[xac++] = "ssh"; 1169 else 1170 xav[xac++] = sh; 1171 1172 if (CompressOpt) 1173 xav[xac++] = "-C"; 1174 1175 user = strndup(av[1], (rfs - av[1])); 1176 1177 if ((host = strchr(av[1], '@')) != NULL) { 1178 user = strndup( av[1], (host++ - av[1])); 1179 host = strndup( host, (rfs++ - host)); 1180 xav[xac++] = "-l"; 1181 xav[xac++] = user; 1182 xav[xac++] = host; 1183 } 1184 else { 1185 host = strndup( av[1], (rfs++ - av[1])); 1186 user = NULL; 1187 xav[xac++] = host; 1188 } 1189 1190 if (SshPort) { 1191 xav[xac++] = "-p"; 1192 xav[xac++] = SshPort; 1193 } 1194 1195 xav[xac++] = "hammer"; 1196 1197 switch(VerboseOpt) { 1198 case 0: 1199 break; 1200 case 1: 1201 xav[xac++] = "-v"; 1202 break; 1203 case 2: 1204 xav[xac++] = "-vv"; 1205 break; 1206 default: 1207 xav[xac++] = "-vvv"; 1208 break; 1209 } 1210 if (ForceYesOpt) { 1211 xav[xac++] = "-y"; 1212 } 1213 xav[xac++] = "-2"; 1214 xav[xac++] = "mirror-write"; 1215 xav[xac++] = rfs; 1216 xav[xac++] = NULL; 1217 execvp(*xav, (void *)xav); 1218 } else { 1219 hammer_cmd_mirror_write(av + 1, 1); 1220 fflush(stdout); 1221 fflush(stderr); 1222 } 1223 _exit(1); 1224 } 1225 close(fds[0]); 1226 close(fds[1]); 1227 1228 while (waitpid(pid1, NULL, 0) <= 0) 1229 ; 1230 while (waitpid(pid2, NULL, 0) <= 0) 1231 ; 1232 1233 /* 1234 * If the link is lost restart 1235 */ 1236 if (streaming) { 1237 if (VerboseOpt) { 1238 fprintf(stderr, "\nLost Link\n"); 1239 fflush(stderr); 1240 } 1241 sleep(15 + DelayOpt); 1242 goto again; 1243 } 1244 1245 } 1246 1247 /* 1248 * Read and return multiple mrecords 1249 */ 1250 static int 1251 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 1252 { 1253 hammer_ioc_mrecord_any_t mrec; 1254 u_int count; 1255 size_t n; 1256 size_t i; 1257 size_t bytes; 1258 int type; 1259 1260 count = 0; 1261 while (size - count >= HAMMER_MREC_HEADSIZE) { 1262 /* 1263 * Cached the record header in case we run out of buffer 1264 * space. 1265 */ 1266 fflush(stdout); 1267 if (pickup->signature == 0) { 1268 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1269 i = read(fd, (char *)pickup + n, 1270 HAMMER_MREC_HEADSIZE - n); 1271 if (i <= 0) 1272 break; 1273 } 1274 if (n == 0) 1275 break; 1276 if (n != HAMMER_MREC_HEADSIZE) { 1277 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1278 exit(1); 1279 } 1280 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1281 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1282 "bad signature\n"); 1283 exit(1); 1284 } 1285 } 1286 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1287 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 1288 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1289 "illegal rec_size\n"); 1290 exit(1); 1291 } 1292 1293 /* 1294 * Stop if we have insufficient space for the record and data. 1295 */ 1296 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1297 if (size - count < bytes) 1298 break; 1299 1300 /* 1301 * Stop if the record type is not a REC, SKIP, or PASS, 1302 * which are the only types the ioctl supports. Other types 1303 * are used only by the userland protocol. 1304 * 1305 * Ignore all flags. 1306 */ 1307 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1308 if (type != HAMMER_MREC_TYPE_PFSD && 1309 type != HAMMER_MREC_TYPE_REC && 1310 type != HAMMER_MREC_TYPE_SKIP && 1311 type != HAMMER_MREC_TYPE_PASS) { 1312 break; 1313 } 1314 1315 /* 1316 * Read the remainder and clear the pickup signature. 1317 */ 1318 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1319 i = read(fd, buf + count + n, bytes - n); 1320 if (i <= 0) 1321 break; 1322 } 1323 if (n != bytes) { 1324 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1325 exit(1); 1326 } 1327 1328 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1329 pickup->signature = 0; 1330 pickup->type = 0; 1331 mrec = (void *)(buf + count); 1332 1333 /* 1334 * Validate the completed record 1335 */ 1336 if (mrec->head.rec_crc != 1337 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1338 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1339 fprintf(stderr, "read_mrecords: malformed record " 1340 "on pipe, bad crc\n"); 1341 exit(1); 1342 } 1343 1344 /* 1345 * If its a B-Tree record validate the data crc. 1346 * 1347 * NOTE: If the VFS passes us an explicitly errorde mrec 1348 * we just pass it through. 1349 */ 1350 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1351 1352 if (type == HAMMER_MREC_TYPE_REC) { 1353 if (mrec->head.rec_size < 1354 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1355 fprintf(stderr, 1356 "read_mrecords: malformed record on " 1357 "pipe, illegal element data_len\n"); 1358 exit(1); 1359 } 1360 if (mrec->rec.leaf.data_len && 1361 mrec->rec.leaf.data_offset && 1362 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 1363 fprintf(stderr, 1364 "read_mrecords: data_crc did not " 1365 "match data! obj=%016jx key=%016jx\n", 1366 (uintmax_t)mrec->rec.leaf.base.obj_id, 1367 (uintmax_t)mrec->rec.leaf.base.key); 1368 fprintf(stderr, 1369 "continuing, but there are problems\n"); 1370 } 1371 } 1372 count += bytes; 1373 } 1374 return(count); 1375 } 1376 1377 /* 1378 * Read and return a single mrecord. 1379 */ 1380 static 1381 hammer_ioc_mrecord_any_t 1382 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 1383 { 1384 hammer_ioc_mrecord_any_t mrec; 1385 struct hammer_ioc_mrecord_head mrechd; 1386 size_t bytes; 1387 size_t n; 1388 size_t i; 1389 1390 if (pickup && pickup->type != 0) { 1391 mrechd = *pickup; 1392 pickup->signature = 0; 1393 pickup->type = 0; 1394 n = HAMMER_MREC_HEADSIZE; 1395 } else { 1396 /* 1397 * Read in the PFSD header from the sender. 1398 */ 1399 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1400 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1401 if (i <= 0) 1402 break; 1403 } 1404 if (n == 0) { 1405 *errorp = 0; /* EOF */ 1406 return(NULL); 1407 } 1408 if (n != HAMMER_MREC_HEADSIZE) { 1409 fprintf(stderr, "short read of mrecord header\n"); 1410 *errorp = EPIPE; 1411 return(NULL); 1412 } 1413 } 1414 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1415 fprintf(stderr, "read_mrecord: bad signature\n"); 1416 *errorp = EINVAL; 1417 return(NULL); 1418 } 1419 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1420 assert(bytes >= sizeof(mrechd)); 1421 mrec = malloc(bytes); 1422 mrec->head = mrechd; 1423 1424 while (n < bytes) { 1425 i = read(fdin, (char *)mrec + n, bytes - n); 1426 if (i <= 0) 1427 break; 1428 n += i; 1429 } 1430 if (n != bytes) { 1431 fprintf(stderr, "read_mrecord: short read on payload\n"); 1432 *errorp = EPIPE; 1433 return(NULL); 1434 } 1435 if (mrec->head.rec_crc != 1436 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1437 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1438 fprintf(stderr, "read_mrecord: bad CRC\n"); 1439 *errorp = EINVAL; 1440 return(NULL); 1441 } 1442 *errorp = 0; 1443 return(mrec); 1444 } 1445 1446 static 1447 void 1448 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 1449 int bytes) 1450 { 1451 char zbuf[HAMMER_HEAD_ALIGN]; 1452 int pad; 1453 1454 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1455 1456 assert(bytes >= (int)sizeof(mrec->head)); 1457 bzero(&mrec->head, sizeof(mrec->head)); 1458 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1459 mrec->head.type = type; 1460 mrec->head.rec_size = bytes; 1461 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1462 bytes - HAMMER_MREC_CRCOFF); 1463 if (write(fdout, mrec, bytes) != bytes) { 1464 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1465 errno, strerror(errno)); 1466 exit(1); 1467 } 1468 if (pad) { 1469 bzero(zbuf, pad); 1470 if (write(fdout, zbuf, pad) != pad) { 1471 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1472 errno, strerror(errno)); 1473 exit(1); 1474 } 1475 } 1476 } 1477 1478 /* 1479 * Generate a mirroring header with the pfs information of the 1480 * originating filesytem. 1481 */ 1482 static void 1483 generate_mrec_header(int fd, int pfs_id, 1484 union hammer_ioc_mrecord_any *mrec_tmp) 1485 { 1486 struct hammer_ioc_pseudofs_rw pfs; 1487 1488 bzero(&pfs, sizeof(pfs)); 1489 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1490 pfs.pfs_id = pfs_id; 1491 pfs.ondisk = &mrec_tmp->pfs.pfsd; 1492 pfs.bytes = sizeof(mrec_tmp->pfs.pfsd); 1493 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1494 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1495 exit(1); 1496 } 1497 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1498 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 1499 exit(1); 1500 } 1501 mrec_tmp->pfs.version = pfs.version; 1502 } 1503 1504 /* 1505 * Validate the pfs information from the originating filesystem 1506 * against the target filesystem. shared_uuid must match. 1507 * 1508 * return -1 if we got a TERM record 1509 */ 1510 static int 1511 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1512 struct hammer_ioc_mrecord_head *pickup, 1513 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1514 { 1515 struct hammer_ioc_pseudofs_rw pfs; 1516 struct hammer_pseudofs_data pfsd; 1517 hammer_ioc_mrecord_any_t mrec; 1518 int error; 1519 1520 /* 1521 * Get the PFSD info from the target filesystem. 1522 */ 1523 bzero(&pfs, sizeof(pfs)); 1524 bzero(&pfsd, sizeof(pfsd)); 1525 pfs.pfs_id = pfs_id; 1526 pfs.ondisk = &pfsd; 1527 pfs.bytes = sizeof(pfsd); 1528 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1529 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1530 exit(1); 1531 } 1532 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1533 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 1534 exit(1); 1535 } 1536 1537 mrec = read_mrecord(fdin, &error, pickup); 1538 if (mrec == NULL) { 1539 if (error == 0) 1540 fprintf(stderr, "validate_mrec_header: short read\n"); 1541 exit(1); 1542 } 1543 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1544 free(mrec); 1545 return(-1); 1546 } 1547 1548 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1549 fprintf(stderr, "validate_mrec_header: did not get expected " 1550 "PFSD record type\n"); 1551 exit(1); 1552 } 1553 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1554 fprintf(stderr, "validate_mrec_header: unexpected payload " 1555 "size\n"); 1556 exit(1); 1557 } 1558 if (mrec->pfs.version != pfs.version) { 1559 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1560 exit(1); 1561 } 1562 1563 /* 1564 * Whew. Ok, is the read PFS info compatible with the target? 1565 */ 1566 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1567 sizeof(pfsd.shared_uuid)) != 0) { 1568 fprintf(stderr, 1569 "mirror-write: source and target have " 1570 "different shared-uuid's!\n"); 1571 exit(1); 1572 } 1573 if (is_target && 1574 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1575 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1576 exit(1); 1577 } 1578 if (tid_begp) 1579 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1580 if (tid_endp) 1581 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1582 free(mrec); 1583 return(0); 1584 } 1585 1586 static void 1587 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1588 { 1589 struct hammer_ioc_pseudofs_rw pfs; 1590 struct hammer_pseudofs_data pfsd; 1591 1592 bzero(&pfs, sizeof(pfs)); 1593 bzero(&pfsd, sizeof(pfsd)); 1594 pfs.pfs_id = pfs_id; 1595 pfs.ondisk = &pfsd; 1596 pfs.bytes = sizeof(pfsd); 1597 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1598 perror("update_pfs_snapshot (read)"); 1599 exit(1); 1600 } 1601 if (pfsd.sync_end_tid != snapshot_tid) { 1602 pfsd.sync_end_tid = snapshot_tid; 1603 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1604 perror("update_pfs_snapshot (rewrite)"); 1605 exit(1); 1606 } 1607 if (VerboseOpt >= 2) { 1608 fprintf(stderr, 1609 "Mirror-write: Completed, updated snapshot " 1610 "to %016jx\n", 1611 (uintmax_t)snapshot_tid); 1612 fflush(stderr); 1613 } 1614 } 1615 } 1616 1617 /* 1618 * Bandwidth-limited write in chunks 1619 */ 1620 static 1621 ssize_t 1622 writebw(int fd, const void *buf, size_t nbytes, 1623 u_int64_t *bwcount, struct timeval *tv1) 1624 { 1625 struct timeval tv2; 1626 size_t n; 1627 ssize_t r; 1628 ssize_t a; 1629 int usec; 1630 1631 a = 0; 1632 r = 0; 1633 while (nbytes) { 1634 if (*bwcount + nbytes > BandwidthOpt) 1635 n = BandwidthOpt - *bwcount; 1636 else 1637 n = nbytes; 1638 if (n) 1639 r = write(fd, buf, n); 1640 if (r >= 0) { 1641 a += r; 1642 nbytes -= r; 1643 buf = (const char *)buf + r; 1644 } 1645 if ((size_t)r != n) 1646 break; 1647 *bwcount += n; 1648 if (*bwcount >= BandwidthOpt) { 1649 gettimeofday(&tv2, NULL); 1650 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1651 (int)(tv2.tv_usec - tv1->tv_usec); 1652 if (usec >= 0 && usec < 1000000) 1653 usleep(1000000 - usec); 1654 gettimeofday(tv1, NULL); 1655 *bwcount -= BandwidthOpt; 1656 } 1657 } 1658 return(a ? a : r); 1659 } 1660 1661 /* 1662 * Get a yes or no answer from the terminal. The program may be run as 1663 * part of a two-way pipe so we cannot use stdin for this operation. 1664 */ 1665 static int 1666 getyn(void) 1667 { 1668 char buf[256]; 1669 FILE *fp; 1670 int result; 1671 1672 fp = fopen("/dev/tty", "r"); 1673 if (fp == NULL) { 1674 fprintf(stderr, "No terminal for response\n"); 1675 return(-1); 1676 } 1677 result = -1; 1678 while (fgets(buf, sizeof(buf), fp) != NULL) { 1679 if (buf[0] == 'y' || buf[0] == 'Y') { 1680 result = 1; 1681 break; 1682 } 1683 if (buf[0] == 'n' || buf[0] == 'N') { 1684 result = 0; 1685 break; 1686 } 1687 fprintf(stderr, "Response not understood\n"); 1688 break; 1689 } 1690 fclose(fp); 1691 return(result); 1692 } 1693 1694 static void 1695 mirror_usage(int code) 1696 { 1697 fprintf(stderr, 1698 "hammer mirror-read <filesystem> [begin-tid]\n" 1699 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1700 "hammer mirror-write <filesystem>\n" 1701 "hammer mirror-dump\n" 1702 "hammer mirror-copy [[user@]host:]<filesystem>" 1703 " [[user@]host:]<filesystem>\n" 1704 "hammer mirror-stream [[user@]host:]<filesystem>" 1705 " [[user@]host:]<filesystem>\n" 1706 ); 1707 exit(code); 1708 } 1709 1710