1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 */ 34 35 #include "hammer.h" 36 37 #define LINE1 0,20 38 #define LINE2 20,78 39 #define LINE3 90,70 40 41 #define SERIALBUF_SIZE (512 * 1024) 42 43 typedef struct histogram { 44 hammer_tid_t tid; 45 uint64_t bytes; 46 } *histogram_t; 47 48 const char *ScoreBoardFile; 49 const char *RestrictTarget; 50 51 static int read_mrecords(int fd, char *buf, u_int size, 52 hammer_ioc_mrecord_head_t pickup); 53 static int generate_histogram(int fd, const char *filesystem, 54 histogram_t *histogram_ary, 55 struct hammer_ioc_mirror_rw *mirror_base, 56 int *repeatp); 57 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 58 hammer_ioc_mrecord_head_t pickup); 59 static void write_mrecord(int fdout, uint32_t type, 60 hammer_ioc_mrecord_any_t mrec, int bytes); 61 static void generate_mrec_header(int fd, int pfs_id, 62 union hammer_ioc_mrecord_any *mrec_tmp); 63 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 64 struct hammer_ioc_mrecord_head *pickup, 65 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 66 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 67 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 68 uint64_t *bwcount, struct timeval *tv1); 69 static int getyntty(void); 70 static void score_printf(size_t i, size_t w, const char *ctl, ...); 71 static void hammer_check_restrict(const char *filesystem); 72 static void mirror_usage(int code); 73 74 /* 75 * Generate a mirroring data stream from the specific source over the 76 * entire key range, but restricted to the specified transaction range. 77 * 78 * The HAMMER VFS does most of the work, we add a few new mrecord 79 * types to negotiate the TID ranges and verify that the entire 80 * stream made it to the destination. 81 * 82 * streaming will be 0 for mirror-read, 1 for mirror-stream. The code will 83 * set up a fake value of -1 when running the histogram for mirror-read. 84 */ 85 void 86 hammer_cmd_mirror_read(char **av, int ac, int streaming) 87 { 88 struct hammer_ioc_mirror_rw mirror; 89 struct hammer_ioc_pseudofs_rw pfs; 90 union hammer_ioc_mrecord_any mrec_tmp; 91 struct hammer_ioc_mrecord_head pickup; 92 hammer_ioc_mrecord_any_t mrec; 93 hammer_tid_t sync_tid; 94 histogram_t histogram_ary; 95 char *filesystem; 96 char *buf = malloc(SERIALBUF_SIZE); 97 int interrupted = 0; 98 int error; 99 int fd; 100 int n; 101 int didwork; 102 int histogram; 103 int histindex; 104 int histmax; 105 int repeat = 0; 106 int sameline; 107 int64_t total_bytes; 108 time_t base_t = time(NULL); 109 struct timeval bwtv; 110 uint64_t bwcount; 111 uint64_t estbytes; 112 113 if (ac == 0 || ac > 2) 114 mirror_usage(1); 115 filesystem = av[0]; 116 hammer_check_restrict(filesystem); 117 118 pickup.signature = 0; 119 pickup.type = 0; 120 histogram = 0; 121 histindex = 0; 122 histmax = 0; 123 histogram_ary = NULL; 124 sameline = 0; 125 126 again: 127 bzero(&mirror, sizeof(mirror)); 128 hammer_key_beg_init(&mirror.key_beg); 129 hammer_key_end_init(&mirror.key_end); 130 131 fd = getpfs(&pfs, filesystem); 132 133 if (streaming >= 0) 134 score_printf(LINE1, "Running"); 135 136 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 137 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 138 fflush(stderr); 139 sameline = 1; 140 } 141 sameline = 1; 142 total_bytes = 0; 143 gettimeofday(&bwtv, NULL); 144 bwcount = 0; 145 146 /* 147 * Send initial header for the purpose of determining the 148 * shared-uuid. 149 */ 150 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 151 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 152 &mrec_tmp, sizeof(mrec_tmp.pfs)); 153 154 /* 155 * In 2-way mode the target will send us a PFS info packet 156 * first. Use the target's current snapshot TID as our default 157 * begin TID. 158 */ 159 if (TwoWayPipeOpt) { 160 mirror.tid_beg = 0; 161 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 162 NULL, &mirror.tid_beg); 163 if (n < 0) { /* got TERM record */ 164 relpfs(fd, &pfs); 165 free(buf); 166 free(histogram_ary); 167 return; 168 } 169 ++mirror.tid_beg; 170 } else if (streaming && histogram) { 171 mirror.tid_beg = histogram_ary[histindex].tid + 1; 172 } else { 173 mirror.tid_beg = 0; 174 } 175 176 /* 177 * Write out the PFS header, tid_beg will be updated if our PFS 178 * has a larger begin sync. tid_end is set to the latest source 179 * TID whos flush cycle has completed. 180 */ 181 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 182 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 183 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 184 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 185 mirror.ubuf = buf; 186 mirror.size = SERIALBUF_SIZE; 187 mirror.pfs_id = pfs.pfs_id; 188 mirror.shared_uuid = pfs.ondisk->shared_uuid; 189 190 /* 191 * XXX If the histogram is exhausted and the TID delta is large 192 * the stream might have been offline for a while and is 193 * now picking it up again. Do another histogram. 194 */ 195 #if 0 196 if (streaming && histogram && histindex == histend) { 197 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 198 histogram = 0; 199 } 200 #endif 201 202 /* 203 * Initial bulk startup control, try to do some incremental 204 * mirroring in order to allow the stream to be killed and 205 * restarted without having to start over. 206 */ 207 if (histogram == 0 && BulkOpt == 0) { 208 if (VerboseOpt && repeat == 0) { 209 fprintf(stderr, "\n"); 210 sameline = 0; 211 } 212 histmax = generate_histogram(fd, filesystem, 213 &histogram_ary, &mirror, 214 &repeat); 215 histindex = 0; 216 histogram = 1; 217 218 /* 219 * Just stream the histogram, then stop 220 */ 221 if (streaming == 0) 222 streaming = -1; 223 } 224 225 if (streaming && histogram) { 226 ++histindex; 227 mirror.tid_end = histogram_ary[histindex].tid; 228 estbytes = histogram_ary[histindex-1].bytes; 229 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 230 } else { 231 estbytes = 0; 232 } 233 234 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 235 &mrec_tmp, sizeof(mrec_tmp.pfs)); 236 237 /* 238 * A cycle file overrides the beginning TID only if we are 239 * not operating in two-way or histogram mode. 240 */ 241 if (TwoWayPipeOpt == 0 && histogram == 0) { 242 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 243 } 244 245 /* 246 * An additional argument overrides the beginning TID regardless 247 * of what mode we are in. This is not recommending if operating 248 * in two-way mode. 249 */ 250 if (ac == 2) 251 mirror.tid_beg = strtoull(av[1], NULL, 0); 252 253 if (streaming == 0 || VerboseOpt >= 2) { 254 fprintf(stderr, 255 "Mirror-read: Mirror %016jx to %016jx", 256 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 257 if (histogram) 258 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 259 fprintf(stderr, "\n"); 260 fflush(stderr); 261 } 262 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 263 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 264 (uintmax_t)mirror.key_beg.obj_id); 265 } 266 267 /* 268 * Nothing to do if begin equals end. 269 */ 270 if (mirror.tid_beg >= mirror.tid_end) { 271 if (streaming == 0 || VerboseOpt >= 2) 272 fprintf(stderr, "Mirror-read: No work to do\n"); 273 sleep(DelayOpt); 274 didwork = 0; 275 histogram = 0; 276 goto done; 277 } 278 didwork = 1; 279 280 /* 281 * Write out bulk records 282 */ 283 mirror.ubuf = buf; 284 mirror.size = SERIALBUF_SIZE; 285 286 do { 287 mirror.count = 0; 288 mirror.pfs_id = pfs.pfs_id; 289 mirror.shared_uuid = pfs.ondisk->shared_uuid; 290 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 291 score_printf(LINE3, "Mirror-read %s failed: %s", 292 filesystem, strerror(errno)); 293 fprintf(stderr, "Mirror-read %s failed: %s\n", 294 filesystem, strerror(errno)); 295 exit(1); 296 } 297 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 298 score_printf(LINE3, "Mirror-read %s fatal error %d", 299 filesystem, mirror.head.error); 300 fprintf(stderr, 301 "Mirror-read %s fatal error %d\n", 302 filesystem, mirror.head.error); 303 exit(1); 304 } 305 if (mirror.count) { 306 if (BandwidthOpt) { 307 n = writebw(1, mirror.ubuf, mirror.count, 308 &bwcount, &bwtv); 309 } else { 310 n = write(1, mirror.ubuf, mirror.count); 311 } 312 if (n != mirror.count) { 313 score_printf(LINE3, 314 "Mirror-read %s failed: " 315 "short write", 316 filesystem); 317 fprintf(stderr, 318 "Mirror-read %s failed: " 319 "short write\n", 320 filesystem); 321 exit(1); 322 } 323 } 324 total_bytes += mirror.count; 325 if (streaming && VerboseOpt) { 326 fprintf(stderr, 327 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 328 (uintmax_t)mirror.key_cur.obj_id, 329 (uintmax_t)mirror.tid_beg, 330 (uintmax_t)mirror.tid_end, 331 (intmax_t)total_bytes); 332 fflush(stderr); 333 sameline = 0; 334 } else if (streaming) { 335 score_printf(LINE2, 336 "obj=%016jx tids=%016jx:%016jx %11jd", 337 (uintmax_t)mirror.key_cur.obj_id, 338 (uintmax_t)mirror.tid_beg, 339 (uintmax_t)mirror.tid_end, 340 (intmax_t)total_bytes); 341 } 342 mirror.key_beg = mirror.key_cur; 343 344 /* 345 * Deal with time limit option 346 */ 347 if (TimeoutOpt && 348 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 349 score_printf(LINE3, 350 "Mirror-read %s interrupted by timer at" 351 " %016jx", 352 filesystem, 353 (uintmax_t)mirror.key_cur.obj_id); 354 fprintf(stderr, 355 "Mirror-read %s interrupted by timer at" 356 " %016jx\n", 357 filesystem, 358 (uintmax_t)mirror.key_cur.obj_id); 359 interrupted = 1; 360 break; 361 } 362 } while (mirror.count != 0); 363 364 done: 365 if (streaming && VerboseOpt && sameline == 0) { 366 fprintf(stderr, "\n"); 367 fflush(stderr); 368 sameline = 1; 369 } 370 371 /* 372 * Write out the termination sync record - only if not interrupted 373 */ 374 if (interrupted == 0) { 375 if (didwork) { 376 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 377 &mrec_tmp, sizeof(mrec_tmp.sync)); 378 } else { 379 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 380 &mrec_tmp, sizeof(mrec_tmp.sync)); 381 } 382 } 383 384 /* 385 * If the -2 option was given (automatic when doing mirror-copy), 386 * a two-way pipe is assumed and we expect a response mrec from 387 * the target. 388 */ 389 if (TwoWayPipeOpt) { 390 mrec = read_mrecord(0, &error, &pickup); 391 if (mrec == NULL || 392 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 393 mrec->head.rec_size != sizeof(mrec->update)) { 394 fprintf(stderr, "mirror_read: Did not get final " 395 "acknowledgement packet from target\n"); 396 exit(1); 397 } 398 if (interrupted) { 399 if (CyclePath) { 400 hammer_set_cycle(&mirror.key_cur, 401 mirror.tid_beg); 402 fprintf(stderr, "Cyclefile %s updated for " 403 "continuation\n", CyclePath); 404 } 405 } else { 406 sync_tid = mrec->update.tid; 407 if (CyclePath) { 408 hammer_key_beg_init(&mirror.key_beg); 409 hammer_set_cycle(&mirror.key_beg, sync_tid); 410 fprintf(stderr, 411 "Cyclefile %s updated to 0x%016jx\n", 412 CyclePath, (uintmax_t)sync_tid); 413 } 414 } 415 free(mrec); 416 } else if (CyclePath) { 417 /* NOTE! mirror.tid_beg cannot be updated */ 418 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 419 "fully updated unless you use mirror-copy\n"); 420 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 421 } 422 if (streaming && interrupted == 0) { 423 time_t t1 = time(NULL); 424 time_t t2; 425 426 /* 427 * Try to break down large bulk transfers into smaller ones 428 * so it can sync the transaction id on the slave. This 429 * way if we get interrupted a restart doesn't have to 430 * start from scratch. 431 */ 432 if (streaming && histogram) { 433 if (histindex != histmax) { 434 if (VerboseOpt && VerboseOpt < 2 && 435 streaming >= 0) { 436 fprintf(stderr, " (bulk incremental)"); 437 } 438 relpfs(fd, &pfs); 439 goto again; 440 } 441 } 442 443 if (VerboseOpt && streaming >= 0) { 444 fprintf(stderr, " W"); 445 fflush(stderr); 446 } else if (streaming >= 0) { 447 score_printf(LINE1, "Waiting"); 448 } 449 pfs.ondisk->sync_end_tid = mirror.tid_end; 450 if (streaming < 0) { 451 /* 452 * Fake streaming mode when using a histogram to 453 * break up a mirror-read, do not wait on source. 454 */ 455 streaming = 0; 456 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 457 score_printf(LINE3, 458 "Mirror-read %s: cannot stream: %s\n", 459 filesystem, strerror(errno)); 460 fprintf(stderr, 461 "Mirror-read %s: cannot stream: %s\n", 462 filesystem, strerror(errno)); 463 } else { 464 t2 = time(NULL) - t1; 465 if (t2 >= 0 && t2 < DelayOpt) { 466 if (VerboseOpt) { 467 fprintf(stderr, "\bD"); 468 fflush(stderr); 469 } 470 sleep(DelayOpt - t2); 471 } 472 if (VerboseOpt) { 473 fprintf(stderr, "\b "); 474 fflush(stderr); 475 } 476 relpfs(fd, &pfs); 477 goto again; 478 } 479 } 480 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 481 &mrec_tmp, sizeof(mrec_tmp.sync)); 482 relpfs(fd, &pfs); 483 free(buf); 484 free(histogram_ary); 485 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 486 } 487 488 /* 489 * What we are trying to do here is figure out how much data is 490 * going to be sent for the TID range and to break the TID range 491 * down into reasonably-sized slices (from the point of view of 492 * data sent) so a lost connection can restart at a reasonable 493 * place and not all the way back at the beginning. 494 * 495 * An entry's TID serves as the end_tid for the prior entry 496 * So we have to offset the calculation by 1 so that TID falls into 497 * the previous entry when populating entries. 498 * 499 * Because the transaction id space is bursty we need a relatively 500 * large number of buckets (like a million) to do a reasonable job 501 * for things like an initial bulk mirrors on a very large filesystem. 502 */ 503 #define HIST_COUNT (1024 * 1024) 504 505 static int 506 generate_histogram(int fd, const char *filesystem, 507 histogram_t *histogram_ary, 508 struct hammer_ioc_mirror_rw *mirror_base, 509 int *repeatp) 510 { 511 struct hammer_ioc_mirror_rw mirror; 512 union hammer_ioc_mrecord_any *mrec; 513 hammer_tid_t tid_beg; 514 hammer_tid_t tid_end; 515 hammer_tid_t tid; 516 hammer_tid_t tidx; 517 uint64_t *tid_bytes; 518 uint64_t total; 519 uint64_t accum; 520 int chunkno; 521 int i; 522 int res; 523 int off; 524 int len; 525 526 mirror = *mirror_base; 527 tid_beg = mirror.tid_beg; 528 tid_end = mirror.tid_end; 529 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 530 531 if (*histogram_ary == NULL) { 532 *histogram_ary = malloc(sizeof(struct histogram) * 533 (HIST_COUNT + 2)); 534 } 535 if (tid_beg >= tid_end) 536 return(0); 537 538 /* needs 2 extra */ 539 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 540 bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2)); 541 542 if (*repeatp == 0) { 543 fprintf(stderr, "Prescan to break up bulk transfer"); 544 if (VerboseOpt > 1) 545 fprintf(stderr, " (%juMB chunks)", 546 (uintmax_t)(SplitupOpt / (1024 * 1024))); 547 fprintf(stderr, "\n"); 548 } 549 550 /* 551 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 552 * 553 * Note: Estimates can be off when the mirror is way behind due 554 * to skips. 555 */ 556 total = 0; 557 accum = 0; 558 chunkno = 0; 559 for (;;) { 560 mirror.count = 0; 561 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 562 fprintf(stderr, "Mirror-read %s failed: %s\n", 563 filesystem, strerror(errno)); 564 exit(1); 565 } 566 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 567 fprintf(stderr, 568 "Mirror-read %s fatal error %d\n", 569 filesystem, mirror.head.error); 570 exit(1); 571 } 572 for (off = 0; 573 off < mirror.count; 574 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size)) { 575 mrec = (void *)((char *)mirror.ubuf + off); 576 577 /* 578 * We only care about general RECs and PASS 579 * records. We ignore SKIPs. 580 */ 581 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 582 case HAMMER_MREC_TYPE_REC: 583 case HAMMER_MREC_TYPE_PASS: 584 break; 585 default: 586 continue; 587 } 588 589 /* 590 * Calculate for two indices, create_tid and 591 * delete_tid. Record data only applies to 592 * the create_tid. 593 * 594 * When tid is exactly on the boundary it really 595 * belongs to the previous entry because scans 596 * are inclusive of the ending entry. 597 */ 598 tid = mrec->rec.leaf.base.delete_tid; 599 if (tid && tid >= tid_beg && tid <= tid_end) { 600 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 601 if (mrec->head.type == 602 HAMMER_MREC_TYPE_REC) { 603 len -= HAMMER_HEAD_DOALIGN( 604 mrec->rec.leaf.data_len); 605 assert(len > 0); 606 } 607 i = (tid - tid_beg) * HIST_COUNT / 608 (tid_end - tid_beg); 609 tidx = tid_beg + i * (tid_end - tid_beg) / 610 HIST_COUNT; 611 if (tid == tidx && i) 612 --i; 613 assert(i >= 0 && i < HIST_COUNT); 614 tid_bytes[i] += len; 615 total += len; 616 accum += len; 617 } 618 619 tid = mrec->rec.leaf.base.create_tid; 620 if (tid && tid >= tid_beg && tid <= tid_end) { 621 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 622 if (mrec->head.type == 623 HAMMER_MREC_TYPE_REC_NODATA) { 624 len += HAMMER_HEAD_DOALIGN( 625 mrec->rec.leaf.data_len); 626 } 627 i = (tid - tid_beg) * HIST_COUNT / 628 (tid_end - tid_beg); 629 tidx = tid_beg + i * (tid_end - tid_beg) / 630 HIST_COUNT; 631 if (tid == tidx && i) 632 --i; 633 assert(i >= 0 && i < HIST_COUNT); 634 tid_bytes[i] += len; 635 total += len; 636 accum += len; 637 } 638 } 639 if (*repeatp == 0 && accum > SplitupOpt) { 640 if (VerboseOpt > 1) { 641 fprintf(stderr, "."); 642 fflush(stderr); 643 } 644 ++chunkno; 645 score_printf(LINE2, "Prescan chunk %d", chunkno); 646 accum = 0; 647 } 648 if (mirror.count == 0) 649 break; 650 mirror.key_beg = mirror.key_cur; 651 } 652 653 /* 654 * Reduce to SplitupOpt (default 4GB) chunks. This code may 655 * use up to two additional elements. Do the array in-place. 656 * 657 * Inefficient degenerate cases can occur if we do not accumulate 658 * at least the requested split amount, so error on the side of 659 * going over a bit. 660 */ 661 res = 0; 662 (*histogram_ary)[res].tid = tid_beg; 663 (*histogram_ary)[res].bytes = tid_bytes[0]; 664 for (i = 1; i < HIST_COUNT; ++i) { 665 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 666 ++res; 667 (*histogram_ary)[res].tid = tid_beg + 668 i * (tid_end - tid_beg) / 669 HIST_COUNT; 670 (*histogram_ary)[res].bytes = 0; 671 672 } 673 (*histogram_ary)[res].bytes += tid_bytes[i]; 674 } 675 ++res; 676 (*histogram_ary)[res].tid = tid_end; 677 (*histogram_ary)[res].bytes = -1; 678 679 if (*repeatp == 0) { 680 if (VerboseOpt > 1) 681 fprintf(stderr, "\n"); /* newline after ... */ 682 score_printf(LINE3, "Prescan %d chunks, total %ju MBytes", 683 res, (uintmax_t)total / (1024 * 1024)); 684 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 685 res, (uintmax_t)total / (1024 * 1024)); 686 for (i = 0; i < res && i < 3; ++i) { 687 if (i) 688 fprintf(stderr, ", "); 689 fprintf(stderr, "%ju", 690 (uintmax_t)(*histogram_ary)[i].bytes); 691 } 692 if (i < res) 693 fprintf(stderr, ", ..."); 694 fprintf(stderr, ")\n"); 695 } 696 assert(res <= HIST_COUNT); 697 *repeatp = 1; 698 699 free(tid_bytes); 700 return(res); 701 } 702 703 static void 704 create_pfs(const char *filesystem, uuid_t *s_uuid) 705 { 706 if (ForceYesOpt == 1) { 707 fprintf(stderr, "PFS slave %s does not exist. " 708 "Auto create new slave PFS!\n", filesystem); 709 710 } else { 711 fprintf(stderr, "PFS slave %s does not exist.\n" 712 "Do you want to create a new slave PFS? (yes|no) ", 713 filesystem); 714 fflush(stderr); 715 if (getyntty() != 1) { 716 fprintf(stderr, "Aborting operation\n"); 717 exit(1); 718 } 719 } 720 721 uint32_t status; 722 char *shared_uuid = NULL; 723 uuid_to_string(s_uuid, &shared_uuid, &status); 724 725 char *cmd = NULL; 726 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 727 filesystem, shared_uuid); 728 free(shared_uuid); 729 730 if (cmd == NULL) { 731 fprintf(stderr, "Failed to alloc memory\n"); 732 exit(1); 733 } 734 if (system(cmd) != 0) { 735 fprintf(stderr, "Failed to create PFS\n"); 736 } 737 free(cmd); 738 } 739 740 /* 741 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 742 * some additional packet types to negotiate TID ranges and to verify 743 * completion. The HAMMER VFS does most of the work. 744 * 745 * It is important to note that the mirror.key_{beg,end} range must 746 * match the ranged used by the original. For now both sides use 747 * range the entire key space. 748 * 749 * It is even more important that the records in the stream conform 750 * to the TID range also supplied in the stream. The HAMMER VFS will 751 * use the REC, PASS, and SKIP record types to track the portions of 752 * the B-Tree being scanned in order to be able to proactively delete 753 * records on the target within those active areas that are not mentioned 754 * by the source. 755 * 756 * The mirror.key_cur field is used by the VFS to do this tracking. It 757 * must be initialized to key_beg but then is persistently updated by 758 * the HAMMER VFS on each successive ioctl() call. If you blow up this 759 * field you will blow up the mirror target, possibly to the point of 760 * deleting everything. As a safety measure the HAMMER VFS simply marks 761 * the records that the source has destroyed as deleted on the target, 762 * and normal pruning operations will deal with their final disposition 763 * at some later time. 764 */ 765 void 766 hammer_cmd_mirror_write(char **av, int ac) 767 { 768 struct hammer_ioc_mirror_rw mirror; 769 char *filesystem; 770 char *buf = malloc(SERIALBUF_SIZE); 771 struct hammer_ioc_pseudofs_rw pfs; 772 struct hammer_ioc_mrecord_head pickup; 773 struct hammer_ioc_synctid synctid; 774 union hammer_ioc_mrecord_any mrec_tmp; 775 hammer_ioc_mrecord_any_t mrec; 776 struct stat st; 777 int error; 778 int fd; 779 int n; 780 781 if (ac != 1) 782 mirror_usage(1); 783 filesystem = av[0]; 784 hammer_check_restrict(filesystem); 785 786 pickup.signature = 0; 787 pickup.type = 0; 788 789 again: 790 bzero(&mirror, sizeof(mirror)); 791 hammer_key_beg_init(&mirror.key_beg); 792 hammer_key_end_init(&mirror.key_end); 793 mirror.key_end = mirror.key_beg; 794 795 /* 796 * Read initial packet 797 */ 798 mrec = read_mrecord(0, &error, &pickup); 799 if (mrec == NULL) { 800 if (error == 0) 801 fprintf(stderr, "validate_mrec_header: short read\n"); 802 exit(1); 803 } 804 /* 805 * Validate packet 806 */ 807 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 808 free(buf); 809 return; 810 } 811 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 812 fprintf(stderr, "validate_mrec_header: did not get expected " 813 "PFSD record type\n"); 814 exit(1); 815 } 816 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 817 fprintf(stderr, "validate_mrec_header: unexpected payload " 818 "size\n"); 819 exit(1); 820 } 821 822 /* 823 * Create slave PFS if it doesn't yet exist 824 */ 825 if (lstat(filesystem, &st) != 0) { 826 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 827 } 828 free(mrec); 829 mrec = NULL; 830 831 fd = getpfs(&pfs, filesystem); 832 833 /* 834 * In two-way mode the target writes out a PFS packet first. 835 * The source uses our tid_end as its tid_beg by default, 836 * picking up where it left off. 837 */ 838 mirror.tid_beg = 0; 839 if (TwoWayPipeOpt) { 840 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 841 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 842 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 843 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 844 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 845 &mrec_tmp, sizeof(mrec_tmp.pfs)); 846 } 847 848 /* 849 * Read and process the PFS header. The source informs us of 850 * the TID range the stream represents. 851 */ 852 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 853 &mirror.tid_beg, &mirror.tid_end); 854 if (n < 0) { /* got TERM record */ 855 relpfs(fd, &pfs); 856 free(buf); 857 return; 858 } 859 860 mirror.ubuf = buf; 861 mirror.size = SERIALBUF_SIZE; 862 863 /* 864 * Read and process bulk records (REC, PASS, and SKIP types). 865 * 866 * On your life, do NOT mess with mirror.key_cur or your mirror 867 * target may become history. 868 */ 869 for (;;) { 870 mirror.count = 0; 871 mirror.pfs_id = pfs.pfs_id; 872 mirror.shared_uuid = pfs.ondisk->shared_uuid; 873 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 874 if (mirror.size <= 0) 875 break; 876 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 877 fprintf(stderr, "Mirror-write %s failed: %s\n", 878 filesystem, strerror(errno)); 879 exit(1); 880 } 881 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 882 fprintf(stderr, 883 "Mirror-write %s fatal error %d\n", 884 filesystem, mirror.head.error); 885 exit(1); 886 } 887 #if 0 888 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 889 fprintf(stderr, 890 "Mirror-write %s interrupted by timer at" 891 " %016llx\n", 892 filesystem, 893 mirror.key_cur.obj_id); 894 exit(0); 895 } 896 #endif 897 } 898 899 /* 900 * Read and process the termination sync record. 901 */ 902 mrec = read_mrecord(0, &error, &pickup); 903 904 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 905 fprintf(stderr, "Mirror-write: received termination request\n"); 906 relpfs(fd, &pfs); 907 free(mrec); 908 free(buf); 909 return; 910 } 911 912 if (mrec == NULL || 913 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 914 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 915 mrec->head.rec_size != sizeof(mrec->sync)) { 916 fprintf(stderr, "Mirror-write %s: Did not get termination " 917 "sync record, or rec_size is wrong rt=%d\n", 918 filesystem, 919 (mrec ? (int)mrec->head.type : -1)); 920 exit(1); 921 } 922 923 /* 924 * Update the PFS info on the target so the user has visibility 925 * into the new snapshot, and sync the target filesystem. 926 */ 927 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 928 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 929 930 bzero(&synctid, sizeof(synctid)); 931 synctid.op = HAMMER_SYNCTID_SYNC2; 932 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 933 934 if (VerboseOpt >= 2) { 935 fprintf(stderr, "Mirror-write %s: succeeded\n", 936 filesystem); 937 } 938 } 939 940 free(mrec); 941 mrec = NULL; 942 943 /* 944 * Report back to the originator. 945 */ 946 if (TwoWayPipeOpt) { 947 mrec_tmp.update.tid = mirror.tid_end; 948 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 949 &mrec_tmp, sizeof(mrec_tmp.update)); 950 } else { 951 printf("Source can update synctid to 0x%016jx\n", 952 (uintmax_t)mirror.tid_end); 953 } 954 relpfs(fd, &pfs); 955 goto again; 956 } 957 958 void 959 hammer_cmd_mirror_dump(char **av, int ac) 960 { 961 char *buf = malloc(SERIALBUF_SIZE); 962 struct hammer_ioc_mrecord_head pickup; 963 hammer_ioc_mrecord_any_t mrec; 964 int error; 965 int size; 966 int offset; 967 int bytes; 968 int header_only = 0; 969 970 if (ac == 1 && strcmp(*av, "header") == 0) 971 header_only = 1; 972 else if (ac != 0) 973 mirror_usage(1); 974 975 /* 976 * Read and process the PFS header 977 */ 978 pickup.signature = 0; 979 pickup.type = 0; 980 981 mrec = read_mrecord(0, &error, &pickup); 982 983 /* 984 * Dump the PFS header. mirror-dump takes its input from the output 985 * of a mirror-read so getpfs() can't be used to get a fd to be passed 986 * to dump_pfsd(). 987 */ 988 if (header_only && mrec != NULL) { 989 dump_pfsd(&mrec->pfs.pfsd, -1); 990 free(mrec); 991 free(buf); 992 return; 993 } 994 free(mrec); 995 996 again: 997 /* 998 * Read and process bulk records 999 */ 1000 for (;;) { 1001 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 1002 if (size <= 0) 1003 break; 1004 offset = 0; 1005 while (offset < size) { 1006 mrec = (void *)((char *)buf + offset); 1007 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 1008 if (offset + bytes > size) { 1009 fprintf(stderr, "Misaligned record\n"); 1010 exit(1); 1011 } 1012 1013 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 1014 case HAMMER_MREC_TYPE_REC_BADCRC: 1015 case HAMMER_MREC_TYPE_REC: 1016 printf("Record lo=%08x obj=%016jx key=%016jx " 1017 "rt=%02x ot=%02x", 1018 mrec->rec.leaf.base.localization, 1019 (uintmax_t)mrec->rec.leaf.base.obj_id, 1020 (uintmax_t)mrec->rec.leaf.base.key, 1021 mrec->rec.leaf.base.rec_type, 1022 mrec->rec.leaf.base.obj_type); 1023 if (mrec->head.type == 1024 HAMMER_MREC_TYPE_REC_BADCRC) { 1025 printf(" (BAD CRC)"); 1026 } 1027 printf("\n"); 1028 printf(" tids %016jx:%016jx data=%d\n", 1029 (uintmax_t)mrec->rec.leaf.base.create_tid, 1030 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1031 mrec->rec.leaf.data_len); 1032 break; 1033 case HAMMER_MREC_TYPE_PASS: 1034 printf("Pass lo=%08x obj=%016jx key=%016jx " 1035 "rt=%02x ot=%02x\n", 1036 mrec->rec.leaf.base.localization, 1037 (uintmax_t)mrec->rec.leaf.base.obj_id, 1038 (uintmax_t)mrec->rec.leaf.base.key, 1039 mrec->rec.leaf.base.rec_type, 1040 mrec->rec.leaf.base.obj_type); 1041 printf(" tids %016jx:%016jx data=%d\n", 1042 (uintmax_t)mrec->rec.leaf.base.create_tid, 1043 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1044 mrec->rec.leaf.data_len); 1045 break; 1046 case HAMMER_MREC_TYPE_SKIP: 1047 printf("Skip lo=%08x obj=%016jx key=%016jx rt=%02x to\n" 1048 " lo=%08x obj=%016jx key=%016jx rt=%02x\n", 1049 mrec->skip.skip_beg.localization, 1050 (uintmax_t)mrec->skip.skip_beg.obj_id, 1051 (uintmax_t)mrec->skip.skip_beg.key, 1052 mrec->skip.skip_beg.rec_type, 1053 mrec->skip.skip_end.localization, 1054 (uintmax_t)mrec->skip.skip_end.obj_id, 1055 (uintmax_t)mrec->skip.skip_end.key, 1056 mrec->skip.skip_end.rec_type); 1057 default: 1058 break; 1059 } 1060 offset += bytes; 1061 } 1062 } 1063 1064 /* 1065 * Read and process the termination sync record. 1066 */ 1067 mrec = read_mrecord(0, &error, &pickup); 1068 if (mrec == NULL || 1069 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 1070 mrec->head.type != HAMMER_MREC_TYPE_IDLE)) { 1071 fprintf(stderr, "Mirror-dump: Did not get termination " 1072 "sync record\n"); 1073 } 1074 free(mrec); 1075 1076 /* 1077 * Continue with more batches until EOF. 1078 */ 1079 mrec = read_mrecord(0, &error, &pickup); 1080 if (mrec) { 1081 free(mrec); 1082 goto again; 1083 } 1084 free(buf); 1085 } 1086 1087 void 1088 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1089 { 1090 pid_t pid1; 1091 pid_t pid2; 1092 int fds[2]; 1093 const char *xav[32]; 1094 char tbuf[16]; 1095 char *sh, *user, *host, *rfs; 1096 int xac; 1097 1098 if (ac != 2) 1099 mirror_usage(1); 1100 1101 TwoWayPipeOpt = 1; 1102 signal(SIGPIPE, SIG_IGN); 1103 1104 again: 1105 if (pipe(fds) < 0) { 1106 perror("pipe"); 1107 exit(1); 1108 } 1109 1110 /* 1111 * Source 1112 */ 1113 if ((pid1 = fork()) == 0) { 1114 signal(SIGPIPE, SIG_DFL); 1115 dup2(fds[0], 0); 1116 dup2(fds[0], 1); 1117 close(fds[0]); 1118 close(fds[1]); 1119 if ((rfs = strchr(av[0], ':')) != NULL) { 1120 xac = 0; 1121 1122 if((sh = getenv("HAMMER_RSH")) == NULL) 1123 xav[xac++] = "ssh"; 1124 else 1125 xav[xac++] = sh; 1126 1127 if (CompressOpt) 1128 xav[xac++] = "-C"; 1129 1130 if ((host = strchr(av[0], '@')) != NULL) { 1131 user = strndup( av[0], (host++ - av[0])); 1132 host = strndup( host, (rfs++ - host)); 1133 xav[xac++] = "-l"; 1134 xav[xac++] = user; 1135 xav[xac++] = host; 1136 } 1137 else { 1138 host = strndup( av[0], (rfs++ - av[0])); 1139 user = NULL; 1140 xav[xac++] = host; 1141 } 1142 1143 1144 if (SshPort) { 1145 xav[xac++] = "-p"; 1146 xav[xac++] = SshPort; 1147 } 1148 1149 xav[xac++] = "hammer"; 1150 1151 switch(VerboseOpt) { 1152 case 0: 1153 break; 1154 case 1: 1155 xav[xac++] = "-v"; 1156 break; 1157 case 2: 1158 xav[xac++] = "-vv"; 1159 break; 1160 default: 1161 xav[xac++] = "-vvv"; 1162 break; 1163 } 1164 if (ForceYesOpt) { 1165 xav[xac++] = "-y"; 1166 } 1167 xav[xac++] = "-2"; 1168 if (TimeoutOpt) { 1169 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1170 xav[xac++] = "-t"; 1171 xav[xac++] = tbuf; 1172 } 1173 if (SplitupOptStr) { 1174 xav[xac++] = "-S"; 1175 xav[xac++] = SplitupOptStr; 1176 } 1177 if (streaming) 1178 xav[xac++] = "mirror-read-stream"; 1179 else 1180 xav[xac++] = "mirror-read"; 1181 xav[xac++] = rfs; 1182 xav[xac++] = NULL; 1183 execvp(*xav, (void *)xav); 1184 } else { 1185 hammer_cmd_mirror_read(av, 1, streaming); 1186 fflush(stdout); 1187 fflush(stderr); 1188 } 1189 _exit(1); 1190 } 1191 1192 /* 1193 * Target 1194 */ 1195 if ((pid2 = fork()) == 0) { 1196 signal(SIGPIPE, SIG_DFL); 1197 dup2(fds[1], 0); 1198 dup2(fds[1], 1); 1199 close(fds[0]); 1200 close(fds[1]); 1201 if ((rfs = strchr(av[1], ':')) != NULL) { 1202 xac = 0; 1203 1204 if((sh = getenv("HAMMER_RSH")) == NULL) 1205 xav[xac++] = "ssh"; 1206 else 1207 xav[xac++] = sh; 1208 1209 if (CompressOpt) 1210 xav[xac++] = "-C"; 1211 1212 if ((host = strchr(av[1], '@')) != NULL) { 1213 user = strndup( av[1], (host++ - av[1])); 1214 host = strndup( host, (rfs++ - host)); 1215 xav[xac++] = "-l"; 1216 xav[xac++] = user; 1217 xav[xac++] = host; 1218 } 1219 else { 1220 host = strndup( av[1], (rfs++ - av[1])); 1221 user = NULL; 1222 xav[xac++] = host; 1223 } 1224 1225 if (SshPort) { 1226 xav[xac++] = "-p"; 1227 xav[xac++] = SshPort; 1228 } 1229 1230 xav[xac++] = "hammer"; 1231 1232 switch(VerboseOpt) { 1233 case 0: 1234 break; 1235 case 1: 1236 xav[xac++] = "-v"; 1237 break; 1238 case 2: 1239 xav[xac++] = "-vv"; 1240 break; 1241 default: 1242 xav[xac++] = "-vvv"; 1243 break; 1244 } 1245 if (ForceYesOpt) { 1246 xav[xac++] = "-y"; 1247 } 1248 xav[xac++] = "-2"; 1249 xav[xac++] = "mirror-write"; 1250 xav[xac++] = rfs; 1251 xav[xac++] = NULL; 1252 execvp(*xav, (void *)xav); 1253 } else { 1254 hammer_cmd_mirror_write(av + 1, 1); 1255 fflush(stdout); 1256 fflush(stderr); 1257 } 1258 _exit(1); 1259 } 1260 close(fds[0]); 1261 close(fds[1]); 1262 1263 while (waitpid(pid1, NULL, 0) <= 0) 1264 ; 1265 while (waitpid(pid2, NULL, 0) <= 0) 1266 ; 1267 1268 /* 1269 * If the link is lost restart 1270 */ 1271 if (streaming) { 1272 if (VerboseOpt) { 1273 fprintf(stderr, "\nLost Link\n"); 1274 fflush(stderr); 1275 } 1276 sleep(15 + DelayOpt); 1277 goto again; 1278 } 1279 1280 } 1281 1282 /* 1283 * Read and return multiple mrecords 1284 */ 1285 static int 1286 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 1287 { 1288 hammer_ioc_mrecord_any_t mrec; 1289 u_int count; 1290 size_t n; 1291 size_t i; 1292 size_t bytes; 1293 int type; 1294 1295 count = 0; 1296 while (size - count >= HAMMER_MREC_HEADSIZE) { 1297 /* 1298 * Cached the record header in case we run out of buffer 1299 * space. 1300 */ 1301 fflush(stdout); 1302 if (pickup->signature == 0) { 1303 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1304 i = read(fd, (char *)pickup + n, 1305 HAMMER_MREC_HEADSIZE - n); 1306 if (i <= 0) 1307 break; 1308 } 1309 if (n == 0) 1310 break; 1311 if (n != HAMMER_MREC_HEADSIZE) { 1312 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1313 exit(1); 1314 } 1315 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1316 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1317 "bad signature\n"); 1318 exit(1); 1319 } 1320 } 1321 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1322 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 1323 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1324 "illegal rec_size\n"); 1325 exit(1); 1326 } 1327 1328 /* 1329 * Stop if we have insufficient space for the record and data. 1330 */ 1331 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1332 if (size - count < bytes) 1333 break; 1334 1335 /* 1336 * Stop if the record type is not a REC, SKIP, or PASS, 1337 * which are the only types the ioctl supports. Other types 1338 * are used only by the userland protocol. 1339 * 1340 * Ignore all flags. 1341 */ 1342 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1343 if (type != HAMMER_MREC_TYPE_PFSD && 1344 type != HAMMER_MREC_TYPE_REC && 1345 type != HAMMER_MREC_TYPE_SKIP && 1346 type != HAMMER_MREC_TYPE_PASS) { 1347 break; 1348 } 1349 1350 /* 1351 * Read the remainder and clear the pickup signature. 1352 */ 1353 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1354 i = read(fd, buf + count + n, bytes - n); 1355 if (i <= 0) 1356 break; 1357 } 1358 if (n != bytes) { 1359 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1360 exit(1); 1361 } 1362 1363 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1364 pickup->signature = 0; 1365 pickup->type = 0; 1366 mrec = (void *)(buf + count); 1367 1368 /* 1369 * Validate the completed record 1370 */ 1371 if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) { 1372 fprintf(stderr, "read_mrecords: malformed record " 1373 "on pipe, bad crc\n"); 1374 exit(1); 1375 } 1376 1377 /* 1378 * If its a B-Tree record validate the data crc. 1379 * 1380 * NOTE: If the VFS passes us an explicitly errorde mrec 1381 * we just pass it through. 1382 */ 1383 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1384 1385 if (type == HAMMER_MREC_TYPE_REC) { 1386 if (mrec->head.rec_size < 1387 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1388 fprintf(stderr, 1389 "read_mrecords: malformed record on " 1390 "pipe, illegal element data_len\n"); 1391 exit(1); 1392 } 1393 if (mrec->rec.leaf.data_len && 1394 mrec->rec.leaf.data_offset && 1395 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 1396 fprintf(stderr, 1397 "read_mrecords: data_crc did not " 1398 "match data! obj=%016jx key=%016jx\n", 1399 (uintmax_t)mrec->rec.leaf.base.obj_id, 1400 (uintmax_t)mrec->rec.leaf.base.key); 1401 fprintf(stderr, 1402 "continuing, but there are problems\n"); 1403 } 1404 } 1405 count += bytes; 1406 } 1407 return(count); 1408 } 1409 1410 /* 1411 * Read and return a single mrecord. 1412 */ 1413 static 1414 hammer_ioc_mrecord_any_t 1415 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 1416 { 1417 hammer_ioc_mrecord_any_t mrec; 1418 struct hammer_ioc_mrecord_head mrechd; 1419 size_t bytes; 1420 size_t n; 1421 size_t i; 1422 1423 if (pickup && pickup->type != 0) { 1424 mrechd = *pickup; 1425 pickup->signature = 0; 1426 pickup->type = 0; 1427 n = HAMMER_MREC_HEADSIZE; 1428 } else { 1429 /* 1430 * Read in the PFSD header from the sender. 1431 */ 1432 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1433 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1434 if (i <= 0) 1435 break; 1436 } 1437 if (n == 0) { 1438 *errorp = 0; /* EOF */ 1439 return(NULL); 1440 } 1441 if (n != HAMMER_MREC_HEADSIZE) { 1442 fprintf(stderr, "short read of mrecord header\n"); 1443 *errorp = EPIPE; 1444 return(NULL); 1445 } 1446 } 1447 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1448 fprintf(stderr, "read_mrecord: bad signature\n"); 1449 *errorp = EINVAL; 1450 return(NULL); 1451 } 1452 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1453 assert(bytes >= sizeof(mrechd)); 1454 mrec = malloc(bytes); 1455 mrec->head = mrechd; 1456 1457 while (n < bytes) { 1458 i = read(fdin, (char *)mrec + n, bytes - n); 1459 if (i <= 0) 1460 break; 1461 n += i; 1462 } 1463 if (n != bytes) { 1464 fprintf(stderr, "read_mrecord: short read on payload\n"); 1465 *errorp = EPIPE; 1466 return(NULL); 1467 } 1468 if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) { 1469 fprintf(stderr, "read_mrecord: bad CRC\n"); 1470 *errorp = EINVAL; 1471 return(NULL); 1472 } 1473 *errorp = 0; 1474 return(mrec); 1475 } 1476 1477 static 1478 void 1479 write_mrecord(int fdout, uint32_t type, hammer_ioc_mrecord_any_t mrec, 1480 int bytes) 1481 { 1482 char zbuf[HAMMER_HEAD_ALIGN]; 1483 int pad; 1484 1485 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1486 1487 assert(bytes >= (int)sizeof(mrec->head)); 1488 bzero(&mrec->head, sizeof(mrec->head)); 1489 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1490 mrec->head.type = type; 1491 mrec->head.rec_size = bytes; 1492 hammer_crc_set_mrec_head(&mrec->head, bytes); 1493 if (write(fdout, mrec, bytes) != bytes) { 1494 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1495 errno, strerror(errno)); 1496 exit(1); 1497 } 1498 if (pad) { 1499 bzero(zbuf, pad); 1500 if (write(fdout, zbuf, pad) != pad) { 1501 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1502 errno, strerror(errno)); 1503 exit(1); 1504 } 1505 } 1506 } 1507 1508 /* 1509 * Generate a mirroring header with the pfs information of the 1510 * originating filesytem. 1511 */ 1512 static void 1513 generate_mrec_header(int fd, int pfs_id, 1514 union hammer_ioc_mrecord_any *mrec_tmp) 1515 { 1516 struct hammer_ioc_pseudofs_rw pfs; 1517 1518 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1519 clrpfs(&pfs, &mrec_tmp->pfs.pfsd, pfs_id); 1520 1521 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1522 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1523 exit(1); 1524 } 1525 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1526 fprintf(stderr, "Mirror-read: HAMMER PFS version mismatch!\n"); 1527 exit(1); 1528 } 1529 mrec_tmp->pfs.version = pfs.version; 1530 } 1531 1532 /* 1533 * Validate the pfs information from the originating filesystem 1534 * against the target filesystem. shared_uuid must match. 1535 * 1536 * return -1 if we got a TERM record 1537 */ 1538 static int 1539 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1540 struct hammer_ioc_mrecord_head *pickup, 1541 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1542 { 1543 struct hammer_ioc_pseudofs_rw pfs; 1544 struct hammer_pseudofs_data pfsd; 1545 hammer_ioc_mrecord_any_t mrec; 1546 int error; 1547 1548 /* 1549 * Get the PFSD info from the target filesystem. 1550 */ 1551 clrpfs(&pfs, &pfsd, pfs_id); 1552 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1553 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1554 exit(1); 1555 } 1556 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1557 fprintf(stderr, "mirror-write: HAMMER PFS version mismatch!\n"); 1558 exit(1); 1559 } 1560 1561 mrec = read_mrecord(fdin, &error, pickup); 1562 if (mrec == NULL) { 1563 if (error == 0) 1564 fprintf(stderr, "validate_mrec_header: short read\n"); 1565 exit(1); 1566 } 1567 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1568 free(mrec); 1569 return(-1); 1570 } 1571 1572 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1573 fprintf(stderr, "validate_mrec_header: did not get expected " 1574 "PFSD record type\n"); 1575 exit(1); 1576 } 1577 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1578 fprintf(stderr, "validate_mrec_header: unexpected payload " 1579 "size\n"); 1580 exit(1); 1581 } 1582 if (mrec->pfs.version != pfs.version) { 1583 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1584 exit(1); 1585 } 1586 1587 /* 1588 * Whew. Ok, is the read PFS info compatible with the target? 1589 */ 1590 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1591 sizeof(pfsd.shared_uuid)) != 0) { 1592 fprintf(stderr, 1593 "mirror-write: source and target have " 1594 "different shared-uuid's!\n"); 1595 exit(1); 1596 } 1597 if (is_target && hammer_is_pfs_master(&pfsd)) { 1598 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1599 exit(1); 1600 } 1601 if (tid_begp) 1602 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1603 if (tid_endp) 1604 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1605 free(mrec); 1606 return(0); 1607 } 1608 1609 static void 1610 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1611 { 1612 struct hammer_ioc_pseudofs_rw pfs; 1613 struct hammer_pseudofs_data pfsd; 1614 1615 clrpfs(&pfs, &pfsd, pfs_id); 1616 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1617 perror("update_pfs_snapshot (read)"); 1618 exit(1); 1619 } 1620 if (pfsd.sync_end_tid != snapshot_tid) { 1621 pfsd.sync_end_tid = snapshot_tid; 1622 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1623 perror("update_pfs_snapshot (rewrite)"); 1624 exit(1); 1625 } 1626 if (VerboseOpt >= 2) { 1627 fprintf(stderr, 1628 "Mirror-write: Completed, updated snapshot " 1629 "to %016jx\n", 1630 (uintmax_t)snapshot_tid); 1631 fflush(stderr); 1632 } 1633 } 1634 } 1635 1636 /* 1637 * Bandwidth-limited write in chunks 1638 */ 1639 static 1640 ssize_t 1641 writebw(int fd, const void *buf, size_t nbytes, 1642 uint64_t *bwcount, struct timeval *tv1) 1643 { 1644 struct timeval tv2; 1645 size_t n; 1646 ssize_t r; 1647 ssize_t a; 1648 int usec; 1649 1650 a = 0; 1651 r = 0; 1652 while (nbytes) { 1653 if (*bwcount + nbytes > BandwidthOpt) 1654 n = BandwidthOpt - *bwcount; 1655 else 1656 n = nbytes; 1657 if (n) 1658 r = write(fd, buf, n); 1659 if (r >= 0) { 1660 a += r; 1661 nbytes -= r; 1662 buf = (const char *)buf + r; 1663 } 1664 if ((size_t)r != n) 1665 break; 1666 *bwcount += n; 1667 if (*bwcount >= BandwidthOpt) { 1668 gettimeofday(&tv2, NULL); 1669 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1670 (int)(tv2.tv_usec - tv1->tv_usec); 1671 if (usec >= 0 && usec < 1000000) 1672 usleep(1000000 - usec); 1673 gettimeofday(tv1, NULL); 1674 *bwcount -= BandwidthOpt; 1675 } 1676 } 1677 return(a ? a : r); 1678 } 1679 1680 /* 1681 * Get a yes or no answer from the terminal. The program may be run as 1682 * part of a two-way pipe so we cannot use stdin for this operation. 1683 */ 1684 static int 1685 getyntty(void) 1686 { 1687 char buf[256]; 1688 FILE *fp; 1689 int result; 1690 1691 fp = fopen("/dev/tty", "r"); 1692 if (fp == NULL) { 1693 fprintf(stderr, "No terminal for response\n"); 1694 return(-1); 1695 } 1696 result = -1; 1697 while (fgets(buf, sizeof(buf), fp) != NULL) { 1698 if (buf[0] == 'y' || buf[0] == 'Y') { 1699 result = 1; 1700 break; 1701 } 1702 if (buf[0] == 'n' || buf[0] == 'N') { 1703 result = 0; 1704 break; 1705 } 1706 fprintf(stderr, "Response not understood\n"); 1707 break; 1708 } 1709 fclose(fp); 1710 return(result); 1711 } 1712 1713 static void 1714 score_printf(size_t i, size_t w, const char *ctl, ...) 1715 { 1716 va_list va; 1717 size_t n; 1718 static size_t SSize; 1719 static int SFd = -1; 1720 static char ScoreBuf[1024]; 1721 1722 if (ScoreBoardFile == NULL) 1723 return; 1724 assert(i + w < sizeof(ScoreBuf)); 1725 if (SFd < 0) { 1726 SFd = open(ScoreBoardFile, O_RDWR|O_CREAT|O_TRUNC, 0644); 1727 if (SFd < 0) 1728 return; 1729 SSize = 0; 1730 } 1731 for (n = 0; n < i; ++n) { 1732 if (ScoreBuf[n] == 0) 1733 ScoreBuf[n] = ' '; 1734 } 1735 va_start(va, ctl); 1736 vsnprintf(ScoreBuf + i, w - 1, ctl, va); 1737 va_end(va); 1738 n = strlen(ScoreBuf + i); 1739 while (n < w - 1) { 1740 ScoreBuf[i + n] = ' '; 1741 ++n; 1742 } 1743 ScoreBuf[i + n] = '\n'; 1744 if (SSize < i + w) 1745 SSize = i + w; 1746 pwrite(SFd, ScoreBuf, SSize, 0); 1747 } 1748 1749 static void 1750 hammer_check_restrict(const char *filesystem) 1751 { 1752 size_t rlen; 1753 int atslash; 1754 1755 if (RestrictTarget == NULL) 1756 return; 1757 rlen = strlen(RestrictTarget); 1758 if (strncmp(filesystem, RestrictTarget, rlen) != 0) { 1759 fprintf(stderr, "hammer-remote: restricted target\n"); 1760 exit(1); 1761 } 1762 atslash = 1; 1763 while (filesystem[rlen]) { 1764 if (atslash && 1765 filesystem[rlen] == '.' && 1766 filesystem[rlen+1] == '.') { 1767 fprintf(stderr, "hammer-remote: '..' not allowed\n"); 1768 exit(1); 1769 } 1770 if (filesystem[rlen] == '/') 1771 atslash = 1; 1772 else 1773 atslash = 0; 1774 ++rlen; 1775 } 1776 } 1777 1778 static void 1779 mirror_usage(int code) 1780 { 1781 fprintf(stderr, 1782 "hammer mirror-read <filesystem> [begin-tid]\n" 1783 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1784 "hammer mirror-write <filesystem>\n" 1785 "hammer mirror-dump [header]\n" 1786 "hammer mirror-copy [[user@]host:]<filesystem>" 1787 " [[user@]host:]<filesystem>\n" 1788 "hammer mirror-stream [[user@]host:]<filesystem>" 1789 " [[user@]host:]<filesystem>\n" 1790 ); 1791 exit(code); 1792 } 1793 1794