1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 */ 34 35 #include "hammer.h" 36 37 #define LINE1 0,20 38 #define LINE2 20,78 39 #define LINE3 90,70 40 41 #define SERIALBUF_SIZE (512 * 1024) 42 43 typedef struct histogram { 44 hammer_tid_t tid; 45 uint64_t bytes; 46 } *histogram_t; 47 48 const char *ScoreBoardFile; 49 const char *RestrictTarget; 50 51 static int read_mrecords(int fd, char *buf, u_int size, 52 struct hammer_ioc_mrecord_head *pickup); 53 static int generate_histogram(int fd, const char *filesystem, 54 histogram_t *histogram_ary, 55 struct hammer_ioc_mirror_rw *mirror_base, 56 int *repeatp); 57 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 58 struct hammer_ioc_mrecord_head *pickup); 59 static void write_mrecord(int fdout, uint32_t type, 60 hammer_ioc_mrecord_any_t mrec, int bytes); 61 static void generate_mrec_header(int fd, int pfs_id, 62 union hammer_ioc_mrecord_any *mrec_tmp); 63 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 64 struct hammer_ioc_mrecord_head *pickup, 65 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 66 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 67 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 68 uint64_t *bwcount, struct timeval *tv1); 69 static int getyntty(void); 70 static void score_printf(size_t i, size_t w, const char *ctl, ...); 71 static void hammer_check_restrict(const char *filesystem); 72 static void mirror_usage(int code); 73 74 /* 75 * Generate a mirroring data stream from the specific source over the 76 * entire key range, but restricted to the specified transaction range. 77 * 78 * The HAMMER VFS does most of the work, we add a few new mrecord 79 * types to negotiate the TID ranges and verify that the entire 80 * stream made it to the destination. 81 * 82 * streaming will be 0 for mirror-read, 1 for mirror-stream. The code will 83 * set up a fake value of -1 when running the histogram for mirror-read. 84 */ 85 void 86 hammer_cmd_mirror_read(char **av, int ac, int streaming) 87 { 88 struct hammer_ioc_mirror_rw mirror; 89 struct hammer_ioc_pseudofs_rw pfs; 90 union hammer_ioc_mrecord_any mrec_tmp; 91 struct hammer_ioc_mrecord_head pickup; 92 hammer_ioc_mrecord_any_t mrec; 93 hammer_tid_t sync_tid; 94 histogram_t histogram_ary; 95 const char *filesystem; 96 char *buf = malloc(SERIALBUF_SIZE); 97 int interrupted = 0; 98 int error; 99 int fd; 100 int n; 101 int didwork; 102 int histogram; 103 int histindex; 104 int histmax; 105 int repeat = 0; 106 int sameline; 107 int64_t total_bytes; 108 time_t base_t = time(NULL); 109 struct timeval bwtv; 110 uint64_t bwcount; 111 uint64_t estbytes; 112 113 if (ac == 0 || ac > 2) { 114 mirror_usage(1); 115 /* not reached */ 116 } 117 filesystem = av[0]; 118 hammer_check_restrict(filesystem); 119 120 pickup.signature = 0; 121 pickup.type = 0; 122 histogram = 0; 123 histindex = 0; 124 histmax = 0; 125 histogram_ary = NULL; 126 sameline = 0; 127 128 again: 129 bzero(&mirror, sizeof(mirror)); 130 hammer_key_beg_init(&mirror.key_beg); 131 hammer_key_end_init(&mirror.key_end); 132 133 fd = getpfs(&pfs, filesystem); 134 135 if (streaming >= 0) 136 score_printf(LINE1, "Running"); 137 138 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 139 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 140 fflush(stderr); 141 sameline = 1; 142 } 143 sameline = 1; 144 total_bytes = 0; 145 gettimeofday(&bwtv, NULL); 146 bwcount = 0; 147 148 /* 149 * Send initial header for the purpose of determining the 150 * shared-uuid. 151 */ 152 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 153 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 154 &mrec_tmp, sizeof(mrec_tmp.pfs)); 155 156 /* 157 * In 2-way mode the target will send us a PFS info packet 158 * first. Use the target's current snapshot TID as our default 159 * begin TID. 160 */ 161 if (TwoWayPipeOpt) { 162 mirror.tid_beg = 0; 163 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 164 NULL, &mirror.tid_beg); 165 if (n < 0) { /* got TERM record */ 166 relpfs(fd, &pfs); 167 free(buf); 168 free(histogram_ary); 169 return; 170 } 171 ++mirror.tid_beg; 172 } else if (streaming && histogram) { 173 mirror.tid_beg = histogram_ary[histindex].tid + 1; 174 } else { 175 mirror.tid_beg = 0; 176 } 177 178 /* 179 * Write out the PFS header, tid_beg will be updated if our PFS 180 * has a larger begin sync. tid_end is set to the latest source 181 * TID whos flush cycle has completed. 182 */ 183 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 184 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 185 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 186 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 187 mirror.ubuf = buf; 188 mirror.size = SERIALBUF_SIZE; 189 mirror.pfs_id = pfs.pfs_id; 190 mirror.shared_uuid = pfs.ondisk->shared_uuid; 191 192 /* 193 * XXX If the histogram is exhausted and the TID delta is large 194 * the stream might have been offline for a while and is 195 * now picking it up again. Do another histogram. 196 */ 197 #if 0 198 if (streaming && histogram && histindex == histend) { 199 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 200 histogram = 0; 201 } 202 #endif 203 204 /* 205 * Initial bulk startup control, try to do some incremental 206 * mirroring in order to allow the stream to be killed and 207 * restarted without having to start over. 208 */ 209 if (histogram == 0 && BulkOpt == 0) { 210 if (VerboseOpt && repeat == 0) { 211 fprintf(stderr, "\n"); 212 sameline = 0; 213 } 214 histmax = generate_histogram(fd, filesystem, 215 &histogram_ary, &mirror, 216 &repeat); 217 histindex = 0; 218 histogram = 1; 219 220 /* 221 * Just stream the histogram, then stop 222 */ 223 if (streaming == 0) 224 streaming = -1; 225 } 226 227 if (streaming && histogram) { 228 ++histindex; 229 mirror.tid_end = histogram_ary[histindex].tid; 230 estbytes = histogram_ary[histindex-1].bytes; 231 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 232 } else { 233 estbytes = 0; 234 } 235 236 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 237 &mrec_tmp, sizeof(mrec_tmp.pfs)); 238 239 /* 240 * A cycle file overrides the beginning TID only if we are 241 * not operating in two-way or histogram mode. 242 */ 243 if (TwoWayPipeOpt == 0 && histogram == 0) 244 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 245 246 /* 247 * An additional argument overrides the beginning TID regardless 248 * of what mode we are in. This is not recommending if operating 249 * in two-way mode. 250 */ 251 if (ac == 2) 252 mirror.tid_beg = strtoull(av[1], NULL, 0); 253 254 if (streaming == 0 || VerboseOpt >= 2) { 255 fprintf(stderr, 256 "Mirror-read: Mirror %016jx to %016jx", 257 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 258 if (histogram) 259 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 260 fprintf(stderr, "\n"); 261 fflush(stderr); 262 } 263 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 264 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 265 (uintmax_t)mirror.key_beg.obj_id); 266 } 267 268 /* 269 * Nothing to do if begin equals end. 270 */ 271 if (mirror.tid_beg >= mirror.tid_end) { 272 if (streaming == 0 || VerboseOpt >= 2) 273 fprintf(stderr, "Mirror-read: No work to do\n"); 274 sleep(DelayOpt); 275 didwork = 0; 276 histogram = 0; 277 goto done; 278 } 279 didwork = 1; 280 281 /* 282 * Write out bulk records 283 */ 284 mirror.ubuf = buf; 285 mirror.size = SERIALBUF_SIZE; 286 287 do { 288 mirror.count = 0; 289 mirror.pfs_id = pfs.pfs_id; 290 mirror.shared_uuid = pfs.ondisk->shared_uuid; 291 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 292 score_printf(LINE3, "Mirror-read %s failed: %s", 293 filesystem, strerror(errno)); 294 err(1, "Mirror-read %s failed", filesystem); 295 /* not reached */ 296 } 297 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 298 score_printf(LINE3, "Mirror-read %s fatal error %d", 299 filesystem, mirror.head.error); 300 errx(1, "Mirror-read %s fatal error %d", 301 filesystem, mirror.head.error); 302 /* not reached */ 303 } 304 if (mirror.count) { 305 if (BandwidthOpt) { 306 n = writebw(1, mirror.ubuf, mirror.count, 307 &bwcount, &bwtv); 308 } else { 309 n = write(1, mirror.ubuf, mirror.count); 310 } 311 if (n != mirror.count) { 312 score_printf(LINE3, 313 "Mirror-read %s failed: " 314 "short write", 315 filesystem); 316 errx(1, "Mirror-read %s failed: short write", 317 filesystem); 318 /* not reached */ 319 } 320 } 321 total_bytes += mirror.count; 322 if (streaming && VerboseOpt) { 323 fprintf(stderr, 324 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 325 (uintmax_t)mirror.key_cur.obj_id, 326 (uintmax_t)mirror.tid_beg, 327 (uintmax_t)mirror.tid_end, 328 (intmax_t)total_bytes); 329 fflush(stderr); 330 sameline = 0; 331 } else if (streaming) { 332 score_printf(LINE2, 333 "obj=%016jx tids=%016jx:%016jx %11jd", 334 (uintmax_t)mirror.key_cur.obj_id, 335 (uintmax_t)mirror.tid_beg, 336 (uintmax_t)mirror.tid_end, 337 (intmax_t)total_bytes); 338 } 339 mirror.key_beg = mirror.key_cur; 340 341 /* 342 * Deal with time limit option 343 */ 344 if (TimeoutOpt && 345 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 346 score_printf(LINE3, 347 "Mirror-read %s interrupted by timer at" 348 " %016jx", 349 filesystem, 350 (uintmax_t)mirror.key_cur.obj_id); 351 fprintf(stderr, 352 "Mirror-read %s interrupted by timer at" 353 " %016jx\n", 354 filesystem, 355 (uintmax_t)mirror.key_cur.obj_id); 356 interrupted = 1; 357 break; 358 } 359 } while (mirror.count != 0); 360 361 done: 362 if (streaming && VerboseOpt && sameline == 0) { 363 fprintf(stderr, "\n"); 364 fflush(stderr); 365 sameline = 1; 366 } 367 368 /* 369 * Write out the termination sync record - only if not interrupted 370 */ 371 if (interrupted == 0) { 372 if (didwork) { 373 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 374 &mrec_tmp, sizeof(mrec_tmp.sync)); 375 } else { 376 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 377 &mrec_tmp, sizeof(mrec_tmp.sync)); 378 } 379 } 380 381 /* 382 * If the -2 option was given (automatic when doing mirror-copy), 383 * a two-way pipe is assumed and we expect a response mrec from 384 * the target. 385 */ 386 if (TwoWayPipeOpt) { 387 mrec = read_mrecord(0, &error, &pickup); 388 if (mrec == NULL || 389 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 390 mrec->head.rec_size != sizeof(mrec->update)) { 391 errx(1, "mirror_read: Did not get final " 392 "acknowledgement packet from target"); 393 /* not reached */ 394 } 395 if (interrupted) { 396 if (CyclePath) { 397 hammer_set_cycle(&mirror.key_cur, 398 mirror.tid_beg); 399 fprintf(stderr, "Cyclefile %s updated for " 400 "continuation\n", CyclePath); 401 } 402 } else { 403 sync_tid = mrec->update.tid; 404 if (CyclePath) { 405 hammer_key_beg_init(&mirror.key_beg); 406 hammer_set_cycle(&mirror.key_beg, sync_tid); 407 fprintf(stderr, 408 "Cyclefile %s updated to 0x%016jx\n", 409 CyclePath, (uintmax_t)sync_tid); 410 } 411 } 412 free(mrec); 413 } else if (CyclePath) { 414 /* NOTE! mirror.tid_beg cannot be updated */ 415 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 416 "fully updated unless you use mirror-copy\n"); 417 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 418 } 419 if (streaming && interrupted == 0) { 420 time_t t1 = time(NULL); 421 time_t t2; 422 423 /* 424 * Try to break down large bulk transfers into smaller ones 425 * so it can sync the transaction id on the slave. This 426 * way if we get interrupted a restart doesn't have to 427 * start from scratch. 428 */ 429 if (streaming && histogram) { 430 if (histindex != histmax) { 431 if (VerboseOpt && VerboseOpt < 2 && 432 streaming >= 0) { 433 fprintf(stderr, " (bulk incremental)"); 434 } 435 relpfs(fd, &pfs); 436 goto again; 437 } 438 } 439 440 if (VerboseOpt && streaming >= 0) { 441 fprintf(stderr, " W"); 442 fflush(stderr); 443 } else if (streaming >= 0) { 444 score_printf(LINE1, "Waiting"); 445 } 446 pfs.ondisk->sync_end_tid = mirror.tid_end; 447 if (streaming < 0) { 448 /* 449 * Fake streaming mode when using a histogram to 450 * break up a mirror-read, do not wait on source. 451 */ 452 streaming = 0; 453 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 454 score_printf(LINE3, 455 "Mirror-read %s: cannot stream: %s\n", 456 filesystem, strerror(errno)); 457 fprintf(stderr, 458 "Mirror-read %s: cannot stream: %s\n", 459 filesystem, strerror(errno)); 460 } else { 461 t2 = time(NULL) - t1; 462 if (t2 >= 0 && t2 < DelayOpt) { 463 if (VerboseOpt) { 464 fprintf(stderr, "\bD"); 465 fflush(stderr); 466 } 467 sleep(DelayOpt - t2); 468 } 469 if (VerboseOpt) { 470 fprintf(stderr, "\b "); 471 fflush(stderr); 472 } 473 relpfs(fd, &pfs); 474 goto again; 475 } 476 } 477 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 478 &mrec_tmp, sizeof(mrec_tmp.sync)); 479 relpfs(fd, &pfs); 480 free(buf); 481 free(histogram_ary); 482 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 483 } 484 485 /* 486 * What we are trying to do here is figure out how much data is 487 * going to be sent for the TID range and to break the TID range 488 * down into reasonably-sized slices (from the point of view of 489 * data sent) so a lost connection can restart at a reasonable 490 * place and not all the way back at the beginning. 491 * 492 * An entry's TID serves as the end_tid for the prior entry 493 * So we have to offset the calculation by 1 so that TID falls into 494 * the previous entry when populating entries. 495 * 496 * Because the transaction id space is bursty we need a relatively 497 * large number of buckets (like a million) to do a reasonable job 498 * for things like an initial bulk mirrors on a very large filesystem. 499 */ 500 #define HIST_COUNT (1024 * 1024) 501 502 static 503 int 504 generate_histogram(int fd, const char *filesystem, 505 histogram_t *histogram_ary, 506 struct hammer_ioc_mirror_rw *mirror_base, 507 int *repeatp) 508 { 509 struct hammer_ioc_mirror_rw mirror; 510 union hammer_ioc_mrecord_any *mrec; 511 hammer_tid_t tid_beg; 512 hammer_tid_t tid_end; 513 hammer_tid_t tid; 514 hammer_tid_t tidx; 515 uint64_t *tid_bytes; 516 uint64_t total; 517 uint64_t accum; 518 int chunkno; 519 int i; 520 int res; 521 int off; 522 int len; 523 524 mirror = *mirror_base; 525 tid_beg = mirror.tid_beg; 526 tid_end = mirror.tid_end; 527 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 528 529 if (*histogram_ary == NULL) { 530 *histogram_ary = malloc(sizeof(struct histogram) * 531 (HIST_COUNT + 2)); 532 } 533 if (tid_beg >= tid_end) 534 return(0); 535 536 /* needs 2 extra */ 537 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 538 bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2)); 539 540 if (*repeatp == 0) { 541 fprintf(stderr, "Prescan to break up bulk transfer"); 542 if (VerboseOpt > 1) { 543 fprintf(stderr, " (%juMB chunks)", 544 (uintmax_t)(SplitupOpt / (1024 * 1024))); 545 } 546 fprintf(stderr, "\n"); 547 } 548 549 /* 550 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 551 * 552 * Note: Estimates can be off when the mirror is way behind due 553 * to skips. 554 */ 555 total = 0; 556 accum = 0; 557 chunkno = 0; 558 for (;;) { 559 mirror.count = 0; 560 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 561 err(1, "Mirror-read %s failed", filesystem); 562 /* not reached */ 563 } 564 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 565 errx(1, "Mirror-read %s fatal error %d", 566 filesystem, mirror.head.error); 567 /* not reached */ 568 } 569 for (off = 0; 570 off < mirror.count; 571 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size)) { 572 mrec = (void *)((char *)mirror.ubuf + off); 573 574 /* 575 * We only care about general RECs and PASS 576 * records. We ignore SKIPs. 577 */ 578 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 579 case HAMMER_MREC_TYPE_REC: 580 case HAMMER_MREC_TYPE_PASS: 581 break; 582 default: 583 continue; 584 } 585 586 /* 587 * Calculate for two indices, create_tid and 588 * delete_tid. Record data only applies to 589 * the create_tid. 590 * 591 * When tid is exactly on the boundary it really 592 * belongs to the previous entry because scans 593 * are inclusive of the ending entry. 594 */ 595 tid = mrec->rec.leaf.base.delete_tid; 596 if (tid && tid >= tid_beg && tid <= tid_end) { 597 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 598 if (mrec->head.type == 599 HAMMER_MREC_TYPE_REC) { 600 len -= HAMMER_HEAD_DOALIGN( 601 mrec->rec.leaf.data_len); 602 assert(len > 0); 603 } 604 i = (tid - tid_beg) * HIST_COUNT / 605 (tid_end - tid_beg); 606 tidx = tid_beg + i * (tid_end - tid_beg) / 607 HIST_COUNT; 608 if (tid == tidx && i) 609 --i; 610 assert(i >= 0 && i < HIST_COUNT); 611 tid_bytes[i] += len; 612 total += len; 613 accum += len; 614 } 615 616 tid = mrec->rec.leaf.base.create_tid; 617 if (tid && tid >= tid_beg && tid <= tid_end) { 618 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 619 if (mrec->head.type == 620 HAMMER_MREC_TYPE_REC_NODATA) { 621 len += HAMMER_HEAD_DOALIGN( 622 mrec->rec.leaf.data_len); 623 } 624 i = (tid - tid_beg) * HIST_COUNT / 625 (tid_end - tid_beg); 626 tidx = tid_beg + i * (tid_end - tid_beg) / 627 HIST_COUNT; 628 if (tid == tidx && i) 629 --i; 630 assert(i >= 0 && i < HIST_COUNT); 631 tid_bytes[i] += len; 632 total += len; 633 accum += len; 634 } 635 } 636 if (*repeatp == 0 && accum > SplitupOpt) { 637 if (VerboseOpt > 1) { 638 fprintf(stderr, "."); 639 fflush(stderr); 640 } 641 ++chunkno; 642 score_printf(LINE2, "Prescan chunk %d", chunkno); 643 accum = 0; 644 } 645 if (mirror.count == 0) 646 break; 647 mirror.key_beg = mirror.key_cur; 648 } 649 650 /* 651 * Reduce to SplitupOpt (default 4GB) chunks. This code may 652 * use up to two additional elements. Do the array in-place. 653 * 654 * Inefficient degenerate cases can occur if we do not accumulate 655 * at least the requested split amount, so error on the side of 656 * going over a bit. 657 */ 658 res = 0; 659 (*histogram_ary)[res].tid = tid_beg; 660 (*histogram_ary)[res].bytes = tid_bytes[0]; 661 for (i = 1; i < HIST_COUNT; ++i) { 662 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 663 ++res; 664 (*histogram_ary)[res].tid = tid_beg + 665 i * (tid_end - tid_beg) / 666 HIST_COUNT; 667 (*histogram_ary)[res].bytes = 0; 668 669 } 670 (*histogram_ary)[res].bytes += tid_bytes[i]; 671 } 672 ++res; 673 (*histogram_ary)[res].tid = tid_end; 674 (*histogram_ary)[res].bytes = -1; 675 676 if (*repeatp == 0) { 677 if (VerboseOpt > 1) 678 fprintf(stderr, "\n"); /* newline after ... */ 679 score_printf(LINE3, "Prescan %d chunks, total %ju MBytes", 680 res, (uintmax_t)total / (1024 * 1024)); 681 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 682 res, (uintmax_t)total / (1024 * 1024)); 683 for (i = 0; i < res && i < 3; ++i) { 684 if (i) 685 fprintf(stderr, ", "); 686 fprintf(stderr, "%ju", 687 (uintmax_t)(*histogram_ary)[i].bytes); 688 } 689 if (i < res) 690 fprintf(stderr, ", ..."); 691 fprintf(stderr, ")\n"); 692 } 693 assert(res <= HIST_COUNT); 694 *repeatp = 1; 695 696 free(tid_bytes); 697 return(res); 698 } 699 700 static 701 void 702 create_pfs(const char *filesystem, uuid_t *s_uuid) 703 { 704 if (ForceYesOpt == 1) { 705 fprintf(stderr, "PFS slave %s does not exist. " 706 "Auto create new slave PFS!\n", filesystem); 707 708 } else { 709 fprintf(stderr, "PFS slave %s does not exist.\n" 710 "Do you want to create a new slave PFS? [y/n] ", 711 filesystem); 712 fflush(stderr); 713 if (getyntty() != 1) { 714 errx(1, "Aborting operation"); 715 /* not reached */ 716 } 717 } 718 719 uint32_t status; 720 char *shared_uuid = NULL; 721 uuid_to_string(s_uuid, &shared_uuid, &status); 722 723 char *cmd = NULL; 724 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 725 filesystem, shared_uuid); 726 free(shared_uuid); 727 728 if (cmd == NULL) { 729 errx(1, "Failed to alloc memory"); 730 /* not reached */ 731 } 732 if (system(cmd) != 0) 733 fprintf(stderr, "Failed to create PFS\n"); 734 free(cmd); 735 } 736 737 /* 738 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 739 * some additional packet types to negotiate TID ranges and to verify 740 * completion. The HAMMER VFS does most of the work. 741 * 742 * It is important to note that the mirror.key_{beg,end} range must 743 * match the ranged used by the original. For now both sides use 744 * range the entire key space. 745 * 746 * It is even more important that the records in the stream conform 747 * to the TID range also supplied in the stream. The HAMMER VFS will 748 * use the REC, PASS, and SKIP record types to track the portions of 749 * the B-Tree being scanned in order to be able to proactively delete 750 * records on the target within those active areas that are not mentioned 751 * by the source. 752 * 753 * The mirror.key_cur field is used by the VFS to do this tracking. It 754 * must be initialized to key_beg but then is persistently updated by 755 * the HAMMER VFS on each successive ioctl() call. If you blow up this 756 * field you will blow up the mirror target, possibly to the point of 757 * deleting everything. As a safety measure the HAMMER VFS simply marks 758 * the records that the source has destroyed as deleted on the target, 759 * and normal pruning operations will deal with their final disposition 760 * at some later time. 761 */ 762 void 763 hammer_cmd_mirror_write(char **av, int ac) 764 { 765 struct hammer_ioc_mirror_rw mirror; 766 const char *filesystem; 767 char *buf = malloc(SERIALBUF_SIZE); 768 struct hammer_ioc_pseudofs_rw pfs; 769 struct hammer_ioc_mrecord_head pickup; 770 struct hammer_ioc_synctid synctid; 771 union hammer_ioc_mrecord_any mrec_tmp; 772 hammer_ioc_mrecord_any_t mrec; 773 struct stat st; 774 int error; 775 int fd; 776 int n; 777 778 if (ac != 1) { 779 mirror_usage(1); 780 /* not reached */ 781 } 782 filesystem = av[0]; 783 hammer_check_restrict(filesystem); 784 785 pickup.signature = 0; 786 pickup.type = 0; 787 788 again: 789 bzero(&mirror, sizeof(mirror)); 790 hammer_key_beg_init(&mirror.key_beg); 791 hammer_key_end_init(&mirror.key_end); 792 mirror.key_end = mirror.key_beg; 793 794 /* 795 * Read initial packet 796 */ 797 mrec = read_mrecord(0, &error, &pickup); 798 if (mrec == NULL) { 799 if (error == 0) { 800 errx(1, "validate_mrec_header: short read"); 801 /* not reached */ 802 } 803 exit(1); 804 } 805 /* 806 * Validate packet 807 */ 808 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 809 free(buf); 810 return; 811 } 812 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 813 errx(1, "validate_mrec_header: did not get expected " 814 "PFSD record type"); 815 /* not reached */ 816 } 817 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 818 errx(1, "validate_mrec_header: unexpected payload size"); 819 /* not reached */ 820 } 821 822 /* 823 * Create slave PFS if it doesn't yet exist 824 */ 825 if (lstat(filesystem, &st) != 0) 826 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 827 free(mrec); 828 mrec = NULL; 829 830 fd = getpfs(&pfs, filesystem); 831 832 /* 833 * In two-way mode the target writes out a PFS packet first. 834 * The source uses our tid_end as its tid_beg by default, 835 * picking up where it left off. 836 */ 837 mirror.tid_beg = 0; 838 if (TwoWayPipeOpt) { 839 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 840 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 841 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 842 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 843 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 844 &mrec_tmp, sizeof(mrec_tmp.pfs)); 845 } 846 847 /* 848 * Read and process the PFS header. The source informs us of 849 * the TID range the stream represents. 850 */ 851 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 852 &mirror.tid_beg, &mirror.tid_end); 853 if (n < 0) { /* got TERM record */ 854 relpfs(fd, &pfs); 855 free(buf); 856 return; 857 } 858 859 mirror.ubuf = buf; 860 mirror.size = SERIALBUF_SIZE; 861 862 /* 863 * Read and process bulk records (REC, PASS, and SKIP types). 864 * 865 * On your life, do NOT mess with mirror.key_cur or your mirror 866 * target may become history. 867 */ 868 for (;;) { 869 mirror.count = 0; 870 mirror.pfs_id = pfs.pfs_id; 871 mirror.shared_uuid = pfs.ondisk->shared_uuid; 872 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 873 if (mirror.size <= 0) 874 break; 875 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 876 err(1, "Mirror-write %s failed", filesystem); 877 /* not reached */ 878 } 879 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 880 errx(1, "Mirror-write %s fatal error %d", 881 filesystem, mirror.head.error); 882 /* not reached */ 883 } 884 #if 0 885 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 886 errx(1, "Mirror-write %s interrupted by timer at" 887 " %016llx", 888 filesystem, 889 mirror.key_cur.obj_id); 890 /* not reached */ 891 } 892 #endif 893 } 894 895 /* 896 * Read and process the termination sync record. 897 */ 898 mrec = read_mrecord(0, &error, &pickup); 899 900 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 901 fprintf(stderr, "Mirror-write: received termination request\n"); 902 relpfs(fd, &pfs); 903 free(mrec); 904 free(buf); 905 return; 906 } 907 908 if (mrec == NULL || 909 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 910 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 911 mrec->head.rec_size != sizeof(mrec->sync)) { 912 errx(1, "Mirror-write %s: Did not get termination " 913 "sync record, or rec_size is wrong rt=%d", 914 filesystem, (mrec ? (int)mrec->head.type : -1)); 915 /* not reached */ 916 } 917 918 /* 919 * Update the PFS info on the target so the user has visibility 920 * into the new snapshot, and sync the target filesystem. 921 */ 922 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 923 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 924 925 bzero(&synctid, sizeof(synctid)); 926 synctid.op = HAMMER_SYNCTID_SYNC2; 927 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 928 929 if (VerboseOpt >= 2) { 930 fprintf(stderr, "Mirror-write %s: succeeded\n", 931 filesystem); 932 } 933 } 934 935 free(mrec); 936 mrec = NULL; 937 938 /* 939 * Report back to the originator. 940 */ 941 if (TwoWayPipeOpt) { 942 mrec_tmp.update.tid = mirror.tid_end; 943 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 944 &mrec_tmp, sizeof(mrec_tmp.update)); 945 } else { 946 printf("Source can update synctid to 0x%016jx\n", 947 (uintmax_t)mirror.tid_end); 948 } 949 relpfs(fd, &pfs); 950 goto again; 951 } 952 953 void 954 hammer_cmd_mirror_dump(char **av, int ac) 955 { 956 char *buf = malloc(SERIALBUF_SIZE); 957 struct hammer_ioc_mrecord_head pickup; 958 hammer_ioc_mrecord_any_t mrec; 959 int error; 960 int size; 961 int offset; 962 int bytes; 963 int header_only = 0; 964 965 if (ac == 1 && strcmp(*av, "header") == 0) { 966 header_only = 1; 967 } else if (ac != 0) { 968 mirror_usage(1); 969 /* not reached */ 970 } 971 972 /* 973 * Read and process the PFS header 974 */ 975 pickup.signature = 0; 976 pickup.type = 0; 977 978 mrec = read_mrecord(0, &error, &pickup); 979 980 /* 981 * Dump the PFS header. mirror-dump takes its input from the output 982 * of a mirror-read so getpfs() can't be used to get a fd to be passed 983 * to dump_pfsd(). 984 */ 985 if (header_only && mrec != NULL) { 986 dump_pfsd(&mrec->pfs.pfsd, -1); 987 free(mrec); 988 free(buf); 989 return; 990 } 991 free(mrec); 992 993 again: 994 /* 995 * Read and process bulk records 996 */ 997 for (;;) { 998 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 999 if (size <= 0) 1000 break; 1001 offset = 0; 1002 while (offset < size) { 1003 mrec = (void *)((char *)buf + offset); 1004 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 1005 if (offset + bytes > size) { 1006 errx(1, "Misaligned record"); 1007 /* not reached */ 1008 } 1009 1010 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 1011 case HAMMER_MREC_TYPE_REC_BADCRC: 1012 case HAMMER_MREC_TYPE_REC: 1013 printf("Record lo=%08x obj=%016jx key=%016jx " 1014 "rt=%02x ot=%02x", 1015 mrec->rec.leaf.base.localization, 1016 (uintmax_t)mrec->rec.leaf.base.obj_id, 1017 (uintmax_t)mrec->rec.leaf.base.key, 1018 mrec->rec.leaf.base.rec_type, 1019 mrec->rec.leaf.base.obj_type); 1020 if (mrec->head.type == 1021 HAMMER_MREC_TYPE_REC_BADCRC) { 1022 printf(" (BAD CRC)"); 1023 } 1024 printf("\n"); 1025 printf(" tids %016jx:%016jx data=%d\n", 1026 (uintmax_t)mrec->rec.leaf.base.create_tid, 1027 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1028 mrec->rec.leaf.data_len); 1029 break; 1030 case HAMMER_MREC_TYPE_PASS: 1031 printf("Pass lo=%08x obj=%016jx key=%016jx " 1032 "rt=%02x ot=%02x\n", 1033 mrec->rec.leaf.base.localization, 1034 (uintmax_t)mrec->rec.leaf.base.obj_id, 1035 (uintmax_t)mrec->rec.leaf.base.key, 1036 mrec->rec.leaf.base.rec_type, 1037 mrec->rec.leaf.base.obj_type); 1038 printf(" tids %016jx:%016jx data=%d\n", 1039 (uintmax_t)mrec->rec.leaf.base.create_tid, 1040 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1041 mrec->rec.leaf.data_len); 1042 break; 1043 case HAMMER_MREC_TYPE_SKIP: 1044 printf("Skip lo=%08x obj=%016jx key=%016jx rt=%02x to\n" 1045 " lo=%08x obj=%016jx key=%016jx rt=%02x\n", 1046 mrec->skip.skip_beg.localization, 1047 (uintmax_t)mrec->skip.skip_beg.obj_id, 1048 (uintmax_t)mrec->skip.skip_beg.key, 1049 mrec->skip.skip_beg.rec_type, 1050 mrec->skip.skip_end.localization, 1051 (uintmax_t)mrec->skip.skip_end.obj_id, 1052 (uintmax_t)mrec->skip.skip_end.key, 1053 mrec->skip.skip_end.rec_type); 1054 default: 1055 break; 1056 } 1057 offset += bytes; 1058 } 1059 } 1060 1061 /* 1062 * Read and process the termination sync record. 1063 */ 1064 mrec = read_mrecord(0, &error, &pickup); 1065 if (mrec == NULL || 1066 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 1067 mrec->head.type != HAMMER_MREC_TYPE_IDLE)) { 1068 fprintf(stderr, "Mirror-dump: Did not get termination " 1069 "sync record\n"); 1070 } 1071 free(mrec); 1072 1073 /* 1074 * Continue with more batches until EOF. 1075 */ 1076 mrec = read_mrecord(0, &error, &pickup); 1077 if (mrec) { 1078 free(mrec); 1079 goto again; 1080 } 1081 free(buf); 1082 } 1083 1084 void 1085 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1086 { 1087 pid_t pid1; 1088 pid_t pid2; 1089 int fds[2]; 1090 const char *xav[32]; 1091 char tbuf[16]; 1092 char *sh, *user, *host, *rfs; 1093 int xac; 1094 1095 if (ac != 2) { 1096 mirror_usage(1); 1097 /* not reached */ 1098 } 1099 1100 TwoWayPipeOpt = 1; 1101 signal(SIGPIPE, SIG_IGN); 1102 1103 again: 1104 if (pipe(fds) < 0) { 1105 err(1, "pipe"); 1106 /* not reached */ 1107 } 1108 1109 /* 1110 * Source 1111 */ 1112 if ((pid1 = fork()) == 0) { 1113 signal(SIGPIPE, SIG_DFL); 1114 dup2(fds[0], 0); 1115 dup2(fds[0], 1); 1116 close(fds[0]); 1117 close(fds[1]); 1118 if ((rfs = strchr(av[0], ':')) != NULL) { 1119 xac = 0; 1120 1121 if((sh = getenv("HAMMER_RSH")) == NULL) 1122 xav[xac++] = "ssh"; 1123 else 1124 xav[xac++] = sh; 1125 1126 if (CompressOpt) 1127 xav[xac++] = "-C"; 1128 1129 if ((host = strchr(av[0], '@')) != NULL) { 1130 user = strndup( av[0], (host++ - av[0])); 1131 host = strndup( host, (rfs++ - host)); 1132 xav[xac++] = "-l"; 1133 xav[xac++] = user; 1134 xav[xac++] = host; 1135 } else { 1136 host = strndup( av[0], (rfs++ - av[0])); 1137 user = NULL; 1138 xav[xac++] = host; 1139 } 1140 1141 1142 if (SshPort) { 1143 xav[xac++] = "-p"; 1144 xav[xac++] = SshPort; 1145 } 1146 1147 xav[xac++] = "hammer"; 1148 1149 switch(VerboseOpt) { 1150 case 0: 1151 break; 1152 case 1: 1153 xav[xac++] = "-v"; 1154 break; 1155 case 2: 1156 xav[xac++] = "-vv"; 1157 break; 1158 default: 1159 xav[xac++] = "-vvv"; 1160 break; 1161 } 1162 if (ForceYesOpt) 1163 xav[xac++] = "-y"; 1164 xav[xac++] = "-2"; 1165 if (TimeoutOpt) { 1166 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1167 xav[xac++] = "-t"; 1168 xav[xac++] = tbuf; 1169 } 1170 if (SplitupOptStr) { 1171 xav[xac++] = "-S"; 1172 xav[xac++] = SplitupOptStr; 1173 } 1174 if (streaming) 1175 xav[xac++] = "mirror-read-stream"; 1176 else 1177 xav[xac++] = "mirror-read"; 1178 xav[xac++] = rfs; 1179 xav[xac++] = NULL; 1180 execvp(*xav, (void *)xav); 1181 } else { 1182 hammer_cmd_mirror_read(av, 1, streaming); 1183 fflush(stdout); 1184 fflush(stderr); 1185 } 1186 _exit(1); 1187 } 1188 1189 /* 1190 * Target 1191 */ 1192 if ((pid2 = fork()) == 0) { 1193 signal(SIGPIPE, SIG_DFL); 1194 dup2(fds[1], 0); 1195 dup2(fds[1], 1); 1196 close(fds[0]); 1197 close(fds[1]); 1198 if ((rfs = strchr(av[1], ':')) != NULL) { 1199 xac = 0; 1200 1201 if((sh = getenv("HAMMER_RSH")) == NULL) 1202 xav[xac++] = "ssh"; 1203 else 1204 xav[xac++] = sh; 1205 1206 if (CompressOpt) 1207 xav[xac++] = "-C"; 1208 1209 if ((host = strchr(av[1], '@')) != NULL) { 1210 user = strndup( av[1], (host++ - av[1])); 1211 host = strndup( host, (rfs++ - host)); 1212 xav[xac++] = "-l"; 1213 xav[xac++] = user; 1214 xav[xac++] = host; 1215 } else { 1216 host = strndup( av[1], (rfs++ - av[1])); 1217 user = NULL; 1218 xav[xac++] = host; 1219 } 1220 1221 if (SshPort) { 1222 xav[xac++] = "-p"; 1223 xav[xac++] = SshPort; 1224 } 1225 1226 xav[xac++] = "hammer"; 1227 1228 switch(VerboseOpt) { 1229 case 0: 1230 break; 1231 case 1: 1232 xav[xac++] = "-v"; 1233 break; 1234 case 2: 1235 xav[xac++] = "-vv"; 1236 break; 1237 default: 1238 xav[xac++] = "-vvv"; 1239 break; 1240 } 1241 if (ForceYesOpt) 1242 xav[xac++] = "-y"; 1243 xav[xac++] = "-2"; 1244 xav[xac++] = "mirror-write"; 1245 xav[xac++] = rfs; 1246 xav[xac++] = NULL; 1247 execvp(*xav, (void *)xav); 1248 } else { 1249 hammer_cmd_mirror_write(av + 1, 1); 1250 fflush(stdout); 1251 fflush(stderr); 1252 } 1253 _exit(1); 1254 } 1255 close(fds[0]); 1256 close(fds[1]); 1257 1258 while (waitpid(pid1, NULL, 0) <= 0) 1259 ; 1260 while (waitpid(pid2, NULL, 0) <= 0) 1261 ; 1262 1263 /* 1264 * If the link is lost restart 1265 */ 1266 if (streaming) { 1267 if (VerboseOpt) { 1268 fprintf(stderr, "\nLost Link\n"); 1269 fflush(stderr); 1270 } 1271 sleep(15 + DelayOpt); 1272 goto again; 1273 } 1274 1275 } 1276 1277 /* 1278 * Read and return multiple mrecords 1279 */ 1280 static 1281 int 1282 read_mrecords(int fd, char *buf, u_int size, struct hammer_ioc_mrecord_head *pickup) 1283 { 1284 hammer_ioc_mrecord_any_t mrec; 1285 u_int count; 1286 size_t n; 1287 size_t i; 1288 size_t bytes; 1289 int type; 1290 1291 count = 0; 1292 while (size - count >= HAMMER_MREC_HEADSIZE) { 1293 /* 1294 * Cached the record header in case we run out of buffer 1295 * space. 1296 */ 1297 fflush(stdout); 1298 if (pickup->signature == 0) { 1299 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1300 i = read(fd, (char *)pickup + n, 1301 HAMMER_MREC_HEADSIZE - n); 1302 if (i <= 0) 1303 break; 1304 } 1305 if (n == 0) 1306 break; 1307 if (n != HAMMER_MREC_HEADSIZE) { 1308 errx(1, "read_mrecords: short read on pipe"); 1309 /* not reached */ 1310 } 1311 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1312 errx(1, "read_mrecords: malformed record on pipe, " 1313 "bad signature"); 1314 /* not reached */ 1315 } 1316 } 1317 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1318 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 1319 errx(1, "read_mrecords: malformed record on pipe, " 1320 "illegal rec_size"); 1321 /* not reached */ 1322 } 1323 1324 /* 1325 * Stop if we have insufficient space for the record and data. 1326 */ 1327 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1328 if (size - count < bytes) 1329 break; 1330 1331 /* 1332 * Stop if the record type is not a REC, SKIP, or PASS, 1333 * which are the only types the ioctl supports. Other types 1334 * are used only by the userland protocol. 1335 * 1336 * Ignore all flags. 1337 */ 1338 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1339 if (type != HAMMER_MREC_TYPE_PFSD && 1340 type != HAMMER_MREC_TYPE_REC && 1341 type != HAMMER_MREC_TYPE_SKIP && 1342 type != HAMMER_MREC_TYPE_PASS) { 1343 break; 1344 } 1345 1346 /* 1347 * Read the remainder and clear the pickup signature. 1348 */ 1349 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1350 i = read(fd, buf + count + n, bytes - n); 1351 if (i <= 0) 1352 break; 1353 } 1354 if (n != bytes) { 1355 errx(1, "read_mrecords: short read on pipe"); 1356 /* not reached */ 1357 } 1358 1359 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1360 pickup->signature = 0; 1361 pickup->type = 0; 1362 mrec = (void *)(buf + count); 1363 1364 /* 1365 * Validate the completed record 1366 */ 1367 if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) { 1368 errx(1, "read_mrecords: malformed record on pipe, bad crc"); 1369 /* not reached */ 1370 } 1371 1372 /* 1373 * If its a B-Tree record validate the data crc. 1374 * 1375 * NOTE: If the VFS passes us an explicitly errorde mrec 1376 * we just pass it through. 1377 */ 1378 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1379 1380 if (type == HAMMER_MREC_TYPE_REC) { 1381 if (mrec->head.rec_size < 1382 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1383 errx(1, "read_mrecords: malformed record on " 1384 "pipe, illegal element data_len"); 1385 /* not reached */ 1386 } 1387 if (mrec->rec.leaf.data_len && 1388 mrec->rec.leaf.data_offset && 1389 hammer_crc_test_leaf(HammerVersion, &mrec->rec + 1, &mrec->rec.leaf) == 0) { 1390 fprintf(stderr, 1391 "read_mrecords: data_crc did not " 1392 "match data! obj=%016jx key=%016jx\n", 1393 (uintmax_t)mrec->rec.leaf.base.obj_id, 1394 (uintmax_t)mrec->rec.leaf.base.key); 1395 fprintf(stderr, 1396 "continuing, but there are problems\n"); 1397 } 1398 } 1399 count += bytes; 1400 } 1401 return(count); 1402 } 1403 1404 /* 1405 * Read and return a single mrecord. 1406 */ 1407 static 1408 hammer_ioc_mrecord_any_t 1409 read_mrecord(int fdin, int *errorp, struct hammer_ioc_mrecord_head *pickup) 1410 { 1411 hammer_ioc_mrecord_any_t mrec; 1412 struct hammer_ioc_mrecord_head mrechd; 1413 size_t bytes; 1414 size_t n; 1415 size_t i; 1416 1417 if (pickup && pickup->type != 0) { 1418 mrechd = *pickup; 1419 pickup->signature = 0; 1420 pickup->type = 0; 1421 n = HAMMER_MREC_HEADSIZE; 1422 } else { 1423 /* 1424 * Read in the PFSD header from the sender. 1425 */ 1426 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1427 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1428 if (i <= 0) 1429 break; 1430 } 1431 if (n == 0) { 1432 *errorp = 0; /* EOF */ 1433 return(NULL); 1434 } 1435 if (n != HAMMER_MREC_HEADSIZE) { 1436 fprintf(stderr, "short read of mrecord header\n"); 1437 *errorp = EPIPE; 1438 return(NULL); 1439 } 1440 } 1441 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1442 fprintf(stderr, "read_mrecord: bad signature\n"); 1443 *errorp = EINVAL; 1444 return(NULL); 1445 } 1446 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1447 assert(bytes >= sizeof(mrechd)); 1448 mrec = malloc(bytes); 1449 mrec->head = mrechd; 1450 1451 while (n < bytes) { 1452 i = read(fdin, (char *)mrec + n, bytes - n); 1453 if (i <= 0) 1454 break; 1455 n += i; 1456 } 1457 if (n != bytes) { 1458 fprintf(stderr, "read_mrecord: short read on payload\n"); 1459 *errorp = EPIPE; 1460 return(NULL); 1461 } 1462 if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) { 1463 fprintf(stderr, "read_mrecord: bad CRC\n"); 1464 *errorp = EINVAL; 1465 return(NULL); 1466 } 1467 *errorp = 0; 1468 return(mrec); 1469 } 1470 1471 static 1472 void 1473 write_mrecord(int fdout, uint32_t type, hammer_ioc_mrecord_any_t mrec, 1474 int bytes) 1475 { 1476 char zbuf[HAMMER_HEAD_ALIGN]; 1477 int pad; 1478 1479 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1480 1481 assert(bytes >= (int)sizeof(mrec->head)); 1482 bzero(&mrec->head, sizeof(mrec->head)); 1483 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1484 mrec->head.type = type; 1485 mrec->head.rec_size = bytes; 1486 hammer_crc_set_mrec_head(&mrec->head, bytes); 1487 if (write(fdout, mrec, bytes) != bytes) { 1488 err(1, "write_mrecord"); 1489 /* not reached */ 1490 } 1491 if (pad) { 1492 bzero(zbuf, pad); 1493 if (write(fdout, zbuf, pad) != pad) { 1494 err(1, "write_mrecord"); 1495 /* not reached */ 1496 } 1497 } 1498 } 1499 1500 /* 1501 * Generate a mirroring header with the pfs information of the 1502 * originating filesytem. 1503 */ 1504 static 1505 void 1506 generate_mrec_header(int fd, int pfs_id, 1507 union hammer_ioc_mrecord_any *mrec_tmp) 1508 { 1509 struct hammer_ioc_pseudofs_rw pfs; 1510 1511 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1512 clrpfs(&pfs, &mrec_tmp->pfs.pfsd, pfs_id); 1513 1514 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1515 err(1, "Mirror-read: not a HAMMER fs/pseudofs!"); 1516 /* not reached */ 1517 } 1518 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1519 errx(1, "Mirror-read: HAMMER PFS version mismatch!"); 1520 /* not reached */ 1521 } 1522 mrec_tmp->pfs.version = pfs.version; 1523 } 1524 1525 /* 1526 * Validate the pfs information from the originating filesystem 1527 * against the target filesystem. shared_uuid must match. 1528 * 1529 * return -1 if we got a TERM record 1530 */ 1531 static 1532 int 1533 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1534 struct hammer_ioc_mrecord_head *pickup, 1535 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1536 { 1537 struct hammer_ioc_pseudofs_rw pfs; 1538 struct hammer_pseudofs_data pfsd; 1539 hammer_ioc_mrecord_any_t mrec; 1540 int error; 1541 1542 /* 1543 * Get the PFSD info from the target filesystem. 1544 */ 1545 clrpfs(&pfs, &pfsd, pfs_id); 1546 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1547 err(1, "mirror-write: not a HAMMER fs/pseudofs!"); 1548 /* not reached */ 1549 } 1550 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1551 errx(1, "mirror-write: HAMMER PFS version mismatch!"); 1552 /* not reached */ 1553 } 1554 1555 mrec = read_mrecord(fdin, &error, pickup); 1556 if (mrec == NULL) { 1557 if (error == 0) { 1558 errx(1, "validate_mrec_header: short read"); 1559 /* not reached */ 1560 } 1561 exit(1); 1562 } 1563 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1564 free(mrec); 1565 return(-1); 1566 } 1567 1568 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1569 errx(1, "validate_mrec_header: did not get expected " 1570 "PFSD record type"); 1571 /* not reached */ 1572 } 1573 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1574 errx(1, "validate_mrec_header: unexpected payload size"); 1575 /* not reached */ 1576 } 1577 if (mrec->pfs.version != pfs.version) { 1578 errx(1, "validate_mrec_header: Version mismatch"); 1579 /* not reached */ 1580 } 1581 1582 /* 1583 * Whew. Ok, is the read PFS info compatible with the target? 1584 */ 1585 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1586 sizeof(pfsd.shared_uuid)) != 0) { 1587 errx(1, "mirror-write: source and target have " 1588 "different shared-uuid's!"); 1589 /* not reached */ 1590 } 1591 if (is_target && hammer_is_pfs_master(&pfsd)) { 1592 errx(1, "mirror-write: target must be in slave mode"); 1593 /* not reached */ 1594 } 1595 if (tid_begp) 1596 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1597 if (tid_endp) 1598 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1599 free(mrec); 1600 return(0); 1601 } 1602 1603 static 1604 void 1605 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1606 { 1607 struct hammer_ioc_pseudofs_rw pfs; 1608 struct hammer_pseudofs_data pfsd; 1609 1610 clrpfs(&pfs, &pfsd, pfs_id); 1611 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1612 err(1, "update_pfs_snapshot (read)"); 1613 /* not reached */ 1614 } 1615 1616 if (pfsd.sync_end_tid != snapshot_tid) { 1617 pfsd.sync_end_tid = snapshot_tid; 1618 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1619 err(1, "update_pfs_snapshot (rewrite)"); 1620 /* not reached */ 1621 } 1622 if (VerboseOpt >= 2) { 1623 fprintf(stderr, 1624 "Mirror-write: Completed, updated snapshot " 1625 "to %016jx\n", 1626 (uintmax_t)snapshot_tid); 1627 fflush(stderr); 1628 } 1629 } 1630 } 1631 1632 /* 1633 * Bandwidth-limited write in chunks 1634 */ 1635 static 1636 ssize_t 1637 writebw(int fd, const void *buf, size_t nbytes, 1638 uint64_t *bwcount, struct timeval *tv1) 1639 { 1640 struct timeval tv2; 1641 size_t n; 1642 ssize_t r; 1643 ssize_t a; 1644 int usec; 1645 1646 a = 0; 1647 r = 0; 1648 while (nbytes) { 1649 if (*bwcount + nbytes > BandwidthOpt) 1650 n = BandwidthOpt - *bwcount; 1651 else 1652 n = nbytes; 1653 if (n) 1654 r = write(fd, buf, n); 1655 if (r >= 0) { 1656 a += r; 1657 nbytes -= r; 1658 buf = (const char *)buf + r; 1659 } 1660 if ((size_t)r != n) 1661 break; 1662 *bwcount += n; 1663 if (*bwcount >= BandwidthOpt) { 1664 gettimeofday(&tv2, NULL); 1665 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1666 (int)(tv2.tv_usec - tv1->tv_usec); 1667 if (usec >= 0 && usec < 1000000) 1668 usleep(1000000 - usec); 1669 gettimeofday(tv1, NULL); 1670 *bwcount -= BandwidthOpt; 1671 } 1672 } 1673 return(a ? a : r); 1674 } 1675 1676 /* 1677 * Get a yes or no answer from the terminal. The program may be run as 1678 * part of a two-way pipe so we cannot use stdin for this operation. 1679 */ 1680 static 1681 int 1682 getyntty(void) 1683 { 1684 char buf[256]; 1685 FILE *fp; 1686 int result; 1687 1688 fp = fopen("/dev/tty", "r"); 1689 if (fp == NULL) { 1690 fprintf(stderr, "No terminal for response\n"); 1691 return(-1); 1692 } 1693 result = -1; 1694 while (fgets(buf, sizeof(buf), fp) != NULL) { 1695 if (buf[0] == 'y' || buf[0] == 'Y') { 1696 result = 1; 1697 break; 1698 } 1699 if (buf[0] == 'n' || buf[0] == 'N') { 1700 result = 0; 1701 break; 1702 } 1703 fprintf(stderr, "Response not understood\n"); 1704 break; 1705 } 1706 fclose(fp); 1707 return(result); 1708 } 1709 1710 static 1711 void 1712 score_printf(size_t i, size_t w, const char *ctl, ...) 1713 { 1714 va_list va; 1715 size_t n; 1716 static size_t SSize; 1717 static int SFd = -1; 1718 static char ScoreBuf[1024]; 1719 1720 if (ScoreBoardFile == NULL) 1721 return; 1722 assert(i + w < sizeof(ScoreBuf)); 1723 if (SFd < 0) { 1724 SFd = open(ScoreBoardFile, O_RDWR|O_CREAT|O_TRUNC, 0644); 1725 if (SFd < 0) 1726 return; 1727 SSize = 0; 1728 } 1729 for (n = 0; n < i; ++n) { 1730 if (ScoreBuf[n] == 0) 1731 ScoreBuf[n] = ' '; 1732 } 1733 va_start(va, ctl); 1734 vsnprintf(ScoreBuf + i, w - 1, ctl, va); 1735 va_end(va); 1736 n = strlen(ScoreBuf + i); 1737 while (n < w - 1) { 1738 ScoreBuf[i + n] = ' '; 1739 ++n; 1740 } 1741 ScoreBuf[i + n] = '\n'; 1742 if (SSize < i + w) 1743 SSize = i + w; 1744 pwrite(SFd, ScoreBuf, SSize, 0); 1745 } 1746 1747 static 1748 void 1749 hammer_check_restrict(const char *filesystem) 1750 { 1751 size_t rlen; 1752 int atslash; 1753 1754 if (RestrictTarget == NULL) 1755 return; 1756 rlen = strlen(RestrictTarget); 1757 if (strncmp(filesystem, RestrictTarget, rlen) != 0) { 1758 errx(1, "hammer-remote: restricted target"); 1759 /* not reached */ 1760 } 1761 1762 atslash = 1; 1763 while (filesystem[rlen]) { 1764 if (atslash && 1765 filesystem[rlen] == '.' && 1766 filesystem[rlen+1] == '.') { 1767 errx(1, "hammer-remote: '..' not allowed"); 1768 /* not reached */ 1769 } 1770 if (filesystem[rlen] == '/') 1771 atslash = 1; 1772 else 1773 atslash = 0; 1774 ++rlen; 1775 } 1776 } 1777 1778 static 1779 void 1780 mirror_usage(int code) 1781 { 1782 fprintf(stderr, 1783 "hammer mirror-read <filesystem> [begin-tid]\n" 1784 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1785 "hammer mirror-write <filesystem>\n" 1786 "hammer mirror-dump [header]\n" 1787 "hammer mirror-copy [[user@]host:]<filesystem>" 1788 " [[user@]host:]<filesystem>\n" 1789 "hammer mirror-stream [[user@]host:]<filesystem>" 1790 " [[user@]host:]<filesystem>\n" 1791 ); 1792 exit(code); 1793 } 1794 1795