/*
 * Copyright (c) 2008 The DragonFly Project. All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@backplane.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
33 */ 34 35 #include "hammer.h" 36 37 #define LINE1 0,20 38 #define LINE2 20,78 39 #define LINE3 90,70 40 41 #define SERIALBUF_SIZE (512 * 1024) 42 43 typedef struct histogram { 44 hammer_tid_t tid; 45 uint64_t bytes; 46 } *histogram_t; 47 48 const char *ScoreBoardFile; 49 const char *RestrictTarget; 50 51 static int read_mrecords(int fd, char *buf, u_int size, 52 struct hammer_ioc_mrecord_head *pickup); 53 static int generate_histogram(int fd, const char *filesystem, 54 histogram_t *histogram_ary, 55 struct hammer_ioc_mirror_rw *mirror_base, 56 int *repeatp); 57 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 58 struct hammer_ioc_mrecord_head *pickup); 59 static void write_mrecord(int fdout, uint32_t type, 60 hammer_ioc_mrecord_any_t mrec, int bytes); 61 static void generate_mrec_header(int fd, int pfs_id, 62 union hammer_ioc_mrecord_any *mrec_tmp); 63 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 64 struct hammer_ioc_mrecord_head *pickup, 65 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 66 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 67 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 68 uint64_t *bwcount, struct timeval *tv1); 69 static int getyntty(void); 70 static void score_printf(size_t i, size_t w, const char *ctl, ...); 71 static void hammer_check_restrict(const char *filesystem); 72 static void mirror_usage(int code); 73 74 /* 75 * Generate a mirroring data stream from the specific source over the 76 * entire key range, but restricted to the specified transaction range. 77 * 78 * The HAMMER VFS does most of the work, we add a few new mrecord 79 * types to negotiate the TID ranges and verify that the entire 80 * stream made it to the destination. 81 * 82 * streaming will be 0 for mirror-read, 1 for mirror-stream. The code will 83 * set up a fake value of -1 when running the histogram for mirror-read. 
84 */ 85 void 86 hammer_cmd_mirror_read(char **av, int ac, int streaming) 87 { 88 struct hammer_ioc_mirror_rw mirror; 89 struct hammer_ioc_pseudofs_rw pfs; 90 union hammer_ioc_mrecord_any mrec_tmp; 91 struct hammer_ioc_mrecord_head pickup; 92 hammer_ioc_mrecord_any_t mrec; 93 hammer_tid_t sync_tid; 94 histogram_t histogram_ary; 95 const char *filesystem; 96 char *buf = malloc(SERIALBUF_SIZE); 97 int interrupted = 0; 98 int error; 99 int fd; 100 int n; 101 int didwork; 102 int histogram; 103 int histindex; 104 int histmax; 105 int repeat = 0; 106 int sameline; 107 int64_t total_bytes; 108 time_t base_t = time(NULL); 109 struct timeval bwtv; 110 uint64_t bwcount; 111 uint64_t estbytes; 112 113 if (ac == 0 || ac > 2) { 114 mirror_usage(1); 115 /* not reached */ 116 } 117 filesystem = av[0]; 118 hammer_check_restrict(filesystem); 119 120 pickup.signature = 0; 121 pickup.type = 0; 122 histogram = 0; 123 histindex = 0; 124 histmax = 0; 125 histogram_ary = NULL; 126 sameline = 0; 127 128 again: 129 bzero(&mirror, sizeof(mirror)); 130 hammer_key_beg_init(&mirror.key_beg); 131 hammer_key_end_init(&mirror.key_end); 132 133 fd = getpfs(&pfs, filesystem); 134 135 if (streaming >= 0) 136 score_printf(LINE1, "Running"); 137 138 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 139 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 140 fflush(stderr); 141 sameline = 1; 142 } 143 sameline = 1; 144 total_bytes = 0; 145 gettimeofday(&bwtv, NULL); 146 bwcount = 0; 147 148 /* 149 * Send initial header for the purpose of determining the 150 * shared-uuid. 151 */ 152 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 153 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 154 &mrec_tmp, sizeof(mrec_tmp.pfs)); 155 156 /* 157 * In 2-way mode the target will send us a PFS info packet 158 * first. Use the target's current snapshot TID as our default 159 * begin TID. 
160 */ 161 if (TwoWayPipeOpt) { 162 mirror.tid_beg = 0; 163 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 164 NULL, &mirror.tid_beg); 165 if (n < 0) { /* got TERM record */ 166 relpfs(fd, &pfs); 167 free(buf); 168 free(histogram_ary); 169 return; 170 } 171 ++mirror.tid_beg; 172 } else if (streaming && histogram) { 173 mirror.tid_beg = histogram_ary[histindex].tid + 1; 174 } else { 175 mirror.tid_beg = 0; 176 } 177 178 /* 179 * Write out the PFS header, tid_beg will be updated if our PFS 180 * has a larger begin sync. tid_end is set to the latest source 181 * TID whos flush cycle has completed. 182 */ 183 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 184 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 185 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 186 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 187 mirror.ubuf = buf; 188 mirror.size = SERIALBUF_SIZE; 189 mirror.pfs_id = pfs.pfs_id; 190 mirror.shared_uuid = pfs.ondisk->shared_uuid; 191 192 /* 193 * XXX If the histogram is exhausted and the TID delta is large 194 * the stream might have been offline for a while and is 195 * now picking it up again. Do another histogram. 196 */ 197 #if 0 198 if (streaming && histogram && histindex == histend) { 199 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 200 histogram = 0; 201 } 202 #endif 203 204 /* 205 * Initial bulk startup control, try to do some incremental 206 * mirroring in order to allow the stream to be killed and 207 * restarted without having to start over. 
208 */ 209 if (histogram == 0 && BulkOpt == 0) { 210 if (VerboseOpt && repeat == 0) { 211 fprintf(stderr, "\n"); 212 sameline = 0; 213 } 214 histmax = generate_histogram(fd, filesystem, 215 &histogram_ary, &mirror, 216 &repeat); 217 histindex = 0; 218 histogram = 1; 219 220 /* 221 * Just stream the histogram, then stop 222 */ 223 if (streaming == 0) 224 streaming = -1; 225 } 226 227 if (streaming && histogram) { 228 ++histindex; 229 mirror.tid_end = histogram_ary[histindex].tid; 230 estbytes = histogram_ary[histindex-1].bytes; 231 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 232 } else { 233 estbytes = 0; 234 } 235 236 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 237 &mrec_tmp, sizeof(mrec_tmp.pfs)); 238 239 /* 240 * A cycle file overrides the beginning TID only if we are 241 * not operating in two-way or histogram mode. 242 */ 243 if (TwoWayPipeOpt == 0 && histogram == 0) 244 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 245 246 /* 247 * An additional argument overrides the beginning TID regardless 248 * of what mode we are in. This is not recommending if operating 249 * in two-way mode. 250 */ 251 if (ac == 2) 252 mirror.tid_beg = strtoull(av[1], NULL, 0); 253 254 if (streaming == 0 || VerboseOpt >= 2) { 255 fprintf(stderr, 256 "Mirror-read: Mirror %016jx to %016jx", 257 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 258 if (histogram) 259 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 260 fprintf(stderr, "\n"); 261 fflush(stderr); 262 } 263 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 264 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 265 (uintmax_t)mirror.key_beg.obj_id); 266 } 267 268 /* 269 * Nothing to do if begin equals end. 
270 */ 271 if (mirror.tid_beg >= mirror.tid_end) { 272 if (streaming == 0 || VerboseOpt >= 2) 273 fprintf(stderr, "Mirror-read: No work to do\n"); 274 sleep(DelayOpt); 275 didwork = 0; 276 histogram = 0; 277 goto done; 278 } 279 didwork = 1; 280 281 /* 282 * Write out bulk records 283 */ 284 mirror.ubuf = buf; 285 mirror.size = SERIALBUF_SIZE; 286 287 do { 288 mirror.count = 0; 289 mirror.pfs_id = pfs.pfs_id; 290 mirror.shared_uuid = pfs.ondisk->shared_uuid; 291 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 292 score_printf(LINE3, "Mirror-read %s failed: %s", 293 filesystem, strerror(errno)); 294 err(1, "Mirror-read %s failed", filesystem); 295 /* not reached */ 296 } 297 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 298 score_printf(LINE3, "Mirror-read %s fatal error %d", 299 filesystem, mirror.head.error); 300 errx(1, "Mirror-read %s fatal error %d", 301 filesystem, mirror.head.error); 302 /* not reached */ 303 } 304 if (mirror.count) { 305 if (BandwidthOpt) { 306 n = writebw(1, mirror.ubuf, mirror.count, 307 &bwcount, &bwtv); 308 } else { 309 n = write(1, mirror.ubuf, mirror.count); 310 } 311 if (n != mirror.count) { 312 score_printf(LINE3, 313 "Mirror-read %s failed: " 314 "short write", 315 filesystem); 316 errx(1, "Mirror-read %s failed: short write", 317 filesystem); 318 /* not reached */ 319 } 320 } 321 total_bytes += mirror.count; 322 if (streaming && VerboseOpt) { 323 fprintf(stderr, 324 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 325 (uintmax_t)mirror.key_cur.obj_id, 326 (uintmax_t)mirror.tid_beg, 327 (uintmax_t)mirror.tid_end, 328 (intmax_t)total_bytes); 329 fflush(stderr); 330 sameline = 0; 331 } else if (streaming) { 332 score_printf(LINE2, 333 "obj=%016jx tids=%016jx:%016jx %11jd", 334 (uintmax_t)mirror.key_cur.obj_id, 335 (uintmax_t)mirror.tid_beg, 336 (uintmax_t)mirror.tid_end, 337 (intmax_t)total_bytes); 338 } 339 mirror.key_beg = mirror.key_cur; 340 341 /* 342 * Deal with time limit option 343 */ 344 if (TimeoutOpt && 345 
(unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 346 score_printf(LINE3, 347 "Mirror-read %s interrupted by timer at" 348 " %016jx", 349 filesystem, 350 (uintmax_t)mirror.key_cur.obj_id); 351 fprintf(stderr, 352 "Mirror-read %s interrupted by timer at" 353 " %016jx\n", 354 filesystem, 355 (uintmax_t)mirror.key_cur.obj_id); 356 interrupted = 1; 357 break; 358 } 359 } while (mirror.count != 0); 360 361 done: 362 if (streaming && VerboseOpt && sameline == 0) { 363 fprintf(stderr, "\n"); 364 fflush(stderr); 365 sameline = 1; 366 } 367 368 /* 369 * Write out the termination sync record - only if not interrupted 370 */ 371 if (interrupted == 0) { 372 if (didwork) { 373 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 374 &mrec_tmp, sizeof(mrec_tmp.sync)); 375 } else { 376 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 377 &mrec_tmp, sizeof(mrec_tmp.sync)); 378 } 379 } 380 381 /* 382 * If the -2 option was given (automatic when doing mirror-copy), 383 * a two-way pipe is assumed and we expect a response mrec from 384 * the target. 385 */ 386 if (TwoWayPipeOpt) { 387 mrec = read_mrecord(0, &error, &pickup); 388 if (mrec == NULL || 389 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 390 mrec->head.rec_size != sizeof(mrec->update)) { 391 errx(1, "mirror_read: Did not get final " 392 "acknowledgement packet from target"); 393 /* not reached */ 394 } 395 if (interrupted) { 396 if (CyclePath) { 397 hammer_set_cycle(&mirror.key_cur, 398 mirror.tid_beg); 399 fprintf(stderr, "Cyclefile %s updated for " 400 "continuation\n", CyclePath); 401 } 402 } else { 403 sync_tid = mrec->update.tid; 404 if (CyclePath) { 405 hammer_key_beg_init(&mirror.key_beg); 406 hammer_set_cycle(&mirror.key_beg, sync_tid); 407 fprintf(stderr, 408 "Cyclefile %s updated to 0x%016jx\n", 409 CyclePath, (uintmax_t)sync_tid); 410 } 411 } 412 free(mrec); 413 } else if (CyclePath) { 414 /* NOTE! 
mirror.tid_beg cannot be updated */ 415 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 416 "fully updated unless you use mirror-copy\n"); 417 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 418 } 419 if (streaming && interrupted == 0) { 420 time_t t1 = time(NULL); 421 time_t t2; 422 423 /* 424 * Try to break down large bulk transfers into smaller ones 425 * so it can sync the transaction id on the slave. This 426 * way if we get interrupted a restart doesn't have to 427 * start from scratch. 428 */ 429 if (streaming && histogram) { 430 if (histindex != histmax) { 431 if (VerboseOpt && VerboseOpt < 2 && 432 streaming >= 0) { 433 fprintf(stderr, " (bulk incremental)"); 434 } 435 relpfs(fd, &pfs); 436 goto again; 437 } 438 } 439 440 if (VerboseOpt && streaming >= 0) { 441 fprintf(stderr, " W"); 442 fflush(stderr); 443 } else if (streaming >= 0) { 444 score_printf(LINE1, "Waiting"); 445 } 446 pfs.ondisk->sync_end_tid = mirror.tid_end; 447 if (streaming < 0) { 448 /* 449 * Fake streaming mode when using a histogram to 450 * break up a mirror-read, do not wait on source. 
451 */ 452 streaming = 0; 453 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 454 score_printf(LINE3, 455 "Mirror-read %s: cannot stream: %s\n", 456 filesystem, strerror(errno)); 457 fprintf(stderr, 458 "Mirror-read %s: cannot stream: %s\n", 459 filesystem, strerror(errno)); 460 } else { 461 t2 = time(NULL) - t1; 462 if (t2 >= 0 && t2 < DelayOpt) { 463 if (VerboseOpt) { 464 fprintf(stderr, "\bD"); 465 fflush(stderr); 466 } 467 sleep(DelayOpt - t2); 468 } 469 if (VerboseOpt) { 470 fprintf(stderr, "\b "); 471 fflush(stderr); 472 } 473 relpfs(fd, &pfs); 474 goto again; 475 } 476 } 477 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 478 &mrec_tmp, sizeof(mrec_tmp.sync)); 479 relpfs(fd, &pfs); 480 free(buf); 481 free(histogram_ary); 482 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 483 } 484 485 /* 486 * What we are trying to do here is figure out how much data is 487 * going to be sent for the TID range and to break the TID range 488 * down into reasonably-sized slices (from the point of view of 489 * data sent) so a lost connection can restart at a reasonable 490 * place and not all the way back at the beginning. 491 * 492 * An entry's TID serves as the end_tid for the prior entry 493 * So we have to offset the calculation by 1 so that TID falls into 494 * the previous entry when populating entries. 495 * 496 * Because the transaction id space is bursty we need a relatively 497 * large number of buckets (like a million) to do a reasonable job 498 * for things like an initial bulk mirrors on a very large filesystem. 
499 */ 500 #define HIST_COUNT (1024 * 1024) 501 502 static 503 int 504 generate_histogram(int fd, const char *filesystem, 505 histogram_t *histogram_ary, 506 struct hammer_ioc_mirror_rw *mirror_base, 507 int *repeatp) 508 { 509 struct hammer_ioc_mirror_rw mirror; 510 union hammer_ioc_mrecord_any *mrec; 511 hammer_tid_t tid_beg; 512 hammer_tid_t tid_end; 513 hammer_tid_t tid; 514 hammer_tid_t tidx; 515 uint64_t *tid_bytes; 516 uint64_t total; 517 uint64_t accum; 518 int chunkno; 519 int i; 520 int res; 521 int off; 522 int len; 523 524 mirror = *mirror_base; 525 tid_beg = mirror.tid_beg; 526 tid_end = mirror.tid_end; 527 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 528 529 if (*histogram_ary == NULL) { 530 *histogram_ary = malloc(sizeof(struct histogram) * 531 (HIST_COUNT + 2)); 532 } 533 if (tid_beg >= tid_end) 534 return(0); 535 536 /* needs 2 extra */ 537 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 538 bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2)); 539 540 if (*repeatp == 0) { 541 fprintf(stderr, "Prescan to break up bulk transfer"); 542 if (VerboseOpt > 1) { 543 fprintf(stderr, " (%juMB chunks)", 544 (uintmax_t)(SplitupOpt / (1024 * 1024))); 545 } 546 fprintf(stderr, "\n"); 547 } 548 549 /* 550 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 551 * 552 * Note: Estimates can be off when the mirror is way behind due 553 * to skips. 
554 */ 555 total = 0; 556 accum = 0; 557 chunkno = 0; 558 for (;;) { 559 mirror.count = 0; 560 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 561 err(1, "Mirror-read %s failed", filesystem); 562 /* not reached */ 563 } 564 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 565 errx(1, "Mirror-read %s fatal error %d", 566 filesystem, mirror.head.error); 567 /* not reached */ 568 } 569 for (off = 0; 570 off < mirror.count; 571 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size)) { 572 mrec = (void *)((char *)mirror.ubuf + off); 573 574 /* 575 * We only care about general RECs and PASS 576 * records. We ignore SKIPs. 577 */ 578 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 579 case HAMMER_MREC_TYPE_REC: 580 case HAMMER_MREC_TYPE_PASS: 581 break; 582 default: 583 continue; 584 } 585 586 /* 587 * Calculate for two indices, create_tid and 588 * delete_tid. Record data only applies to 589 * the create_tid. 590 * 591 * When tid is exactly on the boundary it really 592 * belongs to the previous entry because scans 593 * are inclusive of the ending entry. 
594 */ 595 tid = mrec->rec.leaf.base.delete_tid; 596 if (tid && tid >= tid_beg && tid <= tid_end) { 597 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 598 if (mrec->head.type == 599 HAMMER_MREC_TYPE_REC) { 600 len -= HAMMER_HEAD_DOALIGN( 601 mrec->rec.leaf.data_len); 602 assert(len > 0); 603 } 604 i = (tid - tid_beg) * HIST_COUNT / 605 (tid_end - tid_beg); 606 tidx = tid_beg + i * (tid_end - tid_beg) / 607 HIST_COUNT; 608 if (tid == tidx && i) 609 --i; 610 assert(i >= 0 && i < HIST_COUNT); 611 tid_bytes[i] += len; 612 total += len; 613 accum += len; 614 } 615 616 tid = mrec->rec.leaf.base.create_tid; 617 if (tid && tid >= tid_beg && tid <= tid_end) { 618 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 619 if (mrec->head.type == 620 HAMMER_MREC_TYPE_REC_NODATA) { 621 len += HAMMER_HEAD_DOALIGN( 622 mrec->rec.leaf.data_len); 623 } 624 i = (tid - tid_beg) * HIST_COUNT / 625 (tid_end - tid_beg); 626 tidx = tid_beg + i * (tid_end - tid_beg) / 627 HIST_COUNT; 628 if (tid == tidx && i) 629 --i; 630 assert(i >= 0 && i < HIST_COUNT); 631 tid_bytes[i] += len; 632 total += len; 633 accum += len; 634 } 635 } 636 if (*repeatp == 0 && accum > SplitupOpt) { 637 if (VerboseOpt > 1) { 638 fprintf(stderr, "."); 639 fflush(stderr); 640 } 641 ++chunkno; 642 score_printf(LINE2, "Prescan chunk %d", chunkno); 643 accum = 0; 644 } 645 if (mirror.count == 0) 646 break; 647 mirror.key_beg = mirror.key_cur; 648 } 649 650 /* 651 * Reduce to SplitupOpt (default 4GB) chunks. This code may 652 * use up to two additional elements. Do the array in-place. 653 * 654 * Inefficient degenerate cases can occur if we do not accumulate 655 * at least the requested split amount, so error on the side of 656 * going over a bit. 
657 */ 658 res = 0; 659 (*histogram_ary)[res].tid = tid_beg; 660 (*histogram_ary)[res].bytes = tid_bytes[0]; 661 for (i = 1; i < HIST_COUNT; ++i) { 662 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 663 ++res; 664 (*histogram_ary)[res].tid = tid_beg + 665 i * (tid_end - tid_beg) / 666 HIST_COUNT; 667 (*histogram_ary)[res].bytes = 0; 668 669 } 670 (*histogram_ary)[res].bytes += tid_bytes[i]; 671 } 672 ++res; 673 (*histogram_ary)[res].tid = tid_end; 674 (*histogram_ary)[res].bytes = -1; 675 676 if (*repeatp == 0) { 677 if (VerboseOpt > 1) 678 fprintf(stderr, "\n"); /* newline after ... */ 679 score_printf(LINE3, "Prescan %d chunks, total %ju MBytes", 680 res, (uintmax_t)total / (1024 * 1024)); 681 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 682 res, (uintmax_t)total / (1024 * 1024)); 683 for (i = 0; i < res && i < 3; ++i) { 684 if (i) 685 fprintf(stderr, ", "); 686 fprintf(stderr, "%ju", 687 (uintmax_t)(*histogram_ary)[i].bytes); 688 } 689 if (i < res) 690 fprintf(stderr, ", ..."); 691 fprintf(stderr, ")\n"); 692 } 693 assert(res <= HIST_COUNT); 694 *repeatp = 1; 695 696 free(tid_bytes); 697 return(res); 698 } 699 700 static 701 void 702 create_pfs(const char *filesystem, hammer_uuid_t *s_uuid) 703 { 704 if (ForceYesOpt == 1) { 705 fprintf(stderr, "PFS slave %s does not exist. " 706 "Auto create new slave PFS!\n", filesystem); 707 708 } else { 709 fprintf(stderr, "PFS slave %s does not exist.\n" 710 "Do you want to create a new slave PFS? 
[y/n] ", 711 filesystem); 712 fflush(stderr); 713 if (getyntty() != 1) { 714 errx(1, "Aborting operation"); 715 /* not reached */ 716 } 717 } 718 719 char *shared_uuid = NULL; 720 hammer_uuid_to_string(s_uuid, &shared_uuid); 721 722 char *cmd = NULL; 723 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 724 filesystem, shared_uuid); 725 free(shared_uuid); 726 727 if (cmd == NULL) { 728 errx(1, "Failed to alloc memory"); 729 /* not reached */ 730 } 731 if (system(cmd) != 0) 732 fprintf(stderr, "Failed to create PFS\n"); 733 free(cmd); 734 } 735 736 /* 737 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 738 * some additional packet types to negotiate TID ranges and to verify 739 * completion. The HAMMER VFS does most of the work. 740 * 741 * It is important to note that the mirror.key_{beg,end} range must 742 * match the ranged used by the original. For now both sides use 743 * range the entire key space. 744 * 745 * It is even more important that the records in the stream conform 746 * to the TID range also supplied in the stream. The HAMMER VFS will 747 * use the REC, PASS, and SKIP record types to track the portions of 748 * the B-Tree being scanned in order to be able to proactively delete 749 * records on the target within those active areas that are not mentioned 750 * by the source. 751 * 752 * The mirror.key_cur field is used by the VFS to do this tracking. It 753 * must be initialized to key_beg but then is persistently updated by 754 * the HAMMER VFS on each successive ioctl() call. If you blow up this 755 * field you will blow up the mirror target, possibly to the point of 756 * deleting everything. As a safety measure the HAMMER VFS simply marks 757 * the records that the source has destroyed as deleted on the target, 758 * and normal pruning operations will deal with their final disposition 759 * at some later time. 
760 */ 761 void 762 hammer_cmd_mirror_write(char **av, int ac) 763 { 764 struct hammer_ioc_mirror_rw mirror; 765 const char *filesystem; 766 char *buf = malloc(SERIALBUF_SIZE); 767 struct hammer_ioc_pseudofs_rw pfs; 768 struct hammer_ioc_mrecord_head pickup; 769 struct hammer_ioc_synctid synctid; 770 union hammer_ioc_mrecord_any mrec_tmp; 771 hammer_ioc_mrecord_any_t mrec; 772 struct stat st; 773 int error; 774 int fd; 775 int n; 776 777 if (ac != 1) { 778 mirror_usage(1); 779 /* not reached */ 780 } 781 filesystem = av[0]; 782 hammer_check_restrict(filesystem); 783 784 pickup.signature = 0; 785 pickup.type = 0; 786 787 again: 788 bzero(&mirror, sizeof(mirror)); 789 hammer_key_beg_init(&mirror.key_beg); 790 hammer_key_end_init(&mirror.key_end); 791 mirror.key_end = mirror.key_beg; 792 793 /* 794 * Read initial packet 795 */ 796 mrec = read_mrecord(0, &error, &pickup); 797 if (mrec == NULL) { 798 if (error == 0) { 799 errx(1, "validate_mrec_header: short read"); 800 /* not reached */ 801 } 802 exit(1); 803 } 804 /* 805 * Validate packet 806 */ 807 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 808 free(buf); 809 return; 810 } 811 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 812 errx(1, "validate_mrec_header: did not get expected " 813 "PFSD record type"); 814 /* not reached */ 815 } 816 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 817 errx(1, "validate_mrec_header: unexpected payload size"); 818 /* not reached */ 819 } 820 821 /* 822 * Create slave PFS if it doesn't yet exist 823 */ 824 if (lstat(filesystem, &st) != 0) 825 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 826 free(mrec); 827 mrec = NULL; 828 829 fd = getpfs(&pfs, filesystem); 830 831 /* 832 * In two-way mode the target writes out a PFS packet first. 833 * The source uses our tid_end as its tid_beg by default, 834 * picking up where it left off. 
835 */ 836 mirror.tid_beg = 0; 837 if (TwoWayPipeOpt) { 838 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 839 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 840 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 841 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 842 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 843 &mrec_tmp, sizeof(mrec_tmp.pfs)); 844 } 845 846 /* 847 * Read and process the PFS header. The source informs us of 848 * the TID range the stream represents. 849 */ 850 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 851 &mirror.tid_beg, &mirror.tid_end); 852 if (n < 0) { /* got TERM record */ 853 relpfs(fd, &pfs); 854 free(buf); 855 return; 856 } 857 858 mirror.ubuf = buf; 859 mirror.size = SERIALBUF_SIZE; 860 861 /* 862 * Read and process bulk records (REC, PASS, and SKIP types). 863 * 864 * On your life, do NOT mess with mirror.key_cur or your mirror 865 * target may become history. 866 */ 867 for (;;) { 868 mirror.count = 0; 869 mirror.pfs_id = pfs.pfs_id; 870 mirror.shared_uuid = pfs.ondisk->shared_uuid; 871 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 872 if (mirror.size <= 0) 873 break; 874 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 875 err(1, "Mirror-write %s failed", filesystem); 876 /* not reached */ 877 } 878 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 879 errx(1, "Mirror-write %s fatal error %d", 880 filesystem, mirror.head.error); 881 /* not reached */ 882 } 883 #if 0 884 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 885 errx(1, "Mirror-write %s interrupted by timer at" 886 " %016llx", 887 filesystem, 888 mirror.key_cur.obj_id); 889 /* not reached */ 890 } 891 #endif 892 } 893 894 /* 895 * Read and process the termination sync record. 
896 */ 897 mrec = read_mrecord(0, &error, &pickup); 898 899 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 900 fprintf(stderr, "Mirror-write: received termination request\n"); 901 relpfs(fd, &pfs); 902 free(mrec); 903 free(buf); 904 return; 905 } 906 907 if (mrec == NULL || 908 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 909 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 910 mrec->head.rec_size != sizeof(mrec->sync)) { 911 errx(1, "Mirror-write %s: Did not get termination " 912 "sync record, or rec_size is wrong rt=%d", 913 filesystem, (mrec ? (int)mrec->head.type : -1)); 914 /* not reached */ 915 } 916 917 /* 918 * Update the PFS info on the target so the user has visibility 919 * into the new snapshot, and sync the target filesystem. 920 */ 921 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 922 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 923 924 bzero(&synctid, sizeof(synctid)); 925 synctid.op = HAMMER_SYNCTID_SYNC2; 926 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 927 928 if (VerboseOpt >= 2) { 929 fprintf(stderr, "Mirror-write %s: succeeded\n", 930 filesystem); 931 } 932 } 933 934 free(mrec); 935 mrec = NULL; 936 937 /* 938 * Report back to the originator. 
939 */ 940 if (TwoWayPipeOpt) { 941 mrec_tmp.update.tid = mirror.tid_end; 942 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 943 &mrec_tmp, sizeof(mrec_tmp.update)); 944 } else { 945 printf("Source can update synctid to 0x%016jx\n", 946 (uintmax_t)mirror.tid_end); 947 } 948 relpfs(fd, &pfs); 949 goto again; 950 } 951 952 void 953 hammer_cmd_mirror_dump(char **av, int ac) 954 { 955 char *buf = malloc(SERIALBUF_SIZE); 956 struct hammer_ioc_mrecord_head pickup; 957 hammer_ioc_mrecord_any_t mrec; 958 int error; 959 int size; 960 int offset; 961 int bytes; 962 int header_only = 0; 963 964 if (ac == 1 && strcmp(*av, "header") == 0) { 965 header_only = 1; 966 } else if (ac != 0) { 967 mirror_usage(1); 968 /* not reached */ 969 } 970 971 /* 972 * Read and process the PFS header 973 */ 974 pickup.signature = 0; 975 pickup.type = 0; 976 977 mrec = read_mrecord(0, &error, &pickup); 978 979 /* 980 * Dump the PFS header. mirror-dump takes its input from the output 981 * of a mirror-read so getpfs() can't be used to get a fd to be passed 982 * to dump_pfsd(). 
983 */ 984 if (header_only && mrec != NULL) { 985 dump_pfsd(&mrec->pfs.pfsd, -1); 986 free(mrec); 987 free(buf); 988 return; 989 } 990 free(mrec); 991 992 again: 993 /* 994 * Read and process bulk records 995 */ 996 for (;;) { 997 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 998 if (size <= 0) 999 break; 1000 offset = 0; 1001 while (offset < size) { 1002 mrec = (void *)((char *)buf + offset); 1003 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 1004 if (offset + bytes > size) { 1005 errx(1, "Misaligned record"); 1006 /* not reached */ 1007 } 1008 1009 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 1010 case HAMMER_MREC_TYPE_REC_BADCRC: 1011 case HAMMER_MREC_TYPE_REC: 1012 printf("Record lo=%08x obj=%016jx key=%016jx " 1013 "rt=%02x ot=%02x", 1014 mrec->rec.leaf.base.localization, 1015 (uintmax_t)mrec->rec.leaf.base.obj_id, 1016 (uintmax_t)mrec->rec.leaf.base.key, 1017 mrec->rec.leaf.base.rec_type, 1018 mrec->rec.leaf.base.obj_type); 1019 if (mrec->head.type == 1020 HAMMER_MREC_TYPE_REC_BADCRC) { 1021 printf(" (BAD CRC)"); 1022 } 1023 printf("\n"); 1024 printf(" tids %016jx:%016jx data=%d\n", 1025 (uintmax_t)mrec->rec.leaf.base.create_tid, 1026 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1027 mrec->rec.leaf.data_len); 1028 break; 1029 case HAMMER_MREC_TYPE_PASS: 1030 printf("Pass lo=%08x obj=%016jx key=%016jx " 1031 "rt=%02x ot=%02x\n", 1032 mrec->rec.leaf.base.localization, 1033 (uintmax_t)mrec->rec.leaf.base.obj_id, 1034 (uintmax_t)mrec->rec.leaf.base.key, 1035 mrec->rec.leaf.base.rec_type, 1036 mrec->rec.leaf.base.obj_type); 1037 printf(" tids %016jx:%016jx data=%d\n", 1038 (uintmax_t)mrec->rec.leaf.base.create_tid, 1039 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1040 mrec->rec.leaf.data_len); 1041 break; 1042 case HAMMER_MREC_TYPE_SKIP: 1043 printf("Skip lo=%08x obj=%016jx key=%016jx rt=%02x to\n" 1044 " lo=%08x obj=%016jx key=%016jx rt=%02x\n", 1045 mrec->skip.skip_beg.localization, 1046 (uintmax_t)mrec->skip.skip_beg.obj_id, 1047 
(uintmax_t)mrec->skip.skip_beg.key, 1048 mrec->skip.skip_beg.rec_type, 1049 mrec->skip.skip_end.localization, 1050 (uintmax_t)mrec->skip.skip_end.obj_id, 1051 (uintmax_t)mrec->skip.skip_end.key, 1052 mrec->skip.skip_end.rec_type); 1053 default: 1054 break; 1055 } 1056 offset += bytes; 1057 } 1058 } 1059 1060 /* 1061 * Read and process the termination sync record. 1062 */ 1063 mrec = read_mrecord(0, &error, &pickup); 1064 if (mrec == NULL || 1065 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 1066 mrec->head.type != HAMMER_MREC_TYPE_IDLE)) { 1067 fprintf(stderr, "Mirror-dump: Did not get termination " 1068 "sync record\n"); 1069 } 1070 free(mrec); 1071 1072 /* 1073 * Continue with more batches until EOF. 1074 */ 1075 mrec = read_mrecord(0, &error, &pickup); 1076 if (mrec) { 1077 free(mrec); 1078 goto again; 1079 } 1080 free(buf); 1081 } 1082 1083 void 1084 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1085 { 1086 pid_t pid1; 1087 pid_t pid2; 1088 int fds[2]; 1089 const char *xav[32]; 1090 char tbuf[16]; 1091 char *sh, *user, *host, *rfs; 1092 int xac; 1093 1094 if (ac != 2) { 1095 mirror_usage(1); 1096 /* not reached */ 1097 } 1098 1099 TwoWayPipeOpt = 1; 1100 signal(SIGPIPE, SIG_IGN); 1101 1102 again: 1103 if (pipe(fds) < 0) { 1104 err(1, "pipe"); 1105 /* not reached */ 1106 } 1107 1108 /* 1109 * Source 1110 */ 1111 if ((pid1 = fork()) == 0) { 1112 signal(SIGPIPE, SIG_DFL); 1113 dup2(fds[0], 0); 1114 dup2(fds[0], 1); 1115 close(fds[0]); 1116 close(fds[1]); 1117 if ((rfs = strchr(av[0], ':')) != NULL) { 1118 xac = 0; 1119 1120 if((sh = getenv("HAMMER_RSH")) == NULL) 1121 xav[xac++] = "ssh"; 1122 else 1123 xav[xac++] = sh; 1124 1125 if (CompressOpt) 1126 xav[xac++] = "-C"; 1127 1128 if ((host = strchr(av[0], '@')) != NULL) { 1129 user = strndup( av[0], (host++ - av[0])); 1130 host = strndup( host, (rfs++ - host)); 1131 xav[xac++] = "-l"; 1132 xav[xac++] = user; 1133 xav[xac++] = host; 1134 } else { 1135 host = strndup( av[0], (rfs++ - av[0])); 1136 user = 
NULL; 1137 xav[xac++] = host; 1138 } 1139 1140 1141 if (SshPort) { 1142 xav[xac++] = "-p"; 1143 xav[xac++] = SshPort; 1144 } 1145 1146 xav[xac++] = "hammer"; 1147 1148 switch(VerboseOpt) { 1149 case 0: 1150 break; 1151 case 1: 1152 xav[xac++] = "-v"; 1153 break; 1154 case 2: 1155 xav[xac++] = "-vv"; 1156 break; 1157 default: 1158 xav[xac++] = "-vvv"; 1159 break; 1160 } 1161 if (ForceYesOpt) 1162 xav[xac++] = "-y"; 1163 xav[xac++] = "-2"; 1164 if (TimeoutOpt) { 1165 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1166 xav[xac++] = "-t"; 1167 xav[xac++] = tbuf; 1168 } 1169 if (SplitupOptStr) { 1170 xav[xac++] = "-S"; 1171 xav[xac++] = SplitupOptStr; 1172 } 1173 if (streaming) 1174 xav[xac++] = "mirror-read-stream"; 1175 else 1176 xav[xac++] = "mirror-read"; 1177 xav[xac++] = rfs; 1178 xav[xac++] = NULL; 1179 execvp(*xav, (void *)xav); 1180 } else { 1181 hammer_cmd_mirror_read(av, 1, streaming); 1182 fflush(stdout); 1183 fflush(stderr); 1184 } 1185 _exit(1); 1186 } 1187 1188 /* 1189 * Target 1190 */ 1191 if ((pid2 = fork()) == 0) { 1192 signal(SIGPIPE, SIG_DFL); 1193 dup2(fds[1], 0); 1194 dup2(fds[1], 1); 1195 close(fds[0]); 1196 close(fds[1]); 1197 if ((rfs = strchr(av[1], ':')) != NULL) { 1198 xac = 0; 1199 1200 if((sh = getenv("HAMMER_RSH")) == NULL) 1201 xav[xac++] = "ssh"; 1202 else 1203 xav[xac++] = sh; 1204 1205 if (CompressOpt) 1206 xav[xac++] = "-C"; 1207 1208 if ((host = strchr(av[1], '@')) != NULL) { 1209 user = strndup( av[1], (host++ - av[1])); 1210 host = strndup( host, (rfs++ - host)); 1211 xav[xac++] = "-l"; 1212 xav[xac++] = user; 1213 xav[xac++] = host; 1214 } else { 1215 host = strndup( av[1], (rfs++ - av[1])); 1216 user = NULL; 1217 xav[xac++] = host; 1218 } 1219 1220 if (SshPort) { 1221 xav[xac++] = "-p"; 1222 xav[xac++] = SshPort; 1223 } 1224 1225 xav[xac++] = "hammer"; 1226 1227 switch(VerboseOpt) { 1228 case 0: 1229 break; 1230 case 1: 1231 xav[xac++] = "-v"; 1232 break; 1233 case 2: 1234 xav[xac++] = "-vv"; 1235 break; 1236 default: 1237 
xav[xac++] = "-vvv"; 1238 break; 1239 } 1240 if (ForceYesOpt) 1241 xav[xac++] = "-y"; 1242 xav[xac++] = "-2"; 1243 xav[xac++] = "mirror-write"; 1244 xav[xac++] = rfs; 1245 xav[xac++] = NULL; 1246 execvp(*xav, (void *)xav); 1247 } else { 1248 hammer_cmd_mirror_write(av + 1, 1); 1249 fflush(stdout); 1250 fflush(stderr); 1251 } 1252 _exit(1); 1253 } 1254 close(fds[0]); 1255 close(fds[1]); 1256 1257 while (waitpid(pid1, NULL, 0) <= 0) 1258 ; 1259 while (waitpid(pid2, NULL, 0) <= 0) 1260 ; 1261 1262 /* 1263 * If the link is lost restart 1264 */ 1265 if (streaming) { 1266 if (VerboseOpt) { 1267 fprintf(stderr, "\nLost Link\n"); 1268 fflush(stderr); 1269 } 1270 sleep(15 + DelayOpt); 1271 goto again; 1272 } 1273 1274 } 1275 1276 /* 1277 * Read and return multiple mrecords 1278 */ 1279 static 1280 int 1281 read_mrecords(int fd, char *buf, u_int size, struct hammer_ioc_mrecord_head *pickup) 1282 { 1283 hammer_ioc_mrecord_any_t mrec; 1284 u_int count; 1285 size_t n; 1286 size_t i; 1287 size_t bytes; 1288 int type; 1289 1290 count = 0; 1291 while (size - count >= HAMMER_MREC_HEADSIZE) { 1292 /* 1293 * Cached the record header in case we run out of buffer 1294 * space. 
1295 */ 1296 fflush(stdout); 1297 if (pickup->signature == 0) { 1298 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1299 i = read(fd, (char *)pickup + n, 1300 HAMMER_MREC_HEADSIZE - n); 1301 if (i <= 0) 1302 break; 1303 } 1304 if (n == 0) 1305 break; 1306 if (n != HAMMER_MREC_HEADSIZE) { 1307 errx(1, "read_mrecords: short read on pipe"); 1308 /* not reached */ 1309 } 1310 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1311 errx(1, "read_mrecords: malformed record on pipe, " 1312 "bad signature"); 1313 /* not reached */ 1314 } 1315 } 1316 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1317 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 1318 errx(1, "read_mrecords: malformed record on pipe, " 1319 "illegal rec_size"); 1320 /* not reached */ 1321 } 1322 1323 /* 1324 * Stop if we have insufficient space for the record and data. 1325 */ 1326 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1327 if (size - count < bytes) 1328 break; 1329 1330 /* 1331 * Stop if the record type is not a REC, SKIP, or PASS, 1332 * which are the only types the ioctl supports. Other types 1333 * are used only by the userland protocol. 1334 * 1335 * Ignore all flags. 1336 */ 1337 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1338 if (type != HAMMER_MREC_TYPE_PFSD && 1339 type != HAMMER_MREC_TYPE_REC && 1340 type != HAMMER_MREC_TYPE_SKIP && 1341 type != HAMMER_MREC_TYPE_PASS) { 1342 break; 1343 } 1344 1345 /* 1346 * Read the remainder and clear the pickup signature. 
1347 */ 1348 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1349 i = read(fd, buf + count + n, bytes - n); 1350 if (i <= 0) 1351 break; 1352 } 1353 if (n != bytes) { 1354 errx(1, "read_mrecords: short read on pipe"); 1355 /* not reached */ 1356 } 1357 1358 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1359 pickup->signature = 0; 1360 pickup->type = 0; 1361 mrec = (void *)(buf + count); 1362 1363 /* 1364 * Validate the completed record 1365 */ 1366 if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) { 1367 errx(1, "read_mrecords: malformed record on pipe, bad crc"); 1368 /* not reached */ 1369 } 1370 1371 /* 1372 * If its a B-Tree record validate the data crc. 1373 * 1374 * NOTE: If the VFS passes us an explicitly errorde mrec 1375 * we just pass it through. 1376 */ 1377 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1378 1379 if (type == HAMMER_MREC_TYPE_REC) { 1380 if (mrec->head.rec_size < 1381 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1382 errx(1, "read_mrecords: malformed record on " 1383 "pipe, illegal element data_len"); 1384 /* not reached */ 1385 } 1386 if (mrec->rec.leaf.data_len && 1387 mrec->rec.leaf.data_offset && 1388 hammer_crc_test_leaf(HammerVersion, &mrec->rec + 1, &mrec->rec.leaf) == 0) { 1389 fprintf(stderr, 1390 "read_mrecords: data_crc did not " 1391 "match data! obj=%016jx key=%016jx\n", 1392 (uintmax_t)mrec->rec.leaf.base.obj_id, 1393 (uintmax_t)mrec->rec.leaf.base.key); 1394 fprintf(stderr, 1395 "continuing, but there are problems\n"); 1396 } 1397 } 1398 count += bytes; 1399 } 1400 return(count); 1401 } 1402 1403 /* 1404 * Read and return a single mrecord. 
1405 */ 1406 static 1407 hammer_ioc_mrecord_any_t 1408 read_mrecord(int fdin, int *errorp, struct hammer_ioc_mrecord_head *pickup) 1409 { 1410 hammer_ioc_mrecord_any_t mrec; 1411 struct hammer_ioc_mrecord_head mrechd; 1412 size_t bytes; 1413 size_t n; 1414 size_t i; 1415 1416 if (pickup && pickup->type != 0) { 1417 mrechd = *pickup; 1418 pickup->signature = 0; 1419 pickup->type = 0; 1420 n = HAMMER_MREC_HEADSIZE; 1421 } else { 1422 /* 1423 * Read in the PFSD header from the sender. 1424 */ 1425 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1426 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1427 if (i <= 0) 1428 break; 1429 } 1430 if (n == 0) { 1431 *errorp = 0; /* EOF */ 1432 return(NULL); 1433 } 1434 if (n != HAMMER_MREC_HEADSIZE) { 1435 fprintf(stderr, "short read of mrecord header\n"); 1436 *errorp = EPIPE; 1437 return(NULL); 1438 } 1439 } 1440 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1441 fprintf(stderr, "read_mrecord: bad signature\n"); 1442 *errorp = EINVAL; 1443 return(NULL); 1444 } 1445 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1446 assert(bytes >= sizeof(mrechd)); 1447 mrec = malloc(bytes); 1448 mrec->head = mrechd; 1449 1450 while (n < bytes) { 1451 i = read(fdin, (char *)mrec + n, bytes - n); 1452 if (i <= 0) 1453 break; 1454 n += i; 1455 } 1456 if (n != bytes) { 1457 fprintf(stderr, "read_mrecord: short read on payload\n"); 1458 *errorp = EPIPE; 1459 return(NULL); 1460 } 1461 if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) { 1462 fprintf(stderr, "read_mrecord: bad CRC\n"); 1463 *errorp = EINVAL; 1464 return(NULL); 1465 } 1466 *errorp = 0; 1467 return(mrec); 1468 } 1469 1470 static 1471 void 1472 write_mrecord(int fdout, uint32_t type, hammer_ioc_mrecord_any_t mrec, 1473 int bytes) 1474 { 1475 char zbuf[HAMMER_HEAD_ALIGN]; 1476 int pad; 1477 1478 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1479 1480 assert(bytes >= (int)sizeof(mrec->head)); 1481 bzero(&mrec->head, sizeof(mrec->head)); 1482 
mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1483 mrec->head.type = type; 1484 mrec->head.rec_size = bytes; 1485 hammer_crc_set_mrec_head(&mrec->head, bytes); 1486 if (write(fdout, mrec, bytes) != bytes) { 1487 err(1, "write_mrecord"); 1488 /* not reached */ 1489 } 1490 if (pad) { 1491 bzero(zbuf, pad); 1492 if (write(fdout, zbuf, pad) != pad) { 1493 err(1, "write_mrecord"); 1494 /* not reached */ 1495 } 1496 } 1497 } 1498 1499 /* 1500 * Generate a mirroring header with the pfs information of the 1501 * originating filesytem. 1502 */ 1503 static 1504 void 1505 generate_mrec_header(int fd, int pfs_id, 1506 union hammer_ioc_mrecord_any *mrec_tmp) 1507 { 1508 struct hammer_ioc_pseudofs_rw pfs; 1509 1510 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1511 clrpfs(&pfs, &mrec_tmp->pfs.pfsd, pfs_id); 1512 1513 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1514 err(1, "Mirror-read: not a HAMMER fs/pseudofs!"); 1515 /* not reached */ 1516 } 1517 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1518 errx(1, "Mirror-read: HAMMER PFS version mismatch!"); 1519 /* not reached */ 1520 } 1521 mrec_tmp->pfs.version = pfs.version; 1522 } 1523 1524 /* 1525 * Validate the pfs information from the originating filesystem 1526 * against the target filesystem. shared_uuid must match. 1527 * 1528 * return -1 if we got a TERM record 1529 */ 1530 static 1531 int 1532 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1533 struct hammer_ioc_mrecord_head *pickup, 1534 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1535 { 1536 struct hammer_ioc_pseudofs_rw pfs; 1537 struct hammer_pseudofs_data pfsd; 1538 hammer_ioc_mrecord_any_t mrec; 1539 int error; 1540 1541 /* 1542 * Get the PFSD info from the target filesystem. 
1543 */ 1544 clrpfs(&pfs, &pfsd, pfs_id); 1545 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1546 err(1, "mirror-write: not a HAMMER fs/pseudofs!"); 1547 /* not reached */ 1548 } 1549 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1550 errx(1, "mirror-write: HAMMER PFS version mismatch!"); 1551 /* not reached */ 1552 } 1553 1554 mrec = read_mrecord(fdin, &error, pickup); 1555 if (mrec == NULL) { 1556 if (error == 0) { 1557 errx(1, "validate_mrec_header: short read"); 1558 /* not reached */ 1559 } 1560 exit(1); 1561 } 1562 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1563 free(mrec); 1564 return(-1); 1565 } 1566 1567 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1568 errx(1, "validate_mrec_header: did not get expected " 1569 "PFSD record type"); 1570 /* not reached */ 1571 } 1572 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1573 errx(1, "validate_mrec_header: unexpected payload size"); 1574 /* not reached */ 1575 } 1576 if (mrec->pfs.version != pfs.version) { 1577 errx(1, "validate_mrec_header: Version mismatch"); 1578 /* not reached */ 1579 } 1580 1581 /* 1582 * Whew. Ok, is the read PFS info compatible with the target? 
1583 */ 1584 if (hammer_uuid_compare(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid)) { 1585 errx(1, "mirror-write: source and target have " 1586 "different shared-uuid's!"); 1587 /* not reached */ 1588 } 1589 if (is_target && hammer_is_pfs_master(&pfsd)) { 1590 errx(1, "mirror-write: target must be in slave mode"); 1591 /* not reached */ 1592 } 1593 if (tid_begp) 1594 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1595 if (tid_endp) 1596 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1597 free(mrec); 1598 return(0); 1599 } 1600 1601 static 1602 void 1603 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1604 { 1605 struct hammer_ioc_pseudofs_rw pfs; 1606 struct hammer_pseudofs_data pfsd; 1607 1608 clrpfs(&pfs, &pfsd, pfs_id); 1609 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1610 err(1, "update_pfs_snapshot (read)"); 1611 /* not reached */ 1612 } 1613 1614 if (pfsd.sync_end_tid != snapshot_tid) { 1615 pfsd.sync_end_tid = snapshot_tid; 1616 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1617 err(1, "update_pfs_snapshot (rewrite)"); 1618 /* not reached */ 1619 } 1620 if (VerboseOpt >= 2) { 1621 fprintf(stderr, 1622 "Mirror-write: Completed, updated snapshot " 1623 "to %016jx\n", 1624 (uintmax_t)snapshot_tid); 1625 fflush(stderr); 1626 } 1627 } 1628 } 1629 1630 /* 1631 * Bandwidth-limited write in chunks 1632 */ 1633 static 1634 ssize_t 1635 writebw(int fd, const void *buf, size_t nbytes, 1636 uint64_t *bwcount, struct timeval *tv1) 1637 { 1638 struct timeval tv2; 1639 size_t n; 1640 ssize_t r; 1641 ssize_t a; 1642 int usec; 1643 1644 a = 0; 1645 r = 0; 1646 while (nbytes) { 1647 if (*bwcount + nbytes > BandwidthOpt) 1648 n = BandwidthOpt - *bwcount; 1649 else 1650 n = nbytes; 1651 if (n) 1652 r = write(fd, buf, n); 1653 if (r >= 0) { 1654 a += r; 1655 nbytes -= r; 1656 buf = (const char *)buf + r; 1657 } 1658 if ((size_t)r != n) 1659 break; 1660 *bwcount += n; 1661 if (*bwcount >= BandwidthOpt) { 1662 gettimeofday(&tv2, NULL); 1663 usec = 
(int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1664 (int)(tv2.tv_usec - tv1->tv_usec); 1665 if (usec >= 0 && usec < 1000000) 1666 usleep(1000000 - usec); 1667 gettimeofday(tv1, NULL); 1668 *bwcount -= BandwidthOpt; 1669 } 1670 } 1671 return(a ? a : r); 1672 } 1673 1674 /* 1675 * Get a yes or no answer from the terminal. The program may be run as 1676 * part of a two-way pipe so we cannot use stdin for this operation. 1677 */ 1678 static 1679 int 1680 getyntty(void) 1681 { 1682 char buf[256]; 1683 FILE *fp; 1684 int result; 1685 1686 fp = fopen("/dev/tty", "r"); 1687 if (fp == NULL) { 1688 fprintf(stderr, "No terminal for response\n"); 1689 return(-1); 1690 } 1691 result = -1; 1692 while (fgets(buf, sizeof(buf), fp) != NULL) { 1693 if (buf[0] == 'y' || buf[0] == 'Y') { 1694 result = 1; 1695 break; 1696 } 1697 if (buf[0] == 'n' || buf[0] == 'N') { 1698 result = 0; 1699 break; 1700 } 1701 fprintf(stderr, "Response not understood\n"); 1702 break; 1703 } 1704 fclose(fp); 1705 return(result); 1706 } 1707 1708 static 1709 void 1710 score_printf(size_t i, size_t w, const char *ctl, ...) 
1711 { 1712 va_list va; 1713 size_t n; 1714 static size_t SSize; 1715 static int SFd = -1; 1716 static char ScoreBuf[1024]; 1717 1718 if (ScoreBoardFile == NULL) 1719 return; 1720 assert(i + w < sizeof(ScoreBuf)); 1721 if (SFd < 0) { 1722 SFd = open(ScoreBoardFile, O_RDWR|O_CREAT|O_TRUNC, 0644); 1723 if (SFd < 0) 1724 return; 1725 SSize = 0; 1726 } 1727 for (n = 0; n < i; ++n) { 1728 if (ScoreBuf[n] == 0) 1729 ScoreBuf[n] = ' '; 1730 } 1731 va_start(va, ctl); 1732 vsnprintf(ScoreBuf + i, w - 1, ctl, va); 1733 va_end(va); 1734 n = strlen(ScoreBuf + i); 1735 while (n < w - 1) { 1736 ScoreBuf[i + n] = ' '; 1737 ++n; 1738 } 1739 ScoreBuf[i + n] = '\n'; 1740 if (SSize < i + w) 1741 SSize = i + w; 1742 pwrite(SFd, ScoreBuf, SSize, 0); 1743 } 1744 1745 static 1746 void 1747 hammer_check_restrict(const char *filesystem) 1748 { 1749 size_t rlen; 1750 int atslash; 1751 1752 if (RestrictTarget == NULL) 1753 return; 1754 rlen = strlen(RestrictTarget); 1755 if (strncmp(filesystem, RestrictTarget, rlen) != 0) { 1756 errx(1, "hammer-remote: restricted target"); 1757 /* not reached */ 1758 } 1759 1760 atslash = 1; 1761 while (filesystem[rlen]) { 1762 if (atslash && 1763 filesystem[rlen] == '.' && 1764 filesystem[rlen+1] == '.') { 1765 errx(1, "hammer-remote: '..' not allowed"); 1766 /* not reached */ 1767 } 1768 if (filesystem[rlen] == '/') 1769 atslash = 1; 1770 else 1771 atslash = 0; 1772 ++rlen; 1773 } 1774 } 1775 1776 static 1777 void 1778 mirror_usage(int code) 1779 { 1780 fprintf(stderr, 1781 "hammer mirror-read <filesystem> [begin-tid]\n" 1782 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1783 "hammer mirror-write <filesystem>\n" 1784 "hammer mirror-dump [header]\n" 1785 "hammer mirror-copy [[user@]host:]<filesystem>" 1786 " [[user@]host:]<filesystem>\n" 1787 "hammer mirror-stream [[user@]host:]<filesystem>" 1788 " [[user@]host:]<filesystem>\n" 1789 ); 1790 exit(code); 1791 } 1792 1793