1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 */ 34 35 #include "hammer.h" 36 37 #define LINE1 0,20 38 #define LINE2 20,78 39 #define LINE3 90,70 40 41 #define SERIALBUF_SIZE (512 * 1024) 42 43 typedef struct histogram { 44 hammer_tid_t tid; 45 u_int64_t bytes; 46 } *histogram_t; 47 48 static int read_mrecords(int fd, char *buf, u_int size, 49 hammer_ioc_mrecord_head_t pickup); 50 static int generate_histogram(int fd, const char *filesystem, 51 histogram_t *histogram_ary, 52 struct hammer_ioc_mirror_rw *mirror_base, 53 int *repeatp); 54 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 55 hammer_ioc_mrecord_head_t pickup); 56 static void write_mrecord(int fdout, u_int32_t type, 57 hammer_ioc_mrecord_any_t mrec, int bytes); 58 static void generate_mrec_header(int fd, int pfs_id, 59 union hammer_ioc_mrecord_any *mrec_tmp); 60 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 61 struct hammer_ioc_mrecord_head *pickup, 62 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 63 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 64 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 65 u_int64_t *bwcount, struct timeval *tv1); 66 static int getyn(void); 67 static void mirror_usage(int code); 68 69 /* 70 * Generate a mirroring data stream from the specific source over the 71 * entire key range, but restricted to the specified transaction range. 72 * 73 * The HAMMER VFS does most of the work, we add a few new mrecord 74 * types to negotiate the TID ranges and verify that the entire 75 * stream made it to the destination. 76 * 77 * streaming will be 0 for mirror-read, 1 for mirror-stream. The code will 78 * set up a fake value of -1 when running the histogram for mirror-read. 79 */ 80 void 81 hammer_cmd_mirror_read(char **av, int ac, int streaming) 82 { 83 struct hammer_ioc_mirror_rw mirror; 84 struct hammer_ioc_pseudofs_rw pfs; 85 union hammer_ioc_mrecord_any mrec_tmp; 86 struct hammer_ioc_mrecord_head pickup; 87 hammer_ioc_mrecord_any_t mrec; 88 hammer_tid_t sync_tid; 89 histogram_t histogram_ary; 90 char *filesystem; 91 char *buf = malloc(SERIALBUF_SIZE); 92 int interrupted = 0; 93 int error; 94 int fd; 95 int n; 96 int didwork; 97 int histogram; 98 int histindex; 99 int histmax; 100 int repeat = 0; 101 int sameline; 102 int64_t total_bytes; 103 time_t base_t = time(NULL); 104 struct timeval bwtv; 105 u_int64_t bwcount; 106 u_int64_t estbytes; 107 108 if (ac == 0 || ac > 2) 109 mirror_usage(1); 110 filesystem = av[0]; 111 hammer_check_restrict(filesystem); 112 113 pickup.signature = 0; 114 pickup.type = 0; 115 histogram = 0; 116 histindex = 0; 117 histmax = 0; 118 histogram_ary = NULL; 119 sameline = 0; 120 121 again: 122 bzero(&mirror, sizeof(mirror)); 123 hammer_key_beg_init(&mirror.key_beg); 124 hammer_key_end_init(&mirror.key_end); 125 126 fd = getpfs(&pfs, filesystem); 127 128 if (streaming >= 0) 129 score_printf(LINE1, "Running"); 130 131 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 132 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 133 fflush(stderr); 134 sameline = 1; 135 } 136 sameline = 1; 137 total_bytes = 0; 138 gettimeofday(&bwtv, NULL); 139 bwcount = 0; 140 141 /* 142 * Send initial header for the purpose of determining the 143 * shared-uuid. 144 */ 145 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 146 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 147 &mrec_tmp, sizeof(mrec_tmp.pfs)); 148 149 /* 150 * In 2-way mode the target will send us a PFS info packet 151 * first. Use the target's current snapshot TID as our default 152 * begin TID. 153 */ 154 if (TwoWayPipeOpt) { 155 mirror.tid_beg = 0; 156 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 157 NULL, &mirror.tid_beg); 158 if (n < 0) { /* got TERM record */ 159 relpfs(fd, &pfs); 160 return; 161 } 162 ++mirror.tid_beg; 163 } else if (streaming && histogram) { 164 mirror.tid_beg = histogram_ary[histindex].tid + 1; 165 } else { 166 mirror.tid_beg = 0; 167 } 168 169 /* 170 * Write out the PFS header, tid_beg will be updated if our PFS 171 * has a larger begin sync. tid_end is set to the latest source 172 * TID whos flush cycle has completed. 173 */ 174 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 175 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 176 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 177 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 178 mirror.ubuf = buf; 179 mirror.size = SERIALBUF_SIZE; 180 mirror.pfs_id = pfs.pfs_id; 181 mirror.shared_uuid = pfs.ondisk->shared_uuid; 182 183 /* 184 * XXX If the histogram is exhausted and the TID delta is large 185 * the stream might have been offline for a while and is 186 * now picking it up again. Do another histogram. 187 */ 188 #if 0 189 if (streaming && histogram && histindex == histend) { 190 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 191 histogram = 0; 192 } 193 #endif 194 195 /* 196 * Initial bulk startup control, try to do some incremental 197 * mirroring in order to allow the stream to be killed and 198 * restarted without having to start over. 199 */ 200 if (histogram == 0 && BulkOpt == 0) { 201 if (VerboseOpt && repeat == 0) { 202 fprintf(stderr, "\n"); 203 sameline = 0; 204 } 205 histmax = generate_histogram(fd, filesystem, 206 &histogram_ary, &mirror, 207 &repeat); 208 histindex = 0; 209 histogram = 1; 210 211 /* 212 * Just stream the histogram, then stop 213 */ 214 if (streaming == 0) 215 streaming = -1; 216 } 217 218 if (streaming && histogram) { 219 ++histindex; 220 mirror.tid_end = histogram_ary[histindex].tid; 221 estbytes = histogram_ary[histindex-1].bytes; 222 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 223 } else { 224 estbytes = 0; 225 } 226 227 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 228 &mrec_tmp, sizeof(mrec_tmp.pfs)); 229 230 /* 231 * A cycle file overrides the beginning TID only if we are 232 * not operating in two-way or histogram mode. 233 */ 234 if (TwoWayPipeOpt == 0 && histogram == 0) { 235 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 236 } 237 238 /* 239 * An additional argument overrides the beginning TID regardless 240 * of what mode we are in. This is not recommending if operating 241 * in two-way mode. 242 */ 243 if (ac == 2) 244 mirror.tid_beg = strtoull(av[1], NULL, 0); 245 246 if (streaming == 0 || VerboseOpt >= 2) { 247 fprintf(stderr, 248 "Mirror-read: Mirror %016jx to %016jx", 249 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 250 if (histogram) 251 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 252 fprintf(stderr, "\n"); 253 fflush(stderr); 254 } 255 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 256 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 257 (uintmax_t)mirror.key_beg.obj_id); 258 } 259 260 /* 261 * Nothing to do if begin equals end. 262 */ 263 if (mirror.tid_beg >= mirror.tid_end) { 264 if (streaming == 0 || VerboseOpt >= 2) 265 fprintf(stderr, "Mirror-read: No work to do\n"); 266 sleep(DelayOpt); 267 didwork = 0; 268 histogram = 0; 269 goto done; 270 } 271 didwork = 1; 272 273 /* 274 * Write out bulk records 275 */ 276 mirror.ubuf = buf; 277 mirror.size = SERIALBUF_SIZE; 278 279 do { 280 mirror.count = 0; 281 mirror.pfs_id = pfs.pfs_id; 282 mirror.shared_uuid = pfs.ondisk->shared_uuid; 283 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 284 score_printf(LINE3, "Mirror-read %s failed: %s", 285 filesystem, strerror(errno)); 286 fprintf(stderr, "Mirror-read %s failed: %s\n", 287 filesystem, strerror(errno)); 288 exit(1); 289 } 290 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 291 score_printf(LINE3, "Mirror-read %s fatal error %d", 292 filesystem, mirror.head.error); 293 fprintf(stderr, 294 "Mirror-read %s fatal error %d\n", 295 filesystem, mirror.head.error); 296 exit(1); 297 } 298 if (mirror.count) { 299 if (BandwidthOpt) { 300 n = writebw(1, mirror.ubuf, mirror.count, 301 &bwcount, &bwtv); 302 } else { 303 n = write(1, mirror.ubuf, mirror.count); 304 } 305 if (n != mirror.count) { 306 score_printf(LINE3, 307 "Mirror-read %s failed: " 308 "short write", 309 filesystem); 310 fprintf(stderr, 311 "Mirror-read %s failed: " 312 "short write\n", 313 filesystem); 314 exit(1); 315 } 316 } 317 total_bytes += mirror.count; 318 if (streaming && VerboseOpt) { 319 fprintf(stderr, 320 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 321 (uintmax_t)mirror.key_cur.obj_id, 322 (uintmax_t)mirror.tid_beg, 323 (uintmax_t)mirror.tid_end, 324 (intmax_t)total_bytes); 325 fflush(stderr); 326 sameline = 0; 327 } else if (streaming) { 328 score_printf(LINE2, 329 "obj=%016jx tids=%016jx:%016jx %11jd", 330 (uintmax_t)mirror.key_cur.obj_id, 331 (uintmax_t)mirror.tid_beg, 332 (uintmax_t)mirror.tid_end, 333 (intmax_t)total_bytes); 334 } 335 mirror.key_beg = mirror.key_cur; 336 337 /* 338 * Deal with time limit option 339 */ 340 if (TimeoutOpt && 341 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 342 score_printf(LINE3, 343 "Mirror-read %s interrupted by timer at" 344 " %016jx", 345 filesystem, 346 (uintmax_t)mirror.key_cur.obj_id); 347 fprintf(stderr, 348 "Mirror-read %s interrupted by timer at" 349 " %016jx\n", 350 filesystem, 351 (uintmax_t)mirror.key_cur.obj_id); 352 interrupted = 1; 353 break; 354 } 355 } while (mirror.count != 0); 356 357 done: 358 if (streaming && VerboseOpt && sameline == 0) { 359 fprintf(stderr, "\n"); 360 fflush(stderr); 361 sameline = 1; 362 } 363 364 /* 365 * Write out the termination sync record - only if not interrupted 366 */ 367 if (interrupted == 0) { 368 if (didwork) { 369 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 370 &mrec_tmp, sizeof(mrec_tmp.sync)); 371 } else { 372 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 373 &mrec_tmp, sizeof(mrec_tmp.sync)); 374 } 375 } 376 377 /* 378 * If the -2 option was given (automatic when doing mirror-copy), 379 * a two-way pipe is assumed and we expect a response mrec from 380 * the target. 381 */ 382 if (TwoWayPipeOpt) { 383 mrec = read_mrecord(0, &error, &pickup); 384 if (mrec == NULL || 385 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 386 mrec->head.rec_size != sizeof(mrec->update)) { 387 fprintf(stderr, "mirror_read: Did not get final " 388 "acknowledgement packet from target\n"); 389 exit(1); 390 } 391 if (interrupted) { 392 if (CyclePath) { 393 hammer_set_cycle(&mirror.key_cur, 394 mirror.tid_beg); 395 fprintf(stderr, "Cyclefile %s updated for " 396 "continuation\n", CyclePath); 397 } 398 } else { 399 sync_tid = mrec->update.tid; 400 if (CyclePath) { 401 hammer_key_beg_init(&mirror.key_beg); 402 hammer_set_cycle(&mirror.key_beg, sync_tid); 403 fprintf(stderr, 404 "Cyclefile %s updated to 0x%016jx\n", 405 CyclePath, (uintmax_t)sync_tid); 406 } 407 } 408 } else if (CyclePath) { 409 /* NOTE! mirror.tid_beg cannot be updated */ 410 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 411 "fully updated unless you use mirror-copy\n"); 412 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 413 } 414 if (streaming && interrupted == 0) { 415 time_t t1 = time(NULL); 416 time_t t2; 417 418 /* 419 * Try to break down large bulk transfers into smaller ones 420 * so it can sync the transaction id on the slave. This 421 * way if we get interrupted a restart doesn't have to 422 * start from scratch. 423 */ 424 if (streaming && histogram) { 425 if (histindex != histmax) { 426 if (VerboseOpt && VerboseOpt < 2 && 427 streaming >= 0) { 428 fprintf(stderr, " (bulk incremental)"); 429 } 430 relpfs(fd, &pfs); 431 goto again; 432 } 433 } 434 435 if (VerboseOpt && streaming >= 0) { 436 fprintf(stderr, " W"); 437 fflush(stderr); 438 } else if (streaming >= 0) { 439 score_printf(LINE1, "Waiting"); 440 } 441 pfs.ondisk->sync_end_tid = mirror.tid_end; 442 if (streaming < 0) { 443 /* 444 * Fake streaming mode when using a histogram to 445 * break up a mirror-read, do not wait on source. 446 */ 447 streaming = 0; 448 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 449 score_printf(LINE3, 450 "Mirror-read %s: cannot stream: %s\n", 451 filesystem, strerror(errno)); 452 fprintf(stderr, 453 "Mirror-read %s: cannot stream: %s\n", 454 filesystem, strerror(errno)); 455 } else { 456 t2 = time(NULL) - t1; 457 if (t2 >= 0 && t2 < DelayOpt) { 458 if (VerboseOpt) { 459 fprintf(stderr, "\bD"); 460 fflush(stderr); 461 } 462 sleep(DelayOpt - t2); 463 } 464 if (VerboseOpt) { 465 fprintf(stderr, "\b "); 466 fflush(stderr); 467 } 468 relpfs(fd, &pfs); 469 goto again; 470 } 471 } 472 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 473 &mrec_tmp, sizeof(mrec_tmp.sync)); 474 relpfs(fd, &pfs); 475 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 476 } 477 478 /* 479 * What we are trying to do here is figure out how much data is 480 * going to be sent for the TID range and to break the TID range 481 * down into reasonably-sized slices (from the point of view of 482 * data sent) so a lost connection can restart at a reasonable 483 * place and not all the way back at the beginning. 484 * 485 * An entry's TID serves as the end_tid for the prior entry 486 * So we have to offset the calculation by 1 so that TID falls into 487 * the previous entry when populating entries. 488 * 489 * Because the transaction id space is bursty we need a relatively 490 * large number of buckets (like a million) to do a reasonable job 491 * for things like an initial bulk mirrors on a very large filesystem. 492 */ 493 #define HIST_COUNT (1024 * 1024) 494 495 static int 496 generate_histogram(int fd, const char *filesystem, 497 histogram_t *histogram_ary, 498 struct hammer_ioc_mirror_rw *mirror_base, 499 int *repeatp) 500 { 501 struct hammer_ioc_mirror_rw mirror; 502 union hammer_ioc_mrecord_any *mrec; 503 hammer_tid_t tid_beg; 504 hammer_tid_t tid_end; 505 hammer_tid_t tid; 506 hammer_tid_t tidx; 507 u_int64_t *tid_bytes; 508 u_int64_t total; 509 u_int64_t accum; 510 int chunkno; 511 int i; 512 int res; 513 int off; 514 int len; 515 516 mirror = *mirror_base; 517 tid_beg = mirror.tid_beg; 518 tid_end = mirror.tid_end; 519 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 520 521 if (*histogram_ary == NULL) { 522 *histogram_ary = malloc(sizeof(struct histogram) * 523 (HIST_COUNT + 2)); 524 } 525 if (tid_beg >= tid_end) 526 return(0); 527 528 /* needs 2 extra */ 529 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 530 bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2)); 531 532 if (*repeatp == 0) { 533 fprintf(stderr, "Prescan to break up bulk transfer"); 534 if (VerboseOpt > 1) 535 fprintf(stderr, " (%juMB chunks)", 536 (uintmax_t)(SplitupOpt / (1024 * 1024))); 537 fprintf(stderr, "\n"); 538 } 539 540 /* 541 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 542 * 543 * Note: Estimates can be off when the mirror is way behind due 544 * to skips. 545 */ 546 total = 0; 547 accum = 0; 548 chunkno = 0; 549 for (;;) { 550 mirror.count = 0; 551 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 552 fprintf(stderr, "Mirror-read %s failed: %s\n", 553 filesystem, strerror(errno)); 554 exit(1); 555 } 556 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 557 fprintf(stderr, 558 "Mirror-read %s fatal error %d\n", 559 filesystem, mirror.head.error); 560 exit(1); 561 } 562 for (off = 0; 563 off < mirror.count; 564 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size) 565 ) { 566 mrec = (void *)((char *)mirror.ubuf + off); 567 568 /* 569 * We only care about general RECs and PASS 570 * records. We ignore SKIPs. 571 */ 572 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 573 case HAMMER_MREC_TYPE_REC: 574 case HAMMER_MREC_TYPE_PASS: 575 break; 576 default: 577 continue; 578 } 579 580 /* 581 * Calculate for two indices, create_tid and 582 * delete_tid. Record data only applies to 583 * the create_tid. 584 * 585 * When tid is exactly on the boundary it really 586 * belongs to the previous entry because scans 587 * are inclusive of the ending entry. 588 */ 589 tid = mrec->rec.leaf.base.delete_tid; 590 if (tid && tid >= tid_beg && tid <= tid_end) { 591 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 592 if (mrec->head.type == 593 HAMMER_MREC_TYPE_REC) { 594 len -= HAMMER_HEAD_DOALIGN( 595 mrec->rec.leaf.data_len); 596 assert(len > 0); 597 } 598 i = (tid - tid_beg) * HIST_COUNT / 599 (tid_end - tid_beg); 600 tidx = tid_beg + i * (tid_end - tid_beg) / 601 HIST_COUNT; 602 if (tid == tidx && i) 603 --i; 604 assert(i >= 0 && i < HIST_COUNT); 605 tid_bytes[i] += len; 606 total += len; 607 accum += len; 608 } 609 610 tid = mrec->rec.leaf.base.create_tid; 611 if (tid && tid >= tid_beg && tid <= tid_end) { 612 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 613 if (mrec->head.type == 614 HAMMER_MREC_TYPE_REC_NODATA) { 615 len += HAMMER_HEAD_DOALIGN( 616 mrec->rec.leaf.data_len); 617 } 618 i = (tid - tid_beg) * HIST_COUNT / 619 (tid_end - tid_beg); 620 tidx = tid_beg + i * (tid_end - tid_beg) / 621 HIST_COUNT; 622 if (tid == tidx && i) 623 --i; 624 assert(i >= 0 && i < HIST_COUNT); 625 tid_bytes[i] += len; 626 total += len; 627 accum += len; 628 } 629 } 630 if (*repeatp == 0 && accum > SplitupOpt) { 631 if (VerboseOpt > 1) { 632 fprintf(stderr, "."); 633 fflush(stderr); 634 } 635 ++chunkno; 636 score_printf(LINE2, "Prescan chunk %d", chunkno); 637 accum = 0; 638 } 639 if (mirror.count == 0) 640 break; 641 mirror.key_beg = mirror.key_cur; 642 } 643 644 /* 645 * Reduce to SplitupOpt (default 4GB) chunks. This code may 646 * use up to two additional elements. Do the array in-place. 647 * 648 * Inefficient degenerate cases can occur if we do not accumulate 649 * at least the requested split amount, so error on the side of 650 * going over a bit. 651 */ 652 res = 0; 653 (*histogram_ary)[res].tid = tid_beg; 654 (*histogram_ary)[res].bytes = tid_bytes[0]; 655 for (i = 1; i < HIST_COUNT; ++i) { 656 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 657 ++res; 658 (*histogram_ary)[res].tid = tid_beg + 659 i * (tid_end - tid_beg) / 660 HIST_COUNT; 661 (*histogram_ary)[res].bytes = 0; 662 663 } 664 (*histogram_ary)[res].bytes += tid_bytes[i]; 665 } 666 ++res; 667 (*histogram_ary)[res].tid = tid_end; 668 (*histogram_ary)[res].bytes = -1; 669 670 if (*repeatp == 0) { 671 if (VerboseOpt > 1) 672 fprintf(stderr, "\n"); /* newline after ... */ 673 score_printf(LINE3, "Prescan %d chunks, total %ju MBytes", 674 res, (uintmax_t)total / (1024 * 1024)); 675 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 676 res, (uintmax_t)total / (1024 * 1024)); 677 for (i = 0; i < res && i < 3; ++i) { 678 if (i) 679 fprintf(stderr, ", "); 680 fprintf(stderr, "%ju", 681 (uintmax_t)(*histogram_ary)[i].bytes); 682 } 683 if (i < res) 684 fprintf(stderr, ", ..."); 685 fprintf(stderr, ")\n"); 686 } 687 assert(res <= HIST_COUNT); 688 *repeatp = 1; 689 690 free(tid_bytes); 691 return(res); 692 } 693 694 static void 695 create_pfs(const char *filesystem, uuid_t *s_uuid) 696 { 697 if (ForceYesOpt == 1) { 698 fprintf(stderr, "PFS slave %s does not exist. " 699 "Auto create new slave PFS!\n", filesystem); 700 701 } else { 702 fprintf(stderr, "PFS slave %s does not exist.\n" 703 "Do you want to create a new slave PFS? (yes|no) ", 704 filesystem); 705 fflush(stderr); 706 if (getyn() != 1) { 707 fprintf(stderr, "Aborting operation\n"); 708 exit(1); 709 } 710 } 711 712 u_int32_t status; 713 char *shared_uuid = NULL; 714 uuid_to_string(s_uuid, &shared_uuid, &status); 715 716 char *cmd = NULL; 717 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 718 filesystem, shared_uuid); 719 free(shared_uuid); 720 721 if (cmd == NULL) { 722 fprintf(stderr, "Failed to alloc memory\n"); 723 exit(1); 724 } 725 if (system(cmd) != 0) { 726 fprintf(stderr, "Failed to create PFS\n"); 727 } 728 free(cmd); 729 } 730 731 /* 732 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 733 * some additional packet types to negotiate TID ranges and to verify 734 * completion. The HAMMER VFS does most of the work. 735 * 736 * It is important to note that the mirror.key_{beg,end} range must 737 * match the ranged used by the original. For now both sides use 738 * range the entire key space. 739 * 740 * It is even more important that the records in the stream conform 741 * to the TID range also supplied in the stream. The HAMMER VFS will 742 * use the REC, PASS, and SKIP record types to track the portions of 743 * the B-Tree being scanned in order to be able to proactively delete 744 * records on the target within those active areas that are not mentioned 745 * by the source. 746 * 747 * The mirror.key_cur field is used by the VFS to do this tracking. It 748 * must be initialized to key_beg but then is persistently updated by 749 * the HAMMER VFS on each successive ioctl() call. If you blow up this 750 * field you will blow up the mirror target, possibly to the point of 751 * deleting everything. As a safety measure the HAMMER VFS simply marks 752 * the records that the source has destroyed as deleted on the target, 753 * and normal pruning operations will deal with their final disposition 754 * at some later time. 755 */ 756 void 757 hammer_cmd_mirror_write(char **av, int ac) 758 { 759 struct hammer_ioc_mirror_rw mirror; 760 char *filesystem; 761 char *buf = malloc(SERIALBUF_SIZE); 762 struct hammer_ioc_pseudofs_rw pfs; 763 struct hammer_ioc_mrecord_head pickup; 764 struct hammer_ioc_synctid synctid; 765 union hammer_ioc_mrecord_any mrec_tmp; 766 hammer_ioc_mrecord_any_t mrec; 767 struct stat st; 768 int error; 769 int fd; 770 int n; 771 772 if (ac != 1) 773 mirror_usage(1); 774 filesystem = av[0]; 775 hammer_check_restrict(filesystem); 776 777 pickup.signature = 0; 778 pickup.type = 0; 779 780 again: 781 bzero(&mirror, sizeof(mirror)); 782 hammer_key_beg_init(&mirror.key_beg); 783 hammer_key_end_init(&mirror.key_end); 784 mirror.key_end = mirror.key_beg; 785 786 /* 787 * Read initial packet 788 */ 789 mrec = read_mrecord(0, &error, &pickup); 790 if (mrec == NULL) { 791 if (error == 0) 792 fprintf(stderr, "validate_mrec_header: short read\n"); 793 exit(1); 794 } 795 /* 796 * Validate packet 797 */ 798 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 799 return; 800 } 801 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 802 fprintf(stderr, "validate_mrec_header: did not get expected " 803 "PFSD record type\n"); 804 exit(1); 805 } 806 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 807 fprintf(stderr, "validate_mrec_header: unexpected payload " 808 "size\n"); 809 exit(1); 810 } 811 812 /* 813 * Create slave PFS if it doesn't yet exist 814 */ 815 if (lstat(filesystem, &st) != 0) { 816 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 817 } 818 free(mrec); 819 mrec = NULL; 820 821 fd = getpfs(&pfs, filesystem); 822 823 /* 824 * In two-way mode the target writes out a PFS packet first. 825 * The source uses our tid_end as its tid_beg by default, 826 * picking up where it left off. 827 */ 828 mirror.tid_beg = 0; 829 if (TwoWayPipeOpt) { 830 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 831 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 832 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 833 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 834 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 835 &mrec_tmp, sizeof(mrec_tmp.pfs)); 836 } 837 838 /* 839 * Read and process the PFS header. The source informs us of 840 * the TID range the stream represents. 841 */ 842 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 843 &mirror.tid_beg, &mirror.tid_end); 844 if (n < 0) { /* got TERM record */ 845 relpfs(fd, &pfs); 846 return; 847 } 848 849 mirror.ubuf = buf; 850 mirror.size = SERIALBUF_SIZE; 851 852 /* 853 * Read and process bulk records (REC, PASS, and SKIP types). 854 * 855 * On your life, do NOT mess with mirror.key_cur or your mirror 856 * target may become history. 857 */ 858 for (;;) { 859 mirror.count = 0; 860 mirror.pfs_id = pfs.pfs_id; 861 mirror.shared_uuid = pfs.ondisk->shared_uuid; 862 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 863 if (mirror.size <= 0) 864 break; 865 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 866 fprintf(stderr, "Mirror-write %s failed: %s\n", 867 filesystem, strerror(errno)); 868 exit(1); 869 } 870 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 871 fprintf(stderr, 872 "Mirror-write %s fatal error %d\n", 873 filesystem, mirror.head.error); 874 exit(1); 875 } 876 #if 0 877 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 878 fprintf(stderr, 879 "Mirror-write %s interrupted by timer at" 880 " %016llx\n", 881 filesystem, 882 mirror.key_cur.obj_id); 883 exit(0); 884 } 885 #endif 886 } 887 888 /* 889 * Read and process the termination sync record. 890 */ 891 mrec = read_mrecord(0, &error, &pickup); 892 893 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 894 fprintf(stderr, "Mirror-write: received termination request\n"); 895 free(mrec); 896 return; 897 } 898 899 if (mrec == NULL || 900 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 901 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 902 mrec->head.rec_size != sizeof(mrec->sync)) { 903 fprintf(stderr, "Mirror-write %s: Did not get termination " 904 "sync record, or rec_size is wrong rt=%d\n", 905 filesystem, 906 (mrec ? (int)mrec->head.type : -1)); 907 exit(1); 908 } 909 910 /* 911 * Update the PFS info on the target so the user has visibility 912 * into the new snapshot, and sync the target filesystem. 913 */ 914 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 915 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 916 917 bzero(&synctid, sizeof(synctid)); 918 synctid.op = HAMMER_SYNCTID_SYNC2; 919 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 920 921 if (VerboseOpt >= 2) { 922 fprintf(stderr, "Mirror-write %s: succeeded\n", 923 filesystem); 924 } 925 } 926 927 free(mrec); 928 mrec = NULL; 929 930 /* 931 * Report back to the originator. 932 */ 933 if (TwoWayPipeOpt) { 934 mrec_tmp.update.tid = mirror.tid_end; 935 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 936 &mrec_tmp, sizeof(mrec_tmp.update)); 937 } else { 938 printf("Source can update synctid to 0x%016jx\n", 939 (uintmax_t)mirror.tid_end); 940 } 941 relpfs(fd, &pfs); 942 goto again; 943 } 944 945 void 946 hammer_cmd_mirror_dump(char **av, int ac) 947 { 948 char *buf = malloc(SERIALBUF_SIZE); 949 struct hammer_ioc_mrecord_head pickup; 950 hammer_ioc_mrecord_any_t mrec; 951 int error; 952 int size; 953 int offset; 954 int bytes; 955 int header_only = 0; 956 957 if (ac == 1 && strcmp(*av, "header") == 0) 958 header_only = 1; 959 else if (ac != 0) 960 mirror_usage(1); 961 962 /* 963 * Read and process the PFS header 964 */ 965 pickup.signature = 0; 966 pickup.type = 0; 967 968 mrec = read_mrecord(0, &error, &pickup); 969 970 /* 971 * Dump the PFS header. mirror-dump takes its input from the output 972 * of a mirror-read so getpfs() can't be used to get a fd to be passed 973 * to dump_pfsd(). 974 */ 975 if (header_only && mrec != NULL) { 976 dump_pfsd(&mrec->pfs.pfsd, -1); 977 return; 978 } 979 980 again: 981 /* 982 * Read and process bulk records 983 */ 984 for (;;) { 985 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 986 if (size <= 0) 987 break; 988 offset = 0; 989 while (offset < size) { 990 mrec = (void *)((char *)buf + offset); 991 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 992 if (offset + bytes > size) { 993 fprintf(stderr, "Misaligned record\n"); 994 exit(1); 995 } 996 997 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 998 case HAMMER_MREC_TYPE_REC_BADCRC: 999 case HAMMER_MREC_TYPE_REC: 1000 printf("Record obj=%016jx key=%016jx " 1001 "rt=%02x ot=%02x", 1002 (uintmax_t)mrec->rec.leaf.base.obj_id, 1003 (uintmax_t)mrec->rec.leaf.base.key, 1004 mrec->rec.leaf.base.rec_type, 1005 mrec->rec.leaf.base.obj_type); 1006 if (mrec->head.type == 1007 HAMMER_MREC_TYPE_REC_BADCRC) { 1008 printf(" (BAD CRC)"); 1009 } 1010 printf("\n"); 1011 printf(" tids %016jx:%016jx data=%d\n", 1012 (uintmax_t)mrec->rec.leaf.base.create_tid, 1013 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1014 mrec->rec.leaf.data_len); 1015 break; 1016 case HAMMER_MREC_TYPE_PASS: 1017 printf("Pass obj=%016jx key=%016jx " 1018 "rt=%02x ot=%02x\n", 1019 (uintmax_t)mrec->rec.leaf.base.obj_id, 1020 (uintmax_t)mrec->rec.leaf.base.key, 1021 mrec->rec.leaf.base.rec_type, 1022 mrec->rec.leaf.base.obj_type); 1023 printf(" tids %016jx:%016jx data=%d\n", 1024 (uintmax_t)mrec->rec.leaf.base.create_tid, 1025 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1026 mrec->rec.leaf.data_len); 1027 break; 1028 case HAMMER_MREC_TYPE_SKIP: 1029 printf("Skip obj=%016jx key=%016jx rt=%02x to\n" 1030 " obj=%016jx key=%016jx rt=%02x\n", 1031 (uintmax_t)mrec->skip.skip_beg.obj_id, 1032 (uintmax_t)mrec->skip.skip_beg.key, 1033 mrec->skip.skip_beg.rec_type, 1034 (uintmax_t)mrec->skip.skip_end.obj_id, 1035 (uintmax_t)mrec->skip.skip_end.key, 1036 mrec->skip.skip_end.rec_type); 1037 default: 1038 break; 1039 } 1040 offset += bytes; 1041 } 1042 } 1043 1044 /* 1045 * Read and process the termination sync record. 1046 */ 1047 mrec = read_mrecord(0, &error, &pickup); 1048 if (mrec == NULL || 1049 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 1050 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 1051 ) { 1052 fprintf(stderr, "Mirror-dump: Did not get termination " 1053 "sync record\n"); 1054 } 1055 1056 /* 1057 * Continue with more batches until EOF. 1058 */ 1059 mrec = read_mrecord(0, &error, &pickup); 1060 if (mrec) 1061 goto again; 1062 } 1063 1064 void 1065 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1066 { 1067 pid_t pid1; 1068 pid_t pid2; 1069 int fds[2]; 1070 const char *xav[32]; 1071 char tbuf[16]; 1072 char *sh, *user, *host, *rfs; 1073 int xac; 1074 1075 if (ac != 2) 1076 mirror_usage(1); 1077 1078 TwoWayPipeOpt = 1; 1079 signal(SIGPIPE, SIG_IGN); 1080 1081 again: 1082 if (pipe(fds) < 0) { 1083 perror("pipe"); 1084 exit(1); 1085 } 1086 1087 /* 1088 * Source 1089 */ 1090 if ((pid1 = fork()) == 0) { 1091 signal(SIGPIPE, SIG_DFL); 1092 dup2(fds[0], 0); 1093 dup2(fds[0], 1); 1094 close(fds[0]); 1095 close(fds[1]); 1096 if ((rfs = strchr(av[0], ':')) != NULL) { 1097 xac = 0; 1098 1099 if((sh = getenv("HAMMER_RSH")) == NULL) 1100 xav[xac++] = "ssh"; 1101 else 1102 xav[xac++] = sh; 1103 1104 if (CompressOpt) 1105 xav[xac++] = "-C"; 1106 1107 user = strndup(av[0], (rfs - av[0])); 1108 1109 if ((host = strchr(av[0], '@')) != NULL) { 1110 user = strndup( av[0], (host++ - av[0])); 1111 host = strndup( host, (rfs++ - host)); 1112 xav[xac++] = "-l"; 1113 xav[xac++] = user; 1114 xav[xac++] = host; 1115 } 1116 else { 1117 host = strndup( av[0], (rfs++ - av[0])); 1118 user = NULL; 1119 xav[xac++] = host; 1120 } 1121 1122 1123 if (SshPort) { 1124 xav[xac++] = "-p"; 1125 xav[xac++] = SshPort; 1126 } 1127 1128 xav[xac++] = "hammer"; 1129 1130 switch(VerboseOpt) { 1131 case 0: 1132 break; 1133 case 1: 1134 xav[xac++] = "-v"; 1135 break; 1136 case 2: 1137 xav[xac++] = "-vv"; 1138 break; 1139 default: 1140 xav[xac++] = "-vvv"; 1141 break; 1142 } 1143 if (ForceYesOpt) { 1144 xav[xac++] = "-y"; 1145 } 1146 xav[xac++] = "-2"; 1147 if (TimeoutOpt) { 1148 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1149 xav[xac++] = "-t"; 1150 xav[xac++] = tbuf; 1151 } 1152 if (SplitupOptStr) { 1153 xav[xac++] = "-S"; 1154 xav[xac++] = SplitupOptStr; 1155 } 1156 if (streaming) 1157 xav[xac++] = "mirror-read-stream"; 1158 else 1159 xav[xac++] = "mirror-read"; 1160 xav[xac++] = rfs; 1161 xav[xac++] = NULL; 1162 execvp(*xav, (void *)xav); 1163 } else { 1164 hammer_cmd_mirror_read(av, 1, streaming); 1165 fflush(stdout); 1166 fflush(stderr); 1167 } 1168 _exit(1); 1169 } 1170 1171 /* 1172 * Target 1173 */ 1174 if ((pid2 = fork()) == 0) { 1175 signal(SIGPIPE, SIG_DFL); 1176 dup2(fds[1], 0); 1177 dup2(fds[1], 1); 1178 close(fds[0]); 1179 close(fds[1]); 1180 if ((rfs = strchr(av[1], ':')) != NULL) { 1181 xac = 0; 1182 1183 if((sh = getenv("HAMMER_RSH")) == NULL) 1184 xav[xac++] = "ssh"; 1185 else 1186 xav[xac++] = sh; 1187 1188 if (CompressOpt) 1189 xav[xac++] = "-C"; 1190 1191 user = strndup(av[1], (rfs - av[1])); 1192 1193 if ((host = strchr(av[1], '@')) != NULL) { 1194 user = strndup( av[1], (host++ - av[1])); 1195 host = strndup( host, (rfs++ - host)); 1196 xav[xac++] = "-l"; 1197 xav[xac++] = user; 1198 xav[xac++] = host; 1199 } 1200 else { 1201 host = strndup( av[1], (rfs++ - av[1])); 1202 user = NULL; 1203 xav[xac++] = host; 1204 } 1205 1206 if (SshPort) { 1207 xav[xac++] = "-p"; 1208 xav[xac++] = SshPort; 1209 } 1210 1211 xav[xac++] = "hammer"; 1212 1213 switch(VerboseOpt) { 1214 case 0: 1215 break; 1216 case 1: 1217 xav[xac++] = "-v"; 1218 break; 1219 case 2: 1220 xav[xac++] = "-vv"; 1221 break; 1222 default: 1223 xav[xac++] = "-vvv"; 1224 break; 1225 } 1226 if (ForceYesOpt) { 1227 xav[xac++] = "-y"; 1228 } 1229 xav[xac++] = "-2"; 1230 xav[xac++] = "mirror-write"; 1231 xav[xac++] = rfs; 1232 xav[xac++] = NULL; 1233 execvp(*xav, (void *)xav); 1234 } else { 1235 hammer_cmd_mirror_write(av + 1, 1); 1236 fflush(stdout); 1237 fflush(stderr); 1238 } 1239 _exit(1); 1240 } 1241 close(fds[0]); 1242 close(fds[1]); 1243 1244 while (waitpid(pid1, NULL, 0) <= 0) 1245 ; 1246 while (waitpid(pid2, NULL, 0) <= 0) 1247 ; 1248 1249 /* 1250 * If the link is lost restart 1251 */ 1252 if (streaming) { 1253 if (VerboseOpt) { 1254 fprintf(stderr, "\nLost Link\n"); 1255 fflush(stderr); 1256 } 1257 sleep(15 + DelayOpt); 1258 goto again; 1259 } 1260 1261 } 1262 1263 /* 1264 * Read and return multiple mrecords 1265 */ 1266 static int 1267 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 1268 { 1269 hammer_ioc_mrecord_any_t mrec; 1270 u_int count; 1271 size_t n; 1272 size_t i; 1273 size_t bytes; 1274 int type; 1275 1276 count = 0; 1277 while (size - count >= HAMMER_MREC_HEADSIZE) { 1278 /* 1279 * Cached the record header in case we run out of buffer 1280 * space. 1281 */ 1282 fflush(stdout); 1283 if (pickup->signature == 0) { 1284 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1285 i = read(fd, (char *)pickup + n, 1286 HAMMER_MREC_HEADSIZE - n); 1287 if (i <= 0) 1288 break; 1289 } 1290 if (n == 0) 1291 break; 1292 if (n != HAMMER_MREC_HEADSIZE) { 1293 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1294 exit(1); 1295 } 1296 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1297 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1298 "bad signature\n"); 1299 exit(1); 1300 } 1301 } 1302 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1303 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 1304 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1305 "illegal rec_size\n"); 1306 exit(1); 1307 } 1308 1309 /* 1310 * Stop if we have insufficient space for the record and data. 1311 */ 1312 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1313 if (size - count < bytes) 1314 break; 1315 1316 /* 1317 * Stop if the record type is not a REC, SKIP, or PASS, 1318 * which are the only types the ioctl supports. Other types 1319 * are used only by the userland protocol. 1320 * 1321 * Ignore all flags. 1322 */ 1323 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1324 if (type != HAMMER_MREC_TYPE_PFSD && 1325 type != HAMMER_MREC_TYPE_REC && 1326 type != HAMMER_MREC_TYPE_SKIP && 1327 type != HAMMER_MREC_TYPE_PASS) { 1328 break; 1329 } 1330 1331 /* 1332 * Read the remainder and clear the pickup signature. 1333 */ 1334 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1335 i = read(fd, buf + count + n, bytes - n); 1336 if (i <= 0) 1337 break; 1338 } 1339 if (n != bytes) { 1340 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1341 exit(1); 1342 } 1343 1344 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1345 pickup->signature = 0; 1346 pickup->type = 0; 1347 mrec = (void *)(buf + count); 1348 1349 /* 1350 * Validate the completed record 1351 */ 1352 if (mrec->head.rec_crc != 1353 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1354 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1355 fprintf(stderr, "read_mrecords: malformed record " 1356 "on pipe, bad crc\n"); 1357 exit(1); 1358 } 1359 1360 /* 1361 * If its a B-Tree record validate the data crc. 1362 * 1363 * NOTE: If the VFS passes us an explicitly errorde mrec 1364 * we just pass it through. 1365 */ 1366 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1367 1368 if (type == HAMMER_MREC_TYPE_REC) { 1369 if (mrec->head.rec_size < 1370 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1371 fprintf(stderr, 1372 "read_mrecords: malformed record on " 1373 "pipe, illegal element data_len\n"); 1374 exit(1); 1375 } 1376 if (mrec->rec.leaf.data_len && 1377 mrec->rec.leaf.data_offset && 1378 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 1379 fprintf(stderr, 1380 "read_mrecords: data_crc did not " 1381 "match data! obj=%016jx key=%016jx\n", 1382 (uintmax_t)mrec->rec.leaf.base.obj_id, 1383 (uintmax_t)mrec->rec.leaf.base.key); 1384 fprintf(stderr, 1385 "continuing, but there are problems\n"); 1386 } 1387 } 1388 count += bytes; 1389 } 1390 return(count); 1391 } 1392 1393 /* 1394 * Read and return a single mrecord. 1395 */ 1396 static 1397 hammer_ioc_mrecord_any_t 1398 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 1399 { 1400 hammer_ioc_mrecord_any_t mrec; 1401 struct hammer_ioc_mrecord_head mrechd; 1402 size_t bytes; 1403 size_t n; 1404 size_t i; 1405 1406 if (pickup && pickup->type != 0) { 1407 mrechd = *pickup; 1408 pickup->signature = 0; 1409 pickup->type = 0; 1410 n = HAMMER_MREC_HEADSIZE; 1411 } else { 1412 /* 1413 * Read in the PFSD header from the sender. 1414 */ 1415 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1416 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1417 if (i <= 0) 1418 break; 1419 } 1420 if (n == 0) { 1421 *errorp = 0; /* EOF */ 1422 return(NULL); 1423 } 1424 if (n != HAMMER_MREC_HEADSIZE) { 1425 fprintf(stderr, "short read of mrecord header\n"); 1426 *errorp = EPIPE; 1427 return(NULL); 1428 } 1429 } 1430 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1431 fprintf(stderr, "read_mrecord: bad signature\n"); 1432 *errorp = EINVAL; 1433 return(NULL); 1434 } 1435 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1436 assert(bytes >= sizeof(mrechd)); 1437 mrec = malloc(bytes); 1438 mrec->head = mrechd; 1439 1440 while (n < bytes) { 1441 i = read(fdin, (char *)mrec + n, bytes - n); 1442 if (i <= 0) 1443 break; 1444 n += i; 1445 } 1446 if (n != bytes) { 1447 fprintf(stderr, "read_mrecord: short read on payload\n"); 1448 *errorp = EPIPE; 1449 return(NULL); 1450 } 1451 if (mrec->head.rec_crc != 1452 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1453 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1454 fprintf(stderr, "read_mrecord: bad CRC\n"); 1455 *errorp = EINVAL; 1456 return(NULL); 1457 } 1458 *errorp = 0; 1459 return(mrec); 1460 } 1461 1462 static 1463 void 1464 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 1465 int bytes) 1466 { 1467 char zbuf[HAMMER_HEAD_ALIGN]; 1468 int pad; 1469 1470 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1471 1472 assert(bytes >= (int)sizeof(mrec->head)); 1473 bzero(&mrec->head, sizeof(mrec->head)); 1474 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1475 mrec->head.type = type; 1476 mrec->head.rec_size = bytes; 1477 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1478 bytes - HAMMER_MREC_CRCOFF); 1479 if (write(fdout, mrec, bytes) != bytes) { 1480 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1481 errno, strerror(errno)); 1482 exit(1); 1483 } 1484 if (pad) { 1485 bzero(zbuf, pad); 1486 if (write(fdout, zbuf, pad) != pad) { 1487 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1488 errno, strerror(errno)); 1489 exit(1); 1490 } 1491 } 1492 } 1493 1494 /* 1495 * Generate a mirroring header with the pfs information of the 1496 * originating filesytem. 1497 */ 1498 static void 1499 generate_mrec_header(int fd, int pfs_id, 1500 union hammer_ioc_mrecord_any *mrec_tmp) 1501 { 1502 struct hammer_ioc_pseudofs_rw pfs; 1503 1504 bzero(&pfs, sizeof(pfs)); 1505 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1506 pfs.pfs_id = pfs_id; 1507 pfs.ondisk = &mrec_tmp->pfs.pfsd; 1508 pfs.bytes = sizeof(mrec_tmp->pfs.pfsd); 1509 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1510 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1511 exit(1); 1512 } 1513 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1514 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 1515 exit(1); 1516 } 1517 mrec_tmp->pfs.version = pfs.version; 1518 } 1519 1520 /* 1521 * Validate the pfs information from the originating filesystem 1522 * against the target filesystem. shared_uuid must match. 1523 * 1524 * return -1 if we got a TERM record 1525 */ 1526 static int 1527 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1528 struct hammer_ioc_mrecord_head *pickup, 1529 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1530 { 1531 struct hammer_ioc_pseudofs_rw pfs; 1532 struct hammer_pseudofs_data pfsd; 1533 hammer_ioc_mrecord_any_t mrec; 1534 int error; 1535 1536 /* 1537 * Get the PFSD info from the target filesystem. 1538 */ 1539 bzero(&pfs, sizeof(pfs)); 1540 bzero(&pfsd, sizeof(pfsd)); 1541 pfs.pfs_id = pfs_id; 1542 pfs.ondisk = &pfsd; 1543 pfs.bytes = sizeof(pfsd); 1544 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1545 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1546 exit(1); 1547 } 1548 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1549 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 1550 exit(1); 1551 } 1552 1553 mrec = read_mrecord(fdin, &error, pickup); 1554 if (mrec == NULL) { 1555 if (error == 0) 1556 fprintf(stderr, "validate_mrec_header: short read\n"); 1557 exit(1); 1558 } 1559 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1560 free(mrec); 1561 return(-1); 1562 } 1563 1564 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1565 fprintf(stderr, "validate_mrec_header: did not get expected " 1566 "PFSD record type\n"); 1567 exit(1); 1568 } 1569 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1570 fprintf(stderr, "validate_mrec_header: unexpected payload " 1571 "size\n"); 1572 exit(1); 1573 } 1574 if (mrec->pfs.version != pfs.version) { 1575 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1576 exit(1); 1577 } 1578 1579 /* 1580 * Whew. Ok, is the read PFS info compatible with the target? 1581 */ 1582 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1583 sizeof(pfsd.shared_uuid)) != 0) { 1584 fprintf(stderr, 1585 "mirror-write: source and target have " 1586 "different shared-uuid's!\n"); 1587 exit(1); 1588 } 1589 if (is_target && 1590 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1591 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1592 exit(1); 1593 } 1594 if (tid_begp) 1595 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1596 if (tid_endp) 1597 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1598 free(mrec); 1599 return(0); 1600 } 1601 1602 static void 1603 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1604 { 1605 struct hammer_ioc_pseudofs_rw pfs; 1606 struct hammer_pseudofs_data pfsd; 1607 1608 bzero(&pfs, sizeof(pfs)); 1609 bzero(&pfsd, sizeof(pfsd)); 1610 pfs.pfs_id = pfs_id; 1611 pfs.ondisk = &pfsd; 1612 pfs.bytes = sizeof(pfsd); 1613 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1614 perror("update_pfs_snapshot (read)"); 1615 exit(1); 1616 } 1617 if (pfsd.sync_end_tid != snapshot_tid) { 1618 pfsd.sync_end_tid = snapshot_tid; 1619 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1620 perror("update_pfs_snapshot (rewrite)"); 1621 exit(1); 1622 } 1623 if (VerboseOpt >= 2) { 1624 fprintf(stderr, 1625 "Mirror-write: Completed, updated snapshot " 1626 "to %016jx\n", 1627 (uintmax_t)snapshot_tid); 1628 fflush(stderr); 1629 } 1630 } 1631 } 1632 1633 /* 1634 * Bandwidth-limited write in chunks 1635 */ 1636 static 1637 ssize_t 1638 writebw(int fd, const void *buf, size_t nbytes, 1639 u_int64_t *bwcount, struct timeval *tv1) 1640 { 1641 struct timeval tv2; 1642 size_t n; 1643 ssize_t r; 1644 ssize_t a; 1645 int usec; 1646 1647 a = 0; 1648 r = 0; 1649 while (nbytes) { 1650 if (*bwcount + nbytes > BandwidthOpt) 1651 n = BandwidthOpt - *bwcount; 1652 else 1653 n = nbytes; 1654 if (n) 1655 r = write(fd, buf, n); 1656 if (r >= 0) { 1657 a += r; 1658 nbytes -= r; 1659 buf = (const char *)buf + r; 1660 } 1661 if ((size_t)r != n) 1662 break; 1663 *bwcount += n; 1664 if (*bwcount >= BandwidthOpt) { 1665 gettimeofday(&tv2, NULL); 1666 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1667 (int)(tv2.tv_usec - tv1->tv_usec); 1668 if (usec >= 0 && usec < 1000000) 1669 usleep(1000000 - usec); 1670 gettimeofday(tv1, NULL); 1671 *bwcount -= BandwidthOpt; 1672 } 1673 } 1674 return(a ? a : r); 1675 } 1676 1677 /* 1678 * Get a yes or no answer from the terminal. The program may be run as 1679 * part of a two-way pipe so we cannot use stdin for this operation. 1680 */ 1681 static int 1682 getyn(void) 1683 { 1684 char buf[256]; 1685 FILE *fp; 1686 int result; 1687 1688 fp = fopen("/dev/tty", "r"); 1689 if (fp == NULL) { 1690 fprintf(stderr, "No terminal for response\n"); 1691 return(-1); 1692 } 1693 result = -1; 1694 while (fgets(buf, sizeof(buf), fp) != NULL) { 1695 if (buf[0] == 'y' || buf[0] == 'Y') { 1696 result = 1; 1697 break; 1698 } 1699 if (buf[0] == 'n' || buf[0] == 'N') { 1700 result = 0; 1701 break; 1702 } 1703 fprintf(stderr, "Response not understood\n"); 1704 break; 1705 } 1706 fclose(fp); 1707 return(result); 1708 } 1709 1710 static void 1711 mirror_usage(int code) 1712 { 1713 fprintf(stderr, 1714 "hammer mirror-read <filesystem> [begin-tid]\n" 1715 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1716 "hammer mirror-write <filesystem>\n" 1717 "hammer mirror-dump [header]\n" 1718 "hammer mirror-copy [[user@]host:]<filesystem>" 1719 " [[user@]host:]<filesystem>\n" 1720 "hammer mirror-stream [[user@]host:]<filesystem>" 1721 " [[user@]host:]<filesystem>\n" 1722 ); 1723 exit(code); 1724 } 1725 1726