1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 */ 34 35 #include "hammer.h" 36 37 #define LINE1 0,20 38 #define LINE2 20,78 39 #define LINE3 90,70 40 41 #define SERIALBUF_SIZE (512 * 1024) 42 43 typedef struct histogram { 44 hammer_tid_t tid; 45 uint64_t bytes; 46 } *histogram_t; 47 48 const char *ScoreBoardFile; 49 const char *RestrictTarget; 50 51 static int read_mrecords(int fd, char *buf, u_int size, 52 hammer_ioc_mrecord_head_t pickup); 53 static int generate_histogram(int fd, const char *filesystem, 54 histogram_t *histogram_ary, 55 struct hammer_ioc_mirror_rw *mirror_base, 56 int *repeatp); 57 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 58 hammer_ioc_mrecord_head_t pickup); 59 static void write_mrecord(int fdout, uint32_t type, 60 hammer_ioc_mrecord_any_t mrec, int bytes); 61 static void generate_mrec_header(int fd, int pfs_id, 62 union hammer_ioc_mrecord_any *mrec_tmp); 63 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 64 struct hammer_ioc_mrecord_head *pickup, 65 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 66 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 67 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 68 uint64_t *bwcount, struct timeval *tv1); 69 static int getyntty(void); 70 static void score_printf(size_t i, size_t w, const char *ctl, ...); 71 static void hammer_check_restrict(const char *filesystem); 72 static void mirror_usage(int code); 73 74 /* 75 * Generate a mirroring data stream from the specific source over the 76 * entire key range, but restricted to the specified transaction range. 77 * 78 * The HAMMER VFS does most of the work, we add a few new mrecord 79 * types to negotiate the TID ranges and verify that the entire 80 * stream made it to the destination. 81 * 82 * streaming will be 0 for mirror-read, 1 for mirror-stream. The code will 83 * set up a fake value of -1 when running the histogram for mirror-read. 84 */ 85 void 86 hammer_cmd_mirror_read(char **av, int ac, int streaming) 87 { 88 struct hammer_ioc_mirror_rw mirror; 89 struct hammer_ioc_pseudofs_rw pfs; 90 union hammer_ioc_mrecord_any mrec_tmp; 91 struct hammer_ioc_mrecord_head pickup; 92 hammer_ioc_mrecord_any_t mrec; 93 hammer_tid_t sync_tid; 94 histogram_t histogram_ary; 95 const char *filesystem; 96 char *buf = malloc(SERIALBUF_SIZE); 97 int interrupted = 0; 98 int error; 99 int fd; 100 int n; 101 int didwork; 102 int histogram; 103 int histindex; 104 int histmax; 105 int repeat = 0; 106 int sameline; 107 int64_t total_bytes; 108 time_t base_t = time(NULL); 109 struct timeval bwtv; 110 uint64_t bwcount; 111 uint64_t estbytes; 112 113 if (ac == 0 || ac > 2) 114 mirror_usage(1); 115 filesystem = av[0]; 116 hammer_check_restrict(filesystem); 117 118 pickup.signature = 0; 119 pickup.type = 0; 120 histogram = 0; 121 histindex = 0; 122 histmax = 0; 123 histogram_ary = NULL; 124 sameline = 0; 125 126 again: 127 bzero(&mirror, sizeof(mirror)); 128 hammer_key_beg_init(&mirror.key_beg); 129 hammer_key_end_init(&mirror.key_end); 130 131 fd = getpfs(&pfs, filesystem); 132 133 if (streaming >= 0) 134 score_printf(LINE1, "Running"); 135 136 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 137 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 138 fflush(stderr); 139 sameline = 1; 140 } 141 sameline = 1; 142 total_bytes = 0; 143 gettimeofday(&bwtv, NULL); 144 bwcount = 0; 145 146 /* 147 * Send initial header for the purpose of determining the 148 * shared-uuid. 149 */ 150 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 151 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 152 &mrec_tmp, sizeof(mrec_tmp.pfs)); 153 154 /* 155 * In 2-way mode the target will send us a PFS info packet 156 * first. Use the target's current snapshot TID as our default 157 * begin TID. 158 */ 159 if (TwoWayPipeOpt) { 160 mirror.tid_beg = 0; 161 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 162 NULL, &mirror.tid_beg); 163 if (n < 0) { /* got TERM record */ 164 relpfs(fd, &pfs); 165 free(buf); 166 free(histogram_ary); 167 return; 168 } 169 ++mirror.tid_beg; 170 } else if (streaming && histogram) { 171 mirror.tid_beg = histogram_ary[histindex].tid + 1; 172 } else { 173 mirror.tid_beg = 0; 174 } 175 176 /* 177 * Write out the PFS header, tid_beg will be updated if our PFS 178 * has a larger begin sync. tid_end is set to the latest source 179 * TID whos flush cycle has completed. 180 */ 181 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 182 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 183 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 184 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 185 mirror.ubuf = buf; 186 mirror.size = SERIALBUF_SIZE; 187 mirror.pfs_id = pfs.pfs_id; 188 mirror.shared_uuid = pfs.ondisk->shared_uuid; 189 190 /* 191 * XXX If the histogram is exhausted and the TID delta is large 192 * the stream might have been offline for a while and is 193 * now picking it up again. Do another histogram. 194 */ 195 #if 0 196 if (streaming && histogram && histindex == histend) { 197 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 198 histogram = 0; 199 } 200 #endif 201 202 /* 203 * Initial bulk startup control, try to do some incremental 204 * mirroring in order to allow the stream to be killed and 205 * restarted without having to start over. 206 */ 207 if (histogram == 0 && BulkOpt == 0) { 208 if (VerboseOpt && repeat == 0) { 209 fprintf(stderr, "\n"); 210 sameline = 0; 211 } 212 histmax = generate_histogram(fd, filesystem, 213 &histogram_ary, &mirror, 214 &repeat); 215 histindex = 0; 216 histogram = 1; 217 218 /* 219 * Just stream the histogram, then stop 220 */ 221 if (streaming == 0) 222 streaming = -1; 223 } 224 225 if (streaming && histogram) { 226 ++histindex; 227 mirror.tid_end = histogram_ary[histindex].tid; 228 estbytes = histogram_ary[histindex-1].bytes; 229 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 230 } else { 231 estbytes = 0; 232 } 233 234 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 235 &mrec_tmp, sizeof(mrec_tmp.pfs)); 236 237 /* 238 * A cycle file overrides the beginning TID only if we are 239 * not operating in two-way or histogram mode. 240 */ 241 if (TwoWayPipeOpt == 0 && histogram == 0) { 242 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 243 } 244 245 /* 246 * An additional argument overrides the beginning TID regardless 247 * of what mode we are in. This is not recommending if operating 248 * in two-way mode. 249 */ 250 if (ac == 2) 251 mirror.tid_beg = strtoull(av[1], NULL, 0); 252 253 if (streaming == 0 || VerboseOpt >= 2) { 254 fprintf(stderr, 255 "Mirror-read: Mirror %016jx to %016jx", 256 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 257 if (histogram) 258 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 259 fprintf(stderr, "\n"); 260 fflush(stderr); 261 } 262 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 263 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 264 (uintmax_t)mirror.key_beg.obj_id); 265 } 266 267 /* 268 * Nothing to do if begin equals end. 269 */ 270 if (mirror.tid_beg >= mirror.tid_end) { 271 if (streaming == 0 || VerboseOpt >= 2) 272 fprintf(stderr, "Mirror-read: No work to do\n"); 273 sleep(DelayOpt); 274 didwork = 0; 275 histogram = 0; 276 goto done; 277 } 278 didwork = 1; 279 280 /* 281 * Write out bulk records 282 */ 283 mirror.ubuf = buf; 284 mirror.size = SERIALBUF_SIZE; 285 286 do { 287 mirror.count = 0; 288 mirror.pfs_id = pfs.pfs_id; 289 mirror.shared_uuid = pfs.ondisk->shared_uuid; 290 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 291 score_printf(LINE3, "Mirror-read %s failed: %s", 292 filesystem, strerror(errno)); 293 err(1, "Mirror-read %s failed", filesystem); 294 } 295 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 296 score_printf(LINE3, "Mirror-read %s fatal error %d", 297 filesystem, mirror.head.error); 298 errx(1, "Mirror-read %s fatal error %d", 299 filesystem, mirror.head.error); 300 } 301 if (mirror.count) { 302 if (BandwidthOpt) { 303 n = writebw(1, mirror.ubuf, mirror.count, 304 &bwcount, &bwtv); 305 } else { 306 n = write(1, mirror.ubuf, mirror.count); 307 } 308 if (n != mirror.count) { 309 score_printf(LINE3, 310 "Mirror-read %s failed: " 311 "short write", 312 filesystem); 313 errx(1, "Mirror-read %s failed: short write", 314 filesystem); 315 } 316 } 317 total_bytes += mirror.count; 318 if (streaming && VerboseOpt) { 319 fprintf(stderr, 320 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 321 (uintmax_t)mirror.key_cur.obj_id, 322 (uintmax_t)mirror.tid_beg, 323 (uintmax_t)mirror.tid_end, 324 (intmax_t)total_bytes); 325 fflush(stderr); 326 sameline = 0; 327 } else if (streaming) { 328 score_printf(LINE2, 329 "obj=%016jx tids=%016jx:%016jx %11jd", 330 (uintmax_t)mirror.key_cur.obj_id, 331 (uintmax_t)mirror.tid_beg, 332 (uintmax_t)mirror.tid_end, 333 (intmax_t)total_bytes); 334 } 335 mirror.key_beg = mirror.key_cur; 336 337 /* 338 * Deal with time limit option 339 */ 340 if (TimeoutOpt && 341 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 342 score_printf(LINE3, 343 "Mirror-read %s interrupted by timer at" 344 " %016jx", 345 filesystem, 346 (uintmax_t)mirror.key_cur.obj_id); 347 fprintf(stderr, 348 "Mirror-read %s interrupted by timer at" 349 " %016jx\n", 350 filesystem, 351 (uintmax_t)mirror.key_cur.obj_id); 352 interrupted = 1; 353 break; 354 } 355 } while (mirror.count != 0); 356 357 done: 358 if (streaming && VerboseOpt && sameline == 0) { 359 fprintf(stderr, "\n"); 360 fflush(stderr); 361 sameline = 1; 362 } 363 364 /* 365 * Write out the termination sync record - only if not interrupted 366 */ 367 if (interrupted == 0) { 368 if (didwork) { 369 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 370 &mrec_tmp, sizeof(mrec_tmp.sync)); 371 } else { 372 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 373 &mrec_tmp, sizeof(mrec_tmp.sync)); 374 } 375 } 376 377 /* 378 * If the -2 option was given (automatic when doing mirror-copy), 379 * a two-way pipe is assumed and we expect a response mrec from 380 * the target. 381 */ 382 if (TwoWayPipeOpt) { 383 mrec = read_mrecord(0, &error, &pickup); 384 if (mrec == NULL || 385 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 386 mrec->head.rec_size != sizeof(mrec->update)) { 387 errx(1, "mirror_read: Did not get final " 388 "acknowledgement packet from target"); 389 } 390 if (interrupted) { 391 if (CyclePath) { 392 hammer_set_cycle(&mirror.key_cur, 393 mirror.tid_beg); 394 fprintf(stderr, "Cyclefile %s updated for " 395 "continuation\n", CyclePath); 396 } 397 } else { 398 sync_tid = mrec->update.tid; 399 if (CyclePath) { 400 hammer_key_beg_init(&mirror.key_beg); 401 hammer_set_cycle(&mirror.key_beg, sync_tid); 402 fprintf(stderr, 403 "Cyclefile %s updated to 0x%016jx\n", 404 CyclePath, (uintmax_t)sync_tid); 405 } 406 } 407 free(mrec); 408 } else if (CyclePath) { 409 /* NOTE! mirror.tid_beg cannot be updated */ 410 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 411 "fully updated unless you use mirror-copy\n"); 412 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 413 } 414 if (streaming && interrupted == 0) { 415 time_t t1 = time(NULL); 416 time_t t2; 417 418 /* 419 * Try to break down large bulk transfers into smaller ones 420 * so it can sync the transaction id on the slave. This 421 * way if we get interrupted a restart doesn't have to 422 * start from scratch. 423 */ 424 if (streaming && histogram) { 425 if (histindex != histmax) { 426 if (VerboseOpt && VerboseOpt < 2 && 427 streaming >= 0) { 428 fprintf(stderr, " (bulk incremental)"); 429 } 430 relpfs(fd, &pfs); 431 goto again; 432 } 433 } 434 435 if (VerboseOpt && streaming >= 0) { 436 fprintf(stderr, " W"); 437 fflush(stderr); 438 } else if (streaming >= 0) { 439 score_printf(LINE1, "Waiting"); 440 } 441 pfs.ondisk->sync_end_tid = mirror.tid_end; 442 if (streaming < 0) { 443 /* 444 * Fake streaming mode when using a histogram to 445 * break up a mirror-read, do not wait on source. 446 */ 447 streaming = 0; 448 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 449 score_printf(LINE3, 450 "Mirror-read %s: cannot stream: %s\n", 451 filesystem, strerror(errno)); 452 fprintf(stderr, 453 "Mirror-read %s: cannot stream: %s\n", 454 filesystem, strerror(errno)); 455 } else { 456 t2 = time(NULL) - t1; 457 if (t2 >= 0 && t2 < DelayOpt) { 458 if (VerboseOpt) { 459 fprintf(stderr, "\bD"); 460 fflush(stderr); 461 } 462 sleep(DelayOpt - t2); 463 } 464 if (VerboseOpt) { 465 fprintf(stderr, "\b "); 466 fflush(stderr); 467 } 468 relpfs(fd, &pfs); 469 goto again; 470 } 471 } 472 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 473 &mrec_tmp, sizeof(mrec_tmp.sync)); 474 relpfs(fd, &pfs); 475 free(buf); 476 free(histogram_ary); 477 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 478 } 479 480 /* 481 * What we are trying to do here is figure out how much data is 482 * going to be sent for the TID range and to break the TID range 483 * down into reasonably-sized slices (from the point of view of 484 * data sent) so a lost connection can restart at a reasonable 485 * place and not all the way back at the beginning. 486 * 487 * An entry's TID serves as the end_tid for the prior entry 488 * So we have to offset the calculation by 1 so that TID falls into 489 * the previous entry when populating entries. 490 * 491 * Because the transaction id space is bursty we need a relatively 492 * large number of buckets (like a million) to do a reasonable job 493 * for things like an initial bulk mirrors on a very large filesystem. 494 */ 495 #define HIST_COUNT (1024 * 1024) 496 497 static int 498 generate_histogram(int fd, const char *filesystem, 499 histogram_t *histogram_ary, 500 struct hammer_ioc_mirror_rw *mirror_base, 501 int *repeatp) 502 { 503 struct hammer_ioc_mirror_rw mirror; 504 union hammer_ioc_mrecord_any *mrec; 505 hammer_tid_t tid_beg; 506 hammer_tid_t tid_end; 507 hammer_tid_t tid; 508 hammer_tid_t tidx; 509 uint64_t *tid_bytes; 510 uint64_t total; 511 uint64_t accum; 512 int chunkno; 513 int i; 514 int res; 515 int off; 516 int len; 517 518 mirror = *mirror_base; 519 tid_beg = mirror.tid_beg; 520 tid_end = mirror.tid_end; 521 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 522 523 if (*histogram_ary == NULL) { 524 *histogram_ary = malloc(sizeof(struct histogram) * 525 (HIST_COUNT + 2)); 526 } 527 if (tid_beg >= tid_end) 528 return(0); 529 530 /* needs 2 extra */ 531 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 532 bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2)); 533 534 if (*repeatp == 0) { 535 fprintf(stderr, "Prescan to break up bulk transfer"); 536 if (VerboseOpt > 1) 537 fprintf(stderr, " (%juMB chunks)", 538 (uintmax_t)(SplitupOpt / (1024 * 1024))); 539 fprintf(stderr, "\n"); 540 } 541 542 /* 543 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 544 * 545 * Note: Estimates can be off when the mirror is way behind due 546 * to skips. 547 */ 548 total = 0; 549 accum = 0; 550 chunkno = 0; 551 for (;;) { 552 mirror.count = 0; 553 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) 554 err(1, "Mirror-read %s failed", filesystem); 555 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) 556 errx(1, "Mirror-read %s fatal error %d", 557 filesystem, mirror.head.error); 558 for (off = 0; 559 off < mirror.count; 560 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size)) { 561 mrec = (void *)((char *)mirror.ubuf + off); 562 563 /* 564 * We only care about general RECs and PASS 565 * records. We ignore SKIPs. 566 */ 567 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 568 case HAMMER_MREC_TYPE_REC: 569 case HAMMER_MREC_TYPE_PASS: 570 break; 571 default: 572 continue; 573 } 574 575 /* 576 * Calculate for two indices, create_tid and 577 * delete_tid. Record data only applies to 578 * the create_tid. 579 * 580 * When tid is exactly on the boundary it really 581 * belongs to the previous entry because scans 582 * are inclusive of the ending entry. 583 */ 584 tid = mrec->rec.leaf.base.delete_tid; 585 if (tid && tid >= tid_beg && tid <= tid_end) { 586 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 587 if (mrec->head.type == 588 HAMMER_MREC_TYPE_REC) { 589 len -= HAMMER_HEAD_DOALIGN( 590 mrec->rec.leaf.data_len); 591 assert(len > 0); 592 } 593 i = (tid - tid_beg) * HIST_COUNT / 594 (tid_end - tid_beg); 595 tidx = tid_beg + i * (tid_end - tid_beg) / 596 HIST_COUNT; 597 if (tid == tidx && i) 598 --i; 599 assert(i >= 0 && i < HIST_COUNT); 600 tid_bytes[i] += len; 601 total += len; 602 accum += len; 603 } 604 605 tid = mrec->rec.leaf.base.create_tid; 606 if (tid && tid >= tid_beg && tid <= tid_end) { 607 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 608 if (mrec->head.type == 609 HAMMER_MREC_TYPE_REC_NODATA) { 610 len += HAMMER_HEAD_DOALIGN( 611 mrec->rec.leaf.data_len); 612 } 613 i = (tid - tid_beg) * HIST_COUNT / 614 (tid_end - tid_beg); 615 tidx = tid_beg + i * (tid_end - tid_beg) / 616 HIST_COUNT; 617 if (tid == tidx && i) 618 --i; 619 assert(i >= 0 && i < HIST_COUNT); 620 tid_bytes[i] += len; 621 total += len; 622 accum += len; 623 } 624 } 625 if (*repeatp == 0 && accum > SplitupOpt) { 626 if (VerboseOpt > 1) { 627 fprintf(stderr, "."); 628 fflush(stderr); 629 } 630 ++chunkno; 631 score_printf(LINE2, "Prescan chunk %d", chunkno); 632 accum = 0; 633 } 634 if (mirror.count == 0) 635 break; 636 mirror.key_beg = mirror.key_cur; 637 } 638 639 /* 640 * Reduce to SplitupOpt (default 4GB) chunks. This code may 641 * use up to two additional elements. Do the array in-place. 642 * 643 * Inefficient degenerate cases can occur if we do not accumulate 644 * at least the requested split amount, so error on the side of 645 * going over a bit. 646 */ 647 res = 0; 648 (*histogram_ary)[res].tid = tid_beg; 649 (*histogram_ary)[res].bytes = tid_bytes[0]; 650 for (i = 1; i < HIST_COUNT; ++i) { 651 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 652 ++res; 653 (*histogram_ary)[res].tid = tid_beg + 654 i * (tid_end - tid_beg) / 655 HIST_COUNT; 656 (*histogram_ary)[res].bytes = 0; 657 658 } 659 (*histogram_ary)[res].bytes += tid_bytes[i]; 660 } 661 ++res; 662 (*histogram_ary)[res].tid = tid_end; 663 (*histogram_ary)[res].bytes = -1; 664 665 if (*repeatp == 0) { 666 if (VerboseOpt > 1) 667 fprintf(stderr, "\n"); /* newline after ... */ 668 score_printf(LINE3, "Prescan %d chunks, total %ju MBytes", 669 res, (uintmax_t)total / (1024 * 1024)); 670 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 671 res, (uintmax_t)total / (1024 * 1024)); 672 for (i = 0; i < res && i < 3; ++i) { 673 if (i) 674 fprintf(stderr, ", "); 675 fprintf(stderr, "%ju", 676 (uintmax_t)(*histogram_ary)[i].bytes); 677 } 678 if (i < res) 679 fprintf(stderr, ", ..."); 680 fprintf(stderr, ")\n"); 681 } 682 assert(res <= HIST_COUNT); 683 *repeatp = 1; 684 685 free(tid_bytes); 686 return(res); 687 } 688 689 static void 690 create_pfs(const char *filesystem, uuid_t *s_uuid) 691 { 692 if (ForceYesOpt == 1) { 693 fprintf(stderr, "PFS slave %s does not exist. " 694 "Auto create new slave PFS!\n", filesystem); 695 696 } else { 697 fprintf(stderr, "PFS slave %s does not exist.\n" 698 "Do you want to create a new slave PFS? [y/n] ", 699 filesystem); 700 fflush(stderr); 701 if (getyntty() != 1) 702 errx(1, "Aborting operation"); 703 } 704 705 uint32_t status; 706 char *shared_uuid = NULL; 707 uuid_to_string(s_uuid, &shared_uuid, &status); 708 709 char *cmd = NULL; 710 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 711 filesystem, shared_uuid); 712 free(shared_uuid); 713 714 if (cmd == NULL) { 715 errx(1, "Failed to alloc memory"); 716 } 717 if (system(cmd) != 0) { 718 fprintf(stderr, "Failed to create PFS\n"); 719 } 720 free(cmd); 721 } 722 723 /* 724 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 725 * some additional packet types to negotiate TID ranges and to verify 726 * completion. The HAMMER VFS does most of the work. 727 * 728 * It is important to note that the mirror.key_{beg,end} range must 729 * match the ranged used by the original. For now both sides use 730 * range the entire key space. 731 * 732 * It is even more important that the records in the stream conform 733 * to the TID range also supplied in the stream. The HAMMER VFS will 734 * use the REC, PASS, and SKIP record types to track the portions of 735 * the B-Tree being scanned in order to be able to proactively delete 736 * records on the target within those active areas that are not mentioned 737 * by the source. 738 * 739 * The mirror.key_cur field is used by the VFS to do this tracking. It 740 * must be initialized to key_beg but then is persistently updated by 741 * the HAMMER VFS on each successive ioctl() call. If you blow up this 742 * field you will blow up the mirror target, possibly to the point of 743 * deleting everything. As a safety measure the HAMMER VFS simply marks 744 * the records that the source has destroyed as deleted on the target, 745 * and normal pruning operations will deal with their final disposition 746 * at some later time. 747 */ 748 void 749 hammer_cmd_mirror_write(char **av, int ac) 750 { 751 struct hammer_ioc_mirror_rw mirror; 752 const char *filesystem; 753 char *buf = malloc(SERIALBUF_SIZE); 754 struct hammer_ioc_pseudofs_rw pfs; 755 struct hammer_ioc_mrecord_head pickup; 756 struct hammer_ioc_synctid synctid; 757 union hammer_ioc_mrecord_any mrec_tmp; 758 hammer_ioc_mrecord_any_t mrec; 759 struct stat st; 760 int error; 761 int fd; 762 int n; 763 764 if (ac != 1) 765 mirror_usage(1); 766 filesystem = av[0]; 767 hammer_check_restrict(filesystem); 768 769 pickup.signature = 0; 770 pickup.type = 0; 771 772 again: 773 bzero(&mirror, sizeof(mirror)); 774 hammer_key_beg_init(&mirror.key_beg); 775 hammer_key_end_init(&mirror.key_end); 776 mirror.key_end = mirror.key_beg; 777 778 /* 779 * Read initial packet 780 */ 781 mrec = read_mrecord(0, &error, &pickup); 782 if (mrec == NULL) { 783 if (error == 0) 784 errx(1, "validate_mrec_header: short read"); 785 exit(1); 786 } 787 /* 788 * Validate packet 789 */ 790 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 791 free(buf); 792 return; 793 } 794 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) 795 errx(1, "validate_mrec_header: did not get expected " 796 "PFSD record type"); 797 if (mrec->head.rec_size != sizeof(mrec->pfs)) 798 errx(1, "validate_mrec_header: unexpected payload size"); 799 800 /* 801 * Create slave PFS if it doesn't yet exist 802 */ 803 if (lstat(filesystem, &st) != 0) { 804 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 805 } 806 free(mrec); 807 mrec = NULL; 808 809 fd = getpfs(&pfs, filesystem); 810 811 /* 812 * In two-way mode the target writes out a PFS packet first. 813 * The source uses our tid_end as its tid_beg by default, 814 * picking up where it left off. 815 */ 816 mirror.tid_beg = 0; 817 if (TwoWayPipeOpt) { 818 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 819 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 820 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 821 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 822 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 823 &mrec_tmp, sizeof(mrec_tmp.pfs)); 824 } 825 826 /* 827 * Read and process the PFS header. The source informs us of 828 * the TID range the stream represents. 829 */ 830 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 831 &mirror.tid_beg, &mirror.tid_end); 832 if (n < 0) { /* got TERM record */ 833 relpfs(fd, &pfs); 834 free(buf); 835 return; 836 } 837 838 mirror.ubuf = buf; 839 mirror.size = SERIALBUF_SIZE; 840 841 /* 842 * Read and process bulk records (REC, PASS, and SKIP types). 843 * 844 * On your life, do NOT mess with mirror.key_cur or your mirror 845 * target may become history. 846 */ 847 for (;;) { 848 mirror.count = 0; 849 mirror.pfs_id = pfs.pfs_id; 850 mirror.shared_uuid = pfs.ondisk->shared_uuid; 851 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 852 if (mirror.size <= 0) 853 break; 854 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) 855 err(1, "Mirror-write %s failed", filesystem); 856 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) 857 errx(1, "Mirror-write %s fatal error %d", 858 filesystem, mirror.head.error); 859 #if 0 860 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 861 errx(1, "Mirror-write %s interrupted by timer at" 862 " %016llx", 863 filesystem, 864 mirror.key_cur.obj_id); 865 } 866 #endif 867 } 868 869 /* 870 * Read and process the termination sync record. 871 */ 872 mrec = read_mrecord(0, &error, &pickup); 873 874 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 875 fprintf(stderr, "Mirror-write: received termination request\n"); 876 relpfs(fd, &pfs); 877 free(mrec); 878 free(buf); 879 return; 880 } 881 882 if (mrec == NULL || 883 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 884 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 885 mrec->head.rec_size != sizeof(mrec->sync)) { 886 errx(1, "Mirror-write %s: Did not get termination " 887 "sync record, or rec_size is wrong rt=%d", 888 filesystem, (mrec ? (int)mrec->head.type : -1)); 889 } 890 891 /* 892 * Update the PFS info on the target so the user has visibility 893 * into the new snapshot, and sync the target filesystem. 894 */ 895 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 896 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 897 898 bzero(&synctid, sizeof(synctid)); 899 synctid.op = HAMMER_SYNCTID_SYNC2; 900 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 901 902 if (VerboseOpt >= 2) { 903 fprintf(stderr, "Mirror-write %s: succeeded\n", 904 filesystem); 905 } 906 } 907 908 free(mrec); 909 mrec = NULL; 910 911 /* 912 * Report back to the originator. 913 */ 914 if (TwoWayPipeOpt) { 915 mrec_tmp.update.tid = mirror.tid_end; 916 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 917 &mrec_tmp, sizeof(mrec_tmp.update)); 918 } else { 919 printf("Source can update synctid to 0x%016jx\n", 920 (uintmax_t)mirror.tid_end); 921 } 922 relpfs(fd, &pfs); 923 goto again; 924 } 925 926 void 927 hammer_cmd_mirror_dump(char **av, int ac) 928 { 929 char *buf = malloc(SERIALBUF_SIZE); 930 struct hammer_ioc_mrecord_head pickup; 931 hammer_ioc_mrecord_any_t mrec; 932 int error; 933 int size; 934 int offset; 935 int bytes; 936 int header_only = 0; 937 938 if (ac == 1 && strcmp(*av, "header") == 0) 939 header_only = 1; 940 else if (ac != 0) 941 mirror_usage(1); 942 943 /* 944 * Read and process the PFS header 945 */ 946 pickup.signature = 0; 947 pickup.type = 0; 948 949 mrec = read_mrecord(0, &error, &pickup); 950 951 /* 952 * Dump the PFS header. mirror-dump takes its input from the output 953 * of a mirror-read so getpfs() can't be used to get a fd to be passed 954 * to dump_pfsd(). 955 */ 956 if (header_only && mrec != NULL) { 957 dump_pfsd(&mrec->pfs.pfsd, -1); 958 free(mrec); 959 free(buf); 960 return; 961 } 962 free(mrec); 963 964 again: 965 /* 966 * Read and process bulk records 967 */ 968 for (;;) { 969 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 970 if (size <= 0) 971 break; 972 offset = 0; 973 while (offset < size) { 974 mrec = (void *)((char *)buf + offset); 975 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 976 if (offset + bytes > size) 977 errx(1, "Misaligned record"); 978 979 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 980 case HAMMER_MREC_TYPE_REC_BADCRC: 981 case HAMMER_MREC_TYPE_REC: 982 printf("Record lo=%08x obj=%016jx key=%016jx " 983 "rt=%02x ot=%02x", 984 mrec->rec.leaf.base.localization, 985 (uintmax_t)mrec->rec.leaf.base.obj_id, 986 (uintmax_t)mrec->rec.leaf.base.key, 987 mrec->rec.leaf.base.rec_type, 988 mrec->rec.leaf.base.obj_type); 989 if (mrec->head.type == 990 HAMMER_MREC_TYPE_REC_BADCRC) { 991 printf(" (BAD CRC)"); 992 } 993 printf("\n"); 994 printf(" tids %016jx:%016jx data=%d\n", 995 (uintmax_t)mrec->rec.leaf.base.create_tid, 996 (uintmax_t)mrec->rec.leaf.base.delete_tid, 997 mrec->rec.leaf.data_len); 998 break; 999 case HAMMER_MREC_TYPE_PASS: 1000 printf("Pass lo=%08x obj=%016jx key=%016jx " 1001 "rt=%02x ot=%02x\n", 1002 mrec->rec.leaf.base.localization, 1003 (uintmax_t)mrec->rec.leaf.base.obj_id, 1004 (uintmax_t)mrec->rec.leaf.base.key, 1005 mrec->rec.leaf.base.rec_type, 1006 mrec->rec.leaf.base.obj_type); 1007 printf(" tids %016jx:%016jx data=%d\n", 1008 (uintmax_t)mrec->rec.leaf.base.create_tid, 1009 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1010 mrec->rec.leaf.data_len); 1011 break; 1012 case HAMMER_MREC_TYPE_SKIP: 1013 printf("Skip lo=%08x obj=%016jx key=%016jx rt=%02x to\n" 1014 " lo=%08x obj=%016jx key=%016jx rt=%02x\n", 1015 mrec->skip.skip_beg.localization, 1016 (uintmax_t)mrec->skip.skip_beg.obj_id, 1017 (uintmax_t)mrec->skip.skip_beg.key, 1018 mrec->skip.skip_beg.rec_type, 1019 mrec->skip.skip_end.localization, 1020 (uintmax_t)mrec->skip.skip_end.obj_id, 1021 (uintmax_t)mrec->skip.skip_end.key, 1022 mrec->skip.skip_end.rec_type); 1023 default: 1024 break; 1025 } 1026 offset += bytes; 1027 } 1028 } 1029 1030 /* 1031 * Read and process the termination sync record. 1032 */ 1033 mrec = read_mrecord(0, &error, &pickup); 1034 if (mrec == NULL || 1035 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 1036 mrec->head.type != HAMMER_MREC_TYPE_IDLE)) { 1037 fprintf(stderr, "Mirror-dump: Did not get termination " 1038 "sync record\n"); 1039 } 1040 free(mrec); 1041 1042 /* 1043 * Continue with more batches until EOF. 1044 */ 1045 mrec = read_mrecord(0, &error, &pickup); 1046 if (mrec) { 1047 free(mrec); 1048 goto again; 1049 } 1050 free(buf); 1051 } 1052 1053 void 1054 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1055 { 1056 pid_t pid1; 1057 pid_t pid2; 1058 int fds[2]; 1059 const char *xav[32]; 1060 char tbuf[16]; 1061 char *sh, *user, *host, *rfs; 1062 int xac; 1063 1064 if (ac != 2) 1065 mirror_usage(1); 1066 1067 TwoWayPipeOpt = 1; 1068 signal(SIGPIPE, SIG_IGN); 1069 1070 again: 1071 if (pipe(fds) < 0) 1072 err(1, "pipe"); 1073 1074 /* 1075 * Source 1076 */ 1077 if ((pid1 = fork()) == 0) { 1078 signal(SIGPIPE, SIG_DFL); 1079 dup2(fds[0], 0); 1080 dup2(fds[0], 1); 1081 close(fds[0]); 1082 close(fds[1]); 1083 if ((rfs = strchr(av[0], ':')) != NULL) { 1084 xac = 0; 1085 1086 if((sh = getenv("HAMMER_RSH")) == NULL) 1087 xav[xac++] = "ssh"; 1088 else 1089 xav[xac++] = sh; 1090 1091 if (CompressOpt) 1092 xav[xac++] = "-C"; 1093 1094 if ((host = strchr(av[0], '@')) != NULL) { 1095 user = strndup( av[0], (host++ - av[0])); 1096 host = strndup( host, (rfs++ - host)); 1097 xav[xac++] = "-l"; 1098 xav[xac++] = user; 1099 xav[xac++] = host; 1100 } 1101 else { 1102 host = strndup( av[0], (rfs++ - av[0])); 1103 user = NULL; 1104 xav[xac++] = host; 1105 } 1106 1107 1108 if (SshPort) { 1109 xav[xac++] = "-p"; 1110 xav[xac++] = SshPort; 1111 } 1112 1113 xav[xac++] = "hammer"; 1114 1115 switch(VerboseOpt) { 1116 case 0: 1117 break; 1118 case 1: 1119 xav[xac++] = "-v"; 1120 break; 1121 case 2: 1122 xav[xac++] = "-vv"; 1123 break; 1124 default: 1125 xav[xac++] = "-vvv"; 1126 break; 1127 } 1128 if (ForceYesOpt) { 1129 xav[xac++] = "-y"; 1130 } 1131 xav[xac++] = "-2"; 1132 if (TimeoutOpt) { 1133 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1134 xav[xac++] = "-t"; 1135 xav[xac++] = tbuf; 1136 } 1137 if (SplitupOptStr) { 1138 xav[xac++] = "-S"; 1139 xav[xac++] = SplitupOptStr; 1140 } 1141 if (streaming) 1142 xav[xac++] = "mirror-read-stream"; 1143 else 1144 xav[xac++] = "mirror-read"; 1145 xav[xac++] = rfs; 1146 xav[xac++] = NULL; 1147 execvp(*xav, (void *)xav); 1148 } else { 1149 hammer_cmd_mirror_read(av, 1, streaming); 1150 fflush(stdout); 1151 fflush(stderr); 1152 } 1153 _exit(1); 1154 } 1155 1156 /* 1157 * Target 1158 */ 1159 if ((pid2 = fork()) == 0) { 1160 signal(SIGPIPE, SIG_DFL); 1161 dup2(fds[1], 0); 1162 dup2(fds[1], 1); 1163 close(fds[0]); 1164 close(fds[1]); 1165 if ((rfs = strchr(av[1], ':')) != NULL) { 1166 xac = 0; 1167 1168 if((sh = getenv("HAMMER_RSH")) == NULL) 1169 xav[xac++] = "ssh"; 1170 else 1171 xav[xac++] = sh; 1172 1173 if (CompressOpt) 1174 xav[xac++] = "-C"; 1175 1176 if ((host = strchr(av[1], '@')) != NULL) { 1177 user = strndup( av[1], (host++ - av[1])); 1178 host = strndup( host, (rfs++ - host)); 1179 xav[xac++] = "-l"; 1180 xav[xac++] = user; 1181 xav[xac++] = host; 1182 } 1183 else { 1184 host = strndup( av[1], (rfs++ - av[1])); 1185 user = NULL; 1186 xav[xac++] = host; 1187 } 1188 1189 if (SshPort) { 1190 xav[xac++] = "-p"; 1191 xav[xac++] = SshPort; 1192 } 1193 1194 xav[xac++] = "hammer"; 1195 1196 switch(VerboseOpt) { 1197 case 0: 1198 break; 1199 case 1: 1200 xav[xac++] = "-v"; 1201 break; 1202 case 2: 1203 xav[xac++] = "-vv"; 1204 break; 1205 default: 1206 xav[xac++] = "-vvv"; 1207 break; 1208 } 1209 if (ForceYesOpt) { 1210 xav[xac++] = "-y"; 1211 } 1212 xav[xac++] = "-2"; 1213 xav[xac++] = "mirror-write"; 1214 xav[xac++] = rfs; 1215 xav[xac++] = NULL; 1216 execvp(*xav, (void *)xav); 1217 } else { 1218 hammer_cmd_mirror_write(av + 1, 1); 1219 fflush(stdout); 1220 fflush(stderr); 1221 } 1222 _exit(1); 1223 } 1224 close(fds[0]); 1225 close(fds[1]); 1226 1227 while (waitpid(pid1, NULL, 0) <= 0) 1228 ; 1229 while (waitpid(pid2, NULL, 0) <= 0) 1230 ; 1231 1232 /* 1233 * If the link is lost restart 1234 */ 1235 if (streaming) { 1236 if (VerboseOpt) { 1237 fprintf(stderr, "\nLost Link\n"); 1238 fflush(stderr); 1239 } 1240 sleep(15 + DelayOpt); 1241 goto again; 1242 } 1243 1244 } 1245 1246 /* 1247 * Read and return multiple mrecords 1248 */ 1249 static int 1250 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 1251 { 1252 hammer_ioc_mrecord_any_t mrec; 1253 u_int count; 1254 size_t n; 1255 size_t i; 1256 size_t bytes; 1257 int type; 1258 1259 count = 0; 1260 while (size - count >= HAMMER_MREC_HEADSIZE) { 1261 /* 1262 * Cached the record header in case we run out of buffer 1263 * space. 1264 */ 1265 fflush(stdout); 1266 if (pickup->signature == 0) { 1267 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1268 i = read(fd, (char *)pickup + n, 1269 HAMMER_MREC_HEADSIZE - n); 1270 if (i <= 0) 1271 break; 1272 } 1273 if (n == 0) 1274 break; 1275 if (n != HAMMER_MREC_HEADSIZE) 1276 errx(1, "read_mrecords: short read on pipe"); 1277 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) 1278 errx(1, "read_mrecords: malformed record on pipe, " 1279 "bad signature"); 1280 } 1281 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1282 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) 1283 errx(1, "read_mrecords: malformed record on pipe, " 1284 "illegal rec_size"); 1285 1286 /* 1287 * Stop if we have insufficient space for the record and data. 1288 */ 1289 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1290 if (size - count < bytes) 1291 break; 1292 1293 /* 1294 * Stop if the record type is not a REC, SKIP, or PASS, 1295 * which are the only types the ioctl supports. Other types 1296 * are used only by the userland protocol. 1297 * 1298 * Ignore all flags. 1299 */ 1300 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1301 if (type != HAMMER_MREC_TYPE_PFSD && 1302 type != HAMMER_MREC_TYPE_REC && 1303 type != HAMMER_MREC_TYPE_SKIP && 1304 type != HAMMER_MREC_TYPE_PASS) { 1305 break; 1306 } 1307 1308 /* 1309 * Read the remainder and clear the pickup signature. 1310 */ 1311 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1312 i = read(fd, buf + count + n, bytes - n); 1313 if (i <= 0) 1314 break; 1315 } 1316 if (n != bytes) 1317 errx(1, "read_mrecords: short read on pipe"); 1318 1319 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1320 pickup->signature = 0; 1321 pickup->type = 0; 1322 mrec = (void *)(buf + count); 1323 1324 /* 1325 * Validate the completed record 1326 */ 1327 if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) 1328 errx(1, "read_mrecords: malformed record on pipe, bad crc"); 1329 1330 /* 1331 * If its a B-Tree record validate the data crc. 1332 * 1333 * NOTE: If the VFS passes us an explicitly errorde mrec 1334 * we just pass it through. 1335 */ 1336 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1337 1338 if (type == HAMMER_MREC_TYPE_REC) { 1339 if (mrec->head.rec_size < 1340 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1341 errx(1, "read_mrecords: malformed record on " 1342 "pipe, illegal element data_len"); 1343 } 1344 if (mrec->rec.leaf.data_len && 1345 mrec->rec.leaf.data_offset && 1346 hammer_crc_test_leaf(HammerVersion, &mrec->rec + 1, &mrec->rec.leaf) == 0) { 1347 fprintf(stderr, 1348 "read_mrecords: data_crc did not " 1349 "match data! obj=%016jx key=%016jx\n", 1350 (uintmax_t)mrec->rec.leaf.base.obj_id, 1351 (uintmax_t)mrec->rec.leaf.base.key); 1352 fprintf(stderr, 1353 "continuing, but there are problems\n"); 1354 } 1355 } 1356 count += bytes; 1357 } 1358 return(count); 1359 } 1360 1361 /* 1362 * Read and return a single mrecord. 1363 */ 1364 static 1365 hammer_ioc_mrecord_any_t 1366 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 1367 { 1368 hammer_ioc_mrecord_any_t mrec; 1369 struct hammer_ioc_mrecord_head mrechd; 1370 size_t bytes; 1371 size_t n; 1372 size_t i; 1373 1374 if (pickup && pickup->type != 0) { 1375 mrechd = *pickup; 1376 pickup->signature = 0; 1377 pickup->type = 0; 1378 n = HAMMER_MREC_HEADSIZE; 1379 } else { 1380 /* 1381 * Read in the PFSD header from the sender. 1382 */ 1383 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1384 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1385 if (i <= 0) 1386 break; 1387 } 1388 if (n == 0) { 1389 *errorp = 0; /* EOF */ 1390 return(NULL); 1391 } 1392 if (n != HAMMER_MREC_HEADSIZE) { 1393 fprintf(stderr, "short read of mrecord header\n"); 1394 *errorp = EPIPE; 1395 return(NULL); 1396 } 1397 } 1398 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1399 fprintf(stderr, "read_mrecord: bad signature\n"); 1400 *errorp = EINVAL; 1401 return(NULL); 1402 } 1403 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1404 assert(bytes >= sizeof(mrechd)); 1405 mrec = malloc(bytes); 1406 mrec->head = mrechd; 1407 1408 while (n < bytes) { 1409 i = read(fdin, (char *)mrec + n, bytes - n); 1410 if (i <= 0) 1411 break; 1412 n += i; 1413 } 1414 if (n != bytes) { 1415 fprintf(stderr, "read_mrecord: short read on payload\n"); 1416 *errorp = EPIPE; 1417 return(NULL); 1418 } 1419 if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) { 1420 fprintf(stderr, "read_mrecord: bad CRC\n"); 1421 *errorp = EINVAL; 1422 return(NULL); 1423 } 1424 *errorp = 0; 1425 return(mrec); 1426 } 1427 1428 static 1429 void 1430 write_mrecord(int fdout, uint32_t type, hammer_ioc_mrecord_any_t mrec, 1431 int bytes) 1432 { 1433 char zbuf[HAMMER_HEAD_ALIGN]; 1434 int pad; 1435 1436 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1437 1438 assert(bytes >= (int)sizeof(mrec->head)); 1439 bzero(&mrec->head, sizeof(mrec->head)); 1440 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1441 mrec->head.type = type; 1442 mrec->head.rec_size = bytes; 1443 hammer_crc_set_mrec_head(&mrec->head, bytes); 1444 if (write(fdout, mrec, bytes) != bytes) 1445 err(1, "write_mrecord"); 1446 if (pad) { 1447 bzero(zbuf, pad); 1448 if (write(fdout, zbuf, pad) != pad) 1449 err(1, "write_mrecord"); 1450 } 1451 } 1452 1453 /* 1454 * Generate a mirroring header with the pfs information of the 1455 * originating filesytem. 1456 */ 1457 static void 1458 generate_mrec_header(int fd, int pfs_id, 1459 union hammer_ioc_mrecord_any *mrec_tmp) 1460 { 1461 struct hammer_ioc_pseudofs_rw pfs; 1462 1463 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1464 clrpfs(&pfs, &mrec_tmp->pfs.pfsd, pfs_id); 1465 1466 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) 1467 err(1, "Mirror-read: not a HAMMER fs/pseudofs!"); 1468 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) 1469 errx(1, "Mirror-read: HAMMER PFS version mismatch!"); 1470 mrec_tmp->pfs.version = pfs.version; 1471 } 1472 1473 /* 1474 * Validate the pfs information from the originating filesystem 1475 * against the target filesystem. shared_uuid must match. 1476 * 1477 * return -1 if we got a TERM record 1478 */ 1479 static int 1480 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1481 struct hammer_ioc_mrecord_head *pickup, 1482 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1483 { 1484 struct hammer_ioc_pseudofs_rw pfs; 1485 struct hammer_pseudofs_data pfsd; 1486 hammer_ioc_mrecord_any_t mrec; 1487 int error; 1488 1489 /* 1490 * Get the PFSD info from the target filesystem. 1491 */ 1492 clrpfs(&pfs, &pfsd, pfs_id); 1493 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) 1494 err(1, "mirror-write: not a HAMMER fs/pseudofs!"); 1495 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) 1496 errx(1, "mirror-write: HAMMER PFS version mismatch!"); 1497 1498 mrec = read_mrecord(fdin, &error, pickup); 1499 if (mrec == NULL) { 1500 if (error == 0) 1501 errx(1, "validate_mrec_header: short read"); 1502 exit(1); 1503 } 1504 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1505 free(mrec); 1506 return(-1); 1507 } 1508 1509 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) 1510 errx(1, "validate_mrec_header: did not get expected " 1511 "PFSD record type"); 1512 if (mrec->head.rec_size != sizeof(mrec->pfs)) 1513 errx(1, "validate_mrec_header: unexpected payload size"); 1514 if (mrec->pfs.version != pfs.version) 1515 errx(1, "validate_mrec_header: Version mismatch"); 1516 1517 /* 1518 * Whew. Ok, is the read PFS info compatible with the target? 1519 */ 1520 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1521 sizeof(pfsd.shared_uuid)) != 0) 1522 errx(1, "mirror-write: source and target have " 1523 "different shared-uuid's!"); 1524 if (is_target && hammer_is_pfs_master(&pfsd)) 1525 errx(1, "mirror-write: target must be in slave mode"); 1526 if (tid_begp) 1527 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1528 if (tid_endp) 1529 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1530 free(mrec); 1531 return(0); 1532 } 1533 1534 static void 1535 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1536 { 1537 struct hammer_ioc_pseudofs_rw pfs; 1538 struct hammer_pseudofs_data pfsd; 1539 1540 clrpfs(&pfs, &pfsd, pfs_id); 1541 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) 1542 err(1, "update_pfs_snapshot (read)"); 1543 1544 if (pfsd.sync_end_tid != snapshot_tid) { 1545 pfsd.sync_end_tid = snapshot_tid; 1546 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) 1547 err(1, "update_pfs_snapshot (rewrite)"); 1548 if (VerboseOpt >= 2) { 1549 fprintf(stderr, 1550 "Mirror-write: Completed, updated snapshot " 1551 "to %016jx\n", 1552 (uintmax_t)snapshot_tid); 1553 fflush(stderr); 1554 } 1555 } 1556 } 1557 1558 /* 1559 * Bandwidth-limited write in chunks 1560 */ 1561 static 1562 ssize_t 1563 writebw(int fd, const void *buf, size_t nbytes, 1564 uint64_t *bwcount, struct timeval *tv1) 1565 { 1566 struct timeval tv2; 1567 size_t n; 1568 ssize_t r; 1569 ssize_t a; 1570 int usec; 1571 1572 a = 0; 1573 r = 0; 1574 while (nbytes) { 1575 if (*bwcount + nbytes > BandwidthOpt) 1576 n = BandwidthOpt - *bwcount; 1577 else 1578 n = nbytes; 1579 if (n) 1580 r = write(fd, buf, n); 1581 if (r >= 0) { 1582 a += r; 1583 nbytes -= r; 1584 buf = (const char *)buf + r; 1585 } 1586 if ((size_t)r != n) 1587 break; 1588 *bwcount += n; 1589 if (*bwcount >= BandwidthOpt) { 1590 gettimeofday(&tv2, NULL); 1591 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1592 (int)(tv2.tv_usec - tv1->tv_usec); 1593 if (usec >= 0 && usec < 1000000) 1594 usleep(1000000 - usec); 1595 gettimeofday(tv1, NULL); 1596 *bwcount -= BandwidthOpt; 1597 } 1598 } 1599 return(a ? a : r); 1600 } 1601 1602 /* 1603 * Get a yes or no answer from the terminal. The program may be run as 1604 * part of a two-way pipe so we cannot use stdin for this operation. 1605 */ 1606 static int 1607 getyntty(void) 1608 { 1609 char buf[256]; 1610 FILE *fp; 1611 int result; 1612 1613 fp = fopen("/dev/tty", "r"); 1614 if (fp == NULL) { 1615 fprintf(stderr, "No terminal for response\n"); 1616 return(-1); 1617 } 1618 result = -1; 1619 while (fgets(buf, sizeof(buf), fp) != NULL) { 1620 if (buf[0] == 'y' || buf[0] == 'Y') { 1621 result = 1; 1622 break; 1623 } 1624 if (buf[0] == 'n' || buf[0] == 'N') { 1625 result = 0; 1626 break; 1627 } 1628 fprintf(stderr, "Response not understood\n"); 1629 break; 1630 } 1631 fclose(fp); 1632 return(result); 1633 } 1634 1635 static void 1636 score_printf(size_t i, size_t w, const char *ctl, ...) 1637 { 1638 va_list va; 1639 size_t n; 1640 static size_t SSize; 1641 static int SFd = -1; 1642 static char ScoreBuf[1024]; 1643 1644 if (ScoreBoardFile == NULL) 1645 return; 1646 assert(i + w < sizeof(ScoreBuf)); 1647 if (SFd < 0) { 1648 SFd = open(ScoreBoardFile, O_RDWR|O_CREAT|O_TRUNC, 0644); 1649 if (SFd < 0) 1650 return; 1651 SSize = 0; 1652 } 1653 for (n = 0; n < i; ++n) { 1654 if (ScoreBuf[n] == 0) 1655 ScoreBuf[n] = ' '; 1656 } 1657 va_start(va, ctl); 1658 vsnprintf(ScoreBuf + i, w - 1, ctl, va); 1659 va_end(va); 1660 n = strlen(ScoreBuf + i); 1661 while (n < w - 1) { 1662 ScoreBuf[i + n] = ' '; 1663 ++n; 1664 } 1665 ScoreBuf[i + n] = '\n'; 1666 if (SSize < i + w) 1667 SSize = i + w; 1668 pwrite(SFd, ScoreBuf, SSize, 0); 1669 } 1670 1671 static void 1672 hammer_check_restrict(const char *filesystem) 1673 { 1674 size_t rlen; 1675 int atslash; 1676 1677 if (RestrictTarget == NULL) 1678 return; 1679 rlen = strlen(RestrictTarget); 1680 if (strncmp(filesystem, RestrictTarget, rlen) != 0) 1681 errx(1, "hammer-remote: restricted target"); 1682 1683 atslash = 1; 1684 while (filesystem[rlen]) { 1685 if (atslash && 1686 filesystem[rlen] == '.' && 1687 filesystem[rlen+1] == '.') { 1688 errx(1, "hammer-remote: '..' not allowed"); 1689 } 1690 if (filesystem[rlen] == '/') 1691 atslash = 1; 1692 else 1693 atslash = 0; 1694 ++rlen; 1695 } 1696 } 1697 1698 static void 1699 mirror_usage(int code) 1700 { 1701 fprintf(stderr, 1702 "hammer mirror-read <filesystem> [begin-tid]\n" 1703 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1704 "hammer mirror-write <filesystem>\n" 1705 "hammer mirror-dump [header]\n" 1706 "hammer mirror-copy [[user@]host:]<filesystem>" 1707 " [[user@]host:]<filesystem>\n" 1708 "hammer mirror-stream [[user@]host:]<filesystem>" 1709 " [[user@]host:]<filesystem>\n" 1710 ); 1711 exit(code); 1712 } 1713 1714