1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 
 *
 * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.16 2008/11/09 05:22:56 dillon Exp $
 */

#include "hammer.h"

/*
 * Size of the serialization buffer handed to the HAMMERIOC_MIRROR_READ
 * and HAMMERIOC_MIRROR_WRITE ioctls, and the maximum number of bytes
 * read_mrecords() collects per batch.
 */
#define SERIALBUF_SIZE	(512 * 1024)

/*
 * One slice of a TID range, as computed by generate_histogram().
 *
 * tid   - TID at which this slice begins; it also serves as the ending
 *         TID of the previous slice.
 * bytes - estimated number of bytes this slice will stream.  The last
 *         entry of a histogram array stores (u_int64_t)-1 here.
 */
typedef struct histogram {
	hammer_tid_t	tid;
	u_int64_t	bytes;
} *histogram_t;

/*
 * Forward declarations for the file-local helpers.
 */
static int read_mrecords(int fd, char *buf, u_int size,
			 hammer_ioc_mrecord_head_t pickup);
static int generate_histogram(int fd, const char *filesystem,
			 histogram_t *histogram_ary,
			 struct hammer_ioc_mirror_rw *mirror_base,
			 int *repeatp);
static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
			 hammer_ioc_mrecord_head_t pickup);
static void write_mrecord(int fdout, u_int32_t type,
			 hammer_ioc_mrecord_any_t mrec, int bytes);
static void generate_mrec_header(int fd, int pfs_id,
			 union hammer_ioc_mrecord_any *mrec_tmp);
static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
			 struct hammer_ioc_mrecord_head *pickup,
			 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
static ssize_t writebw(int fd, const void *buf, size_t nbytes,
			u_int64_t *bwcount, struct timeval *tv1);
static int getyn(void);
static void mirror_usage(int code);

/*
 * Generate a mirroring data stream from the specified source over the
 * entire key range, but restricted to the specified transaction range.
 *
 * The HAMMER VFS does most of the work, we add a few new mrecord
 * types to negotiate the TID ranges and verify that the entire
 * stream made it to the destination.
 *
 * streaming will be 0 for mirror-read, 1 for mirror-read-stream.  The code
 * will set up a fake value of -1 when running the histogram for
 * mirror-read.
77 */ 78 void 79 hammer_cmd_mirror_read(char **av, int ac, int streaming) 80 { 81 struct hammer_ioc_mirror_rw mirror; 82 struct hammer_ioc_pseudofs_rw pfs; 83 union hammer_ioc_mrecord_any mrec_tmp; 84 struct hammer_ioc_mrecord_head pickup; 85 hammer_ioc_mrecord_any_t mrec; 86 hammer_tid_t sync_tid; 87 histogram_t histogram_ary; 88 const char *filesystem; 89 char *buf = malloc(SERIALBUF_SIZE); 90 int interrupted = 0; 91 int error; 92 int fd; 93 int n; 94 int didwork; 95 int histogram; 96 int histindex; 97 int histmax; 98 int repeat = 0; 99 int sameline; 100 int64_t total_bytes; 101 time_t base_t = time(NULL); 102 struct timeval bwtv; 103 u_int64_t bwcount; 104 u_int64_t estbytes; 105 106 if (ac == 0 || ac > 2) 107 mirror_usage(1); 108 filesystem = av[0]; 109 110 pickup.signature = 0; 111 pickup.type = 0; 112 histogram = 0; 113 histindex = 0; 114 histmax = 0; 115 histogram_ary = NULL; 116 sameline = 0; 117 118 again: 119 bzero(&mirror, sizeof(mirror)); 120 hammer_key_beg_init(&mirror.key_beg); 121 hammer_key_end_init(&mirror.key_end); 122 123 fd = getpfs(&pfs, filesystem); 124 125 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 126 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 127 fflush(stderr); 128 sameline = 1; 129 } 130 sameline = 1; 131 total_bytes = 0; 132 gettimeofday(&bwtv, NULL); 133 bwcount = 0; 134 135 /* 136 * Send initial header for the purpose of determining the 137 * shared-uuid. 138 */ 139 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 140 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 141 &mrec_tmp, sizeof(mrec_tmp.pfs)); 142 143 /* 144 * In 2-way mode the target will send us a PFS info packet 145 * first. Use the target's current snapshot TID as our default 146 * begin TID. 
147 */ 148 if (TwoWayPipeOpt) { 149 mirror.tid_beg = 0; 150 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 151 NULL, &mirror.tid_beg); 152 if (n < 0) { /* got TERM record */ 153 relpfs(fd, &pfs); 154 return; 155 } 156 ++mirror.tid_beg; 157 } else if (streaming && histogram) { 158 mirror.tid_beg = histogram_ary[histindex].tid + 1; 159 } else { 160 mirror.tid_beg = 0; 161 } 162 163 /* 164 * Write out the PFS header, tid_beg will be updated if our PFS 165 * has a larger begin sync. tid_end is set to the latest source 166 * TID whos flush cycle has completed. 167 */ 168 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 169 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 170 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 171 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 172 mirror.ubuf = buf; 173 mirror.size = SERIALBUF_SIZE; 174 mirror.pfs_id = pfs.pfs_id; 175 mirror.shared_uuid = pfs.ondisk->shared_uuid; 176 177 /* 178 * XXX If the histogram is exhausted and the TID delta is large 179 * the stream might have been offline for a while and is 180 * now picking it up again. Do another histogram. 181 */ 182 #if 0 183 if (streaming && histogram && histindex == histend) { 184 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 185 histogram = 0; 186 } 187 #endif 188 189 /* 190 * Initial bulk startup control, try to do some incremental 191 * mirroring in order to allow the stream to be killed and 192 * restarted without having to start over. 
193 */ 194 if (histogram == 0 && BulkOpt == 0) { 195 if (VerboseOpt && repeat == 0) { 196 fprintf(stderr, "\n"); 197 sameline = 0; 198 } 199 histmax = generate_histogram(fd, filesystem, 200 &histogram_ary, &mirror, 201 &repeat); 202 histindex = 0; 203 histogram = 1; 204 205 /* 206 * Just stream the histogram, then stop 207 */ 208 if (streaming == 0) 209 streaming = -1; 210 } 211 212 if (streaming && histogram) { 213 ++histindex; 214 mirror.tid_end = histogram_ary[histindex].tid; 215 estbytes = histogram_ary[histindex-1].bytes; 216 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 217 } else { 218 estbytes = 0; 219 } 220 221 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 222 &mrec_tmp, sizeof(mrec_tmp.pfs)); 223 224 /* 225 * A cycle file overrides the beginning TID only if we are 226 * not operating in two-way or histogram mode. 227 */ 228 if (TwoWayPipeOpt == 0 && histogram == 0) { 229 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 230 } 231 232 /* 233 * An additional argument overrides the beginning TID regardless 234 * of what mode we are in. This is not recommending if operating 235 * in two-way mode. 236 */ 237 if (ac == 2) 238 mirror.tid_beg = strtoull(av[1], NULL, 0); 239 240 if (streaming == 0 || VerboseOpt >= 2) { 241 fprintf(stderr, 242 "Mirror-read: Mirror %016jx to %016jx", 243 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 244 if (histogram) 245 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 246 fprintf(stderr, "\n"); 247 fflush(stderr); 248 } 249 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 250 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 251 (uintmax_t)mirror.key_beg.obj_id); 252 } 253 254 /* 255 * Nothing to do if begin equals end. 
256 */ 257 if (mirror.tid_beg >= mirror.tid_end) { 258 if (streaming == 0 || VerboseOpt >= 2) 259 fprintf(stderr, "Mirror-read: No work to do\n"); 260 sleep(DelayOpt); 261 didwork = 0; 262 histogram = 0; 263 goto done; 264 } 265 didwork = 1; 266 267 /* 268 * Write out bulk records 269 */ 270 mirror.ubuf = buf; 271 mirror.size = SERIALBUF_SIZE; 272 273 do { 274 mirror.count = 0; 275 mirror.pfs_id = pfs.pfs_id; 276 mirror.shared_uuid = pfs.ondisk->shared_uuid; 277 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 278 fprintf(stderr, "Mirror-read %s failed: %s\n", 279 filesystem, strerror(errno)); 280 exit(1); 281 } 282 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 283 fprintf(stderr, 284 "Mirror-read %s fatal error %d\n", 285 filesystem, mirror.head.error); 286 exit(1); 287 } 288 if (mirror.count) { 289 if (BandwidthOpt) { 290 n = writebw(1, mirror.ubuf, mirror.count, 291 &bwcount, &bwtv); 292 } else { 293 n = write(1, mirror.ubuf, mirror.count); 294 } 295 if (n != mirror.count) { 296 fprintf(stderr, "Mirror-read %s failed: " 297 "short write\n", 298 filesystem); 299 exit(1); 300 } 301 } 302 total_bytes += mirror.count; 303 if (streaming && VerboseOpt) { 304 fprintf(stderr, 305 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 306 (uintmax_t)mirror.key_cur.obj_id, 307 (uintmax_t)mirror.tid_beg, 308 (uintmax_t)mirror.tid_end, 309 (intmax_t)total_bytes); 310 fflush(stderr); 311 sameline = 0; 312 } 313 mirror.key_beg = mirror.key_cur; 314 315 /* 316 * Deal with time limit option 317 */ 318 if (TimeoutOpt && 319 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 320 fprintf(stderr, 321 "Mirror-read %s interrupted by timer at" 322 " %016jx\n", 323 filesystem, 324 (uintmax_t)mirror.key_cur.obj_id); 325 interrupted = 1; 326 break; 327 } 328 } while (mirror.count != 0); 329 330 done: 331 if (streaming && VerboseOpt && sameline == 0) { 332 fprintf(stderr, "\n"); 333 fflush(stderr); 334 sameline = 1; 335 } 336 337 /* 338 * Write out the termination sync record - only 
if not interrupted 339 */ 340 if (interrupted == 0) { 341 if (didwork) { 342 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 343 &mrec_tmp, sizeof(mrec_tmp.sync)); 344 } else { 345 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 346 &mrec_tmp, sizeof(mrec_tmp.sync)); 347 } 348 } 349 350 /* 351 * If the -2 option was given (automatic when doing mirror-copy), 352 * a two-way pipe is assumed and we expect a response mrec from 353 * the target. 354 */ 355 if (TwoWayPipeOpt) { 356 mrec = read_mrecord(0, &error, &pickup); 357 if (mrec == NULL || 358 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 359 mrec->head.rec_size != sizeof(mrec->update)) { 360 fprintf(stderr, "mirror_read: Did not get final " 361 "acknowledgement packet from target\n"); 362 exit(1); 363 } 364 if (interrupted) { 365 if (CyclePath) { 366 hammer_set_cycle(&mirror.key_cur, 367 mirror.tid_beg); 368 fprintf(stderr, "Cyclefile %s updated for " 369 "continuation\n", CyclePath); 370 } 371 } else { 372 sync_tid = mrec->update.tid; 373 if (CyclePath) { 374 hammer_key_beg_init(&mirror.key_beg); 375 hammer_set_cycle(&mirror.key_beg, sync_tid); 376 fprintf(stderr, 377 "Cyclefile %s updated to 0x%016jx\n", 378 CyclePath, (uintmax_t)sync_tid); 379 } 380 } 381 } else if (CyclePath) { 382 /* NOTE! mirror.tid_beg cannot be updated */ 383 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 384 "fully updated unless you use mirror-copy\n"); 385 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 386 } 387 if (streaming && interrupted == 0) { 388 time_t t1 = time(NULL); 389 time_t t2; 390 391 /* 392 * Try to break down large bulk transfers into smaller ones 393 * so it can sync the transaction id on the slave. This 394 * way if we get interrupted a restart doesn't have to 395 * start from scratch. 
396 */ 397 if (streaming && histogram) { 398 if (histindex != histmax) { 399 if (VerboseOpt && VerboseOpt < 2 && 400 streaming >= 0) { 401 fprintf(stderr, " (bulk incremental)"); 402 } 403 relpfs(fd, &pfs); 404 goto again; 405 } 406 } 407 408 if (VerboseOpt && streaming >= 0) { 409 fprintf(stderr, " W"); 410 fflush(stderr); 411 } 412 pfs.ondisk->sync_end_tid = mirror.tid_end; 413 if (streaming < 0) { 414 /* 415 * Fake streaming mode when using a histogram to 416 * break up a mirror-read, do not wait on source. 417 */ 418 streaming = 0; 419 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 420 fprintf(stderr, "Mirror-read %s: cannot stream: %s\n", 421 filesystem, strerror(errno)); 422 } else { 423 t2 = time(NULL) - t1; 424 if (t2 >= 0 && t2 < DelayOpt) { 425 if (VerboseOpt) { 426 fprintf(stderr, "\bD"); 427 fflush(stderr); 428 } 429 sleep(DelayOpt - t2); 430 } 431 if (VerboseOpt) { 432 fprintf(stderr, "\b "); 433 fflush(stderr); 434 } 435 relpfs(fd, &pfs); 436 goto again; 437 } 438 } 439 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 440 &mrec_tmp, sizeof(mrec_tmp.sync)); 441 relpfs(fd, &pfs); 442 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 443 } 444 445 /* 446 * What we are trying to do here is figure out how much data is 447 * going to be sent for the TID range and to break the TID range 448 * down into reasonably-sized slices (from the point of view of 449 * data sent) so a lost connection can restart at a reasonable 450 * place and not all the way back at the beginning. 451 * 452 * An entry's TID serves as the end_tid for the prior entry 453 * So we have to offset the calculation by 1 so that TID falls into 454 * the previous entry when populating entries. 455 * 456 * Because the transaction id space is bursty we need a relatively 457 * large number of buckets (like a million) to do a reasonable job 458 * for things like an initial bulk mirrors on a very large filesystem. 
459 */ 460 #define HIST_COUNT (1024 * 1024) 461 462 static int 463 generate_histogram(int fd, const char *filesystem, 464 histogram_t *histogram_ary, 465 struct hammer_ioc_mirror_rw *mirror_base, 466 int *repeatp) 467 { 468 struct hammer_ioc_mirror_rw mirror; 469 union hammer_ioc_mrecord_any *mrec; 470 hammer_tid_t tid_beg; 471 hammer_tid_t tid_end; 472 hammer_tid_t tid; 473 hammer_tid_t tidx; 474 u_int64_t *tid_bytes; 475 u_int64_t total; 476 u_int64_t accum; 477 int i; 478 int res; 479 int off; 480 int len; 481 482 mirror = *mirror_base; 483 tid_beg = mirror.tid_beg; 484 tid_end = mirror.tid_end; 485 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 486 487 if (*histogram_ary == NULL) { 488 *histogram_ary = malloc(sizeof(struct histogram) * 489 (HIST_COUNT + 2)); 490 } 491 if (tid_beg >= tid_end) 492 return(0); 493 494 /* needs 2 extra */ 495 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 496 bzero(tid_bytes, sizeof(tid_bytes)); 497 498 if (*repeatp == 0) { 499 fprintf(stderr, "Prescan to break up bulk transfer"); 500 if (VerboseOpt > 1) 501 fprintf(stderr, " (%juMB chunks)", 502 (uintmax_t)(SplitupOpt / (1024 * 1024))); 503 fprintf(stderr, "\n"); 504 } 505 506 /* 507 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 508 * 509 * Note: Estimates can be off when the mirror is way behind due 510 * to skips. 511 */ 512 total = 0; 513 accum = 0; 514 for (;;) { 515 mirror.count = 0; 516 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 517 fprintf(stderr, "Mirror-read %s failed: %s\n", 518 filesystem, strerror(errno)); 519 exit(1); 520 } 521 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 522 fprintf(stderr, 523 "Mirror-read %s fatal error %d\n", 524 filesystem, mirror.head.error); 525 exit(1); 526 } 527 for (off = 0; 528 off < mirror.count; 529 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size) 530 ) { 531 mrec = (void *)((char *)mirror.ubuf + off); 532 533 /* 534 * We only care about general RECs and PASS 535 * records. We ignore SKIPs. 
536 */ 537 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 538 case HAMMER_MREC_TYPE_REC: 539 case HAMMER_MREC_TYPE_PASS: 540 break; 541 default: 542 continue; 543 } 544 545 /* 546 * Calculate for two indices, create_tid and 547 * delete_tid. Record data only applies to 548 * the create_tid. 549 * 550 * When tid is exactly on the boundary it really 551 * belongs to the previous entry because scans 552 * are inclusive of the ending entry. 553 */ 554 tid = mrec->rec.leaf.base.delete_tid; 555 if (tid && tid >= tid_beg && tid <= tid_end) { 556 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 557 if (mrec->head.type == 558 HAMMER_MREC_TYPE_REC) { 559 len -= HAMMER_HEAD_DOALIGN( 560 mrec->rec.leaf.data_len); 561 assert(len > 0); 562 } 563 i = (tid - tid_beg) * HIST_COUNT / 564 (tid_end - tid_beg); 565 tidx = tid_beg + i * (tid_end - tid_beg) / 566 HIST_COUNT; 567 if (tid == tidx && i) 568 --i; 569 assert(i >= 0 && i < HIST_COUNT); 570 tid_bytes[i] += len; 571 total += len; 572 accum += len; 573 } 574 575 tid = mrec->rec.leaf.base.create_tid; 576 if (tid && tid >= tid_beg && tid <= tid_end) { 577 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 578 if (mrec->head.type == 579 HAMMER_MREC_TYPE_REC_NODATA) { 580 len += HAMMER_HEAD_DOALIGN( 581 mrec->rec.leaf.data_len); 582 } 583 i = (tid - tid_beg) * HIST_COUNT / 584 (tid_end - tid_beg); 585 tidx = tid_beg + i * (tid_end - tid_beg) / 586 HIST_COUNT; 587 if (tid == tidx && i) 588 --i; 589 assert(i >= 0 && i < HIST_COUNT); 590 tid_bytes[i] += len; 591 total += len; 592 accum += len; 593 } 594 } 595 if (VerboseOpt > 1) { 596 if (*repeatp == 0 && accum > SplitupOpt) { 597 fprintf(stderr, "."); 598 fflush(stderr); 599 accum = 0; 600 } 601 } 602 if (mirror.count == 0) 603 break; 604 mirror.key_beg = mirror.key_cur; 605 } 606 607 /* 608 * Reduce to SplitupOpt (default 4GB) chunks. This code may 609 * use up to two additional elements. Do the array in-place. 
610 * 611 * Inefficient degenerate cases can occur if we do not accumulate 612 * at least the requested split amount, so error on the side of 613 * going over a bit. 614 */ 615 res = 0; 616 (*histogram_ary)[res].tid = tid_beg; 617 (*histogram_ary)[res].bytes = tid_bytes[0]; 618 for (i = 1; i < HIST_COUNT; ++i) { 619 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 620 ++res; 621 (*histogram_ary)[res].tid = tid_beg + 622 i * (tid_end - tid_beg) / 623 HIST_COUNT; 624 (*histogram_ary)[res].bytes = 0; 625 626 } 627 (*histogram_ary)[res].bytes += tid_bytes[i]; 628 } 629 ++res; 630 (*histogram_ary)[res].tid = tid_end; 631 (*histogram_ary)[res].bytes = -1; 632 633 if (*repeatp == 0) { 634 if (VerboseOpt > 1) 635 fprintf(stderr, "\n"); /* newline after ... */ 636 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 637 res, (uintmax_t)total / (1024 * 1024)); 638 for (i = 0; i < res && i < 3; ++i) { 639 if (i) 640 fprintf(stderr, ", "); 641 fprintf(stderr, "%ju", 642 (uintmax_t)(*histogram_ary)[i].bytes); 643 } 644 if (i < res) 645 fprintf(stderr, ", ..."); 646 fprintf(stderr, ")\n"); 647 } 648 assert(res <= HIST_COUNT); 649 *repeatp = 1; 650 651 free(tid_bytes); 652 return(res); 653 } 654 655 static void 656 create_pfs(const char *filesystem, uuid_t *s_uuid) 657 { 658 if (ForceYesOpt == 1) { 659 fprintf(stderr, "PFS slave %s does not exist. " 660 "Auto create new slave PFS!\n", filesystem); 661 662 } else { 663 fprintf(stderr, "PFS slave %s does not exist.\n" 664 "Do you want to create a new slave PFS? 
(yes|no) ", 665 filesystem); 666 fflush(stderr); 667 if (getyn() != 1) { 668 fprintf(stderr, "Aborting operation\n"); 669 exit(1); 670 } 671 } 672 673 u_int32_t status; 674 char *shared_uuid = NULL; 675 uuid_to_string(s_uuid, &shared_uuid, &status); 676 677 char *cmd = NULL; 678 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 679 filesystem, shared_uuid); 680 free(shared_uuid); 681 682 if (cmd == NULL) { 683 fprintf(stderr, "Failed to alloc memory\n"); 684 exit(1); 685 } 686 if (system(cmd) != 0) { 687 fprintf(stderr, "Failed to create PFS\n"); 688 } 689 free(cmd); 690 } 691 692 /* 693 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 694 * some additional packet types to negotiate TID ranges and to verify 695 * completion. The HAMMER VFS does most of the work. 696 * 697 * It is important to note that the mirror.key_{beg,end} range must 698 * match the ranged used by the original. For now both sides use 699 * range the entire key space. 700 * 701 * It is even more important that the records in the stream conform 702 * to the TID range also supplied in the stream. The HAMMER VFS will 703 * use the REC, PASS, and SKIP record types to track the portions of 704 * the B-Tree being scanned in order to be able to proactively delete 705 * records on the target within those active areas that are not mentioned 706 * by the source. 707 * 708 * The mirror.key_cur field is used by the VFS to do this tracking. It 709 * must be initialized to key_beg but then is persistently updated by 710 * the HAMMER VFS on each successive ioctl() call. If you blow up this 711 * field you will blow up the mirror target, possibly to the point of 712 * deleting everything. As a safety measure the HAMMER VFS simply marks 713 * the records that the source has destroyed as deleted on the target, 714 * and normal pruning operations will deal with their final disposition 715 * at some later time. 
716 */ 717 void 718 hammer_cmd_mirror_write(char **av, int ac) 719 { 720 struct hammer_ioc_mirror_rw mirror; 721 const char *filesystem; 722 char *buf = malloc(SERIALBUF_SIZE); 723 struct hammer_ioc_pseudofs_rw pfs; 724 struct hammer_ioc_mrecord_head pickup; 725 struct hammer_ioc_synctid synctid; 726 union hammer_ioc_mrecord_any mrec_tmp; 727 hammer_ioc_mrecord_any_t mrec; 728 struct stat st; 729 int error; 730 int fd; 731 int n; 732 733 if (ac != 1) 734 mirror_usage(1); 735 filesystem = av[0]; 736 737 pickup.signature = 0; 738 pickup.type = 0; 739 740 again: 741 bzero(&mirror, sizeof(mirror)); 742 hammer_key_beg_init(&mirror.key_beg); 743 hammer_key_end_init(&mirror.key_end); 744 mirror.key_end = mirror.key_beg; 745 746 /* 747 * Read initial packet 748 */ 749 mrec = read_mrecord(0, &error, &pickup); 750 if (mrec == NULL) { 751 if (error == 0) 752 fprintf(stderr, "validate_mrec_header: short read\n"); 753 exit(1); 754 } 755 /* 756 * Validate packet 757 */ 758 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 759 return; 760 } 761 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 762 fprintf(stderr, "validate_mrec_header: did not get expected " 763 "PFSD record type\n"); 764 exit(1); 765 } 766 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 767 fprintf(stderr, "validate_mrec_header: unexpected payload " 768 "size\n"); 769 exit(1); 770 } 771 772 /* 773 * Create slave PFS if it doesn't yet exist 774 */ 775 if (lstat(filesystem, &st) != 0) { 776 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 777 } 778 free(mrec); 779 mrec = NULL; 780 781 fd = getpfs(&pfs, filesystem); 782 783 /* 784 * In two-way mode the target writes out a PFS packet first. 785 * The source uses our tid_end as its tid_beg by default, 786 * picking up where it left off. 
787 */ 788 mirror.tid_beg = 0; 789 if (TwoWayPipeOpt) { 790 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 791 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 792 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 793 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 794 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 795 &mrec_tmp, sizeof(mrec_tmp.pfs)); 796 } 797 798 /* 799 * Read and process the PFS header. The source informs us of 800 * the TID range the stream represents. 801 */ 802 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 803 &mirror.tid_beg, &mirror.tid_end); 804 if (n < 0) { /* got TERM record */ 805 relpfs(fd, &pfs); 806 return; 807 } 808 809 mirror.ubuf = buf; 810 mirror.size = SERIALBUF_SIZE; 811 812 /* 813 * Read and process bulk records (REC, PASS, and SKIP types). 814 * 815 * On your life, do NOT mess with mirror.key_cur or your mirror 816 * target may become history. 817 */ 818 for (;;) { 819 mirror.count = 0; 820 mirror.pfs_id = pfs.pfs_id; 821 mirror.shared_uuid = pfs.ondisk->shared_uuid; 822 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 823 if (mirror.size <= 0) 824 break; 825 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 826 fprintf(stderr, "Mirror-write %s failed: %s\n", 827 filesystem, strerror(errno)); 828 exit(1); 829 } 830 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 831 fprintf(stderr, 832 "Mirror-write %s fatal error %d\n", 833 filesystem, mirror.head.error); 834 exit(1); 835 } 836 #if 0 837 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 838 fprintf(stderr, 839 "Mirror-write %s interrupted by timer at" 840 " %016llx\n", 841 filesystem, 842 mirror.key_cur.obj_id); 843 exit(0); 844 } 845 #endif 846 } 847 848 /* 849 * Read and process the termination sync record. 
850 */ 851 mrec = read_mrecord(0, &error, &pickup); 852 853 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 854 fprintf(stderr, "Mirror-write: received termination request\n"); 855 free(mrec); 856 return; 857 } 858 859 if (mrec == NULL || 860 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 861 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 862 mrec->head.rec_size != sizeof(mrec->sync)) { 863 fprintf(stderr, "Mirror-write %s: Did not get termination " 864 "sync record, or rec_size is wrong rt=%d\n", 865 filesystem, mrec->head.type); 866 exit(1); 867 } 868 869 /* 870 * Update the PFS info on the target so the user has visibility 871 * into the new snapshot, and sync the target filesystem. 872 */ 873 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 874 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 875 876 bzero(&synctid, sizeof(synctid)); 877 synctid.op = HAMMER_SYNCTID_SYNC2; 878 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 879 880 if (VerboseOpt >= 2) { 881 fprintf(stderr, "Mirror-write %s: succeeded\n", 882 filesystem); 883 } 884 } 885 886 free(mrec); 887 mrec = NULL; 888 889 /* 890 * Report back to the originator. 
891 */ 892 if (TwoWayPipeOpt) { 893 mrec_tmp.update.tid = mirror.tid_end; 894 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 895 &mrec_tmp, sizeof(mrec_tmp.update)); 896 } else { 897 printf("Source can update synctid to 0x%016jx\n", 898 (uintmax_t)mirror.tid_end); 899 } 900 relpfs(fd, &pfs); 901 goto again; 902 } 903 904 void 905 hammer_cmd_mirror_dump(void) 906 { 907 char *buf = malloc(SERIALBUF_SIZE); 908 struct hammer_ioc_mrecord_head pickup; 909 hammer_ioc_mrecord_any_t mrec; 910 int error; 911 int size; 912 int offset; 913 int bytes; 914 915 /* 916 * Read and process the PFS header 917 */ 918 pickup.signature = 0; 919 pickup.type = 0; 920 921 mrec = read_mrecord(0, &error, &pickup); 922 923 again: 924 /* 925 * Read and process bulk records 926 */ 927 for (;;) { 928 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 929 if (size <= 0) 930 break; 931 offset = 0; 932 while (offset < size) { 933 mrec = (void *)((char *)buf + offset); 934 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 935 if (offset + bytes > size) { 936 fprintf(stderr, "Misaligned record\n"); 937 exit(1); 938 } 939 940 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 941 case HAMMER_MREC_TYPE_REC_BADCRC: 942 case HAMMER_MREC_TYPE_REC: 943 printf("Record obj=%016jx key=%016jx " 944 "rt=%02x ot=%02x", 945 (uintmax_t)mrec->rec.leaf.base.obj_id, 946 (uintmax_t)mrec->rec.leaf.base.key, 947 mrec->rec.leaf.base.rec_type, 948 mrec->rec.leaf.base.obj_type); 949 if (mrec->head.type == 950 HAMMER_MREC_TYPE_REC_BADCRC) { 951 printf(" (BAD CRC)"); 952 } 953 printf("\n"); 954 printf(" tids %016jx:%016jx data=%d\n", 955 (uintmax_t)mrec->rec.leaf.base.create_tid, 956 (uintmax_t)mrec->rec.leaf.base.delete_tid, 957 mrec->rec.leaf.data_len); 958 break; 959 case HAMMER_MREC_TYPE_PASS: 960 printf("Pass obj=%016jx key=%016jx " 961 "rt=%02x ot=%02x\n", 962 (uintmax_t)mrec->rec.leaf.base.obj_id, 963 (uintmax_t)mrec->rec.leaf.base.key, 964 mrec->rec.leaf.base.rec_type, 965 mrec->rec.leaf.base.obj_type); 966 printf(" 
tids %016jx:%016jx data=%d\n", 967 (uintmax_t)mrec->rec.leaf.base.create_tid, 968 (uintmax_t)mrec->rec.leaf.base.delete_tid, 969 mrec->rec.leaf.data_len); 970 break; 971 case HAMMER_MREC_TYPE_SKIP: 972 printf("Skip obj=%016jx key=%016jx rt=%02x to\n" 973 " obj=%016jx key=%016jx rt=%02x\n", 974 (uintmax_t)mrec->skip.skip_beg.obj_id, 975 (uintmax_t)mrec->skip.skip_beg.key, 976 mrec->skip.skip_beg.rec_type, 977 (uintmax_t)mrec->skip.skip_end.obj_id, 978 (uintmax_t)mrec->skip.skip_end.key, 979 mrec->skip.skip_end.rec_type); 980 default: 981 break; 982 } 983 offset += bytes; 984 } 985 } 986 987 /* 988 * Read and process the termination sync record. 989 */ 990 mrec = read_mrecord(0, &error, &pickup); 991 if (mrec == NULL || 992 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 993 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 994 ) { 995 fprintf(stderr, "Mirror-dump: Did not get termination " 996 "sync record\n"); 997 } 998 999 /* 1000 * Continue with more batches until EOF. 1001 */ 1002 mrec = read_mrecord(0, &error, &pickup); 1003 if (mrec) 1004 goto again; 1005 } 1006 1007 void 1008 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1009 { 1010 pid_t pid1; 1011 pid_t pid2; 1012 int fds[2]; 1013 const char *xav[32]; 1014 char tbuf[16]; 1015 char *ptr; 1016 int xac; 1017 1018 if (ac != 2) 1019 mirror_usage(1); 1020 1021 TwoWayPipeOpt = 1; 1022 signal(SIGPIPE, SIG_IGN); 1023 1024 again: 1025 if (pipe(fds) < 0) { 1026 perror("pipe"); 1027 exit(1); 1028 } 1029 1030 /* 1031 * Source 1032 */ 1033 if ((pid1 = fork()) == 0) { 1034 signal(SIGPIPE, SIG_DFL); 1035 dup2(fds[0], 0); 1036 dup2(fds[0], 1); 1037 close(fds[0]); 1038 close(fds[1]); 1039 if ((ptr = strchr(av[0], ':')) != NULL) { 1040 *ptr++ = 0; 1041 xac = 0; 1042 xav[xac++] = "ssh"; 1043 if (CompressOpt) 1044 xav[xac++] = "-C"; 1045 if (SshPort) { 1046 xav[xac++] = "-p"; 1047 xav[xac++] = SshPort; 1048 } 1049 xav[xac++] = av[0]; 1050 xav[xac++] = "hammer"; 1051 1052 switch(VerboseOpt) { 1053 case 0: 1054 break; 1055 case 
1: 1056 xav[xac++] = "-v"; 1057 break; 1058 case 2: 1059 xav[xac++] = "-vv"; 1060 break; 1061 default: 1062 xav[xac++] = "-vvv"; 1063 break; 1064 } 1065 if (ForceYesOpt) { 1066 xav[xac++] = "-y"; 1067 } 1068 xav[xac++] = "-2"; 1069 if (TimeoutOpt) { 1070 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1071 xav[xac++] = "-t"; 1072 xav[xac++] = tbuf; 1073 } 1074 if (SplitupOptStr) { 1075 xav[xac++] = "-S"; 1076 xav[xac++] = SplitupOptStr; 1077 } 1078 if (streaming) 1079 xav[xac++] = "mirror-read-stream"; 1080 else 1081 xav[xac++] = "mirror-read"; 1082 xav[xac++] = ptr; 1083 xav[xac++] = NULL; 1084 execv("/usr/bin/ssh", (void *)xav); 1085 } else { 1086 hammer_cmd_mirror_read(av, 1, streaming); 1087 fflush(stdout); 1088 fflush(stderr); 1089 } 1090 _exit(1); 1091 } 1092 1093 /* 1094 * Target 1095 */ 1096 if ((pid2 = fork()) == 0) { 1097 signal(SIGPIPE, SIG_DFL); 1098 dup2(fds[1], 0); 1099 dup2(fds[1], 1); 1100 close(fds[0]); 1101 close(fds[1]); 1102 if ((ptr = strchr(av[1], ':')) != NULL) { 1103 *ptr++ = 0; 1104 xac = 0; 1105 xav[xac++] = "ssh"; 1106 if (CompressOpt) 1107 xav[xac++] = "-C"; 1108 if (SshPort) { 1109 xav[xac++] = "-p"; 1110 xav[xac++] = SshPort; 1111 } 1112 xav[xac++] = av[1]; 1113 xav[xac++] = "hammer"; 1114 1115 switch(VerboseOpt) { 1116 case 0: 1117 break; 1118 case 1: 1119 xav[xac++] = "-v"; 1120 break; 1121 case 2: 1122 xav[xac++] = "-vv"; 1123 break; 1124 default: 1125 xav[xac++] = "-vvv"; 1126 break; 1127 } 1128 if (ForceYesOpt) { 1129 xav[xac++] = "-y"; 1130 } 1131 xav[xac++] = "-2"; 1132 xav[xac++] = "mirror-write"; 1133 xav[xac++] = ptr; 1134 xav[xac++] = NULL; 1135 execv("/usr/bin/ssh", (void *)xav); 1136 } else { 1137 hammer_cmd_mirror_write(av + 1, 1); 1138 fflush(stdout); 1139 fflush(stderr); 1140 } 1141 _exit(1); 1142 } 1143 close(fds[0]); 1144 close(fds[1]); 1145 1146 while (waitpid(pid1, NULL, 0) <= 0) 1147 ; 1148 while (waitpid(pid2, NULL, 0) <= 0) 1149 ; 1150 1151 /* 1152 * If the link is lost restart 1153 */ 1154 if (streaming) { 1155 
if (VerboseOpt) { 1156 fprintf(stderr, "\nLost Link\n"); 1157 fflush(stderr); 1158 } 1159 sleep(15 + DelayOpt); 1160 goto again; 1161 } 1162 1163 } 1164 1165 /* 1166 * Read and return multiple mrecords 1167 */ 1168 static int 1169 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 1170 { 1171 hammer_ioc_mrecord_any_t mrec; 1172 u_int count; 1173 size_t n; 1174 size_t i; 1175 size_t bytes; 1176 int type; 1177 1178 count = 0; 1179 while (size - count >= HAMMER_MREC_HEADSIZE) { 1180 /* 1181 * Cached the record header in case we run out of buffer 1182 * space. 1183 */ 1184 fflush(stdout); 1185 if (pickup->signature == 0) { 1186 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1187 i = read(fd, (char *)pickup + n, 1188 HAMMER_MREC_HEADSIZE - n); 1189 if (i <= 0) 1190 break; 1191 } 1192 if (n == 0) 1193 break; 1194 if (n != HAMMER_MREC_HEADSIZE) { 1195 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1196 exit(1); 1197 } 1198 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1199 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1200 "bad signature\n"); 1201 exit(1); 1202 } 1203 } 1204 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1205 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 1206 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1207 "illegal rec_size\n"); 1208 exit(1); 1209 } 1210 1211 /* 1212 * Stop if we have insufficient space for the record and data. 1213 */ 1214 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1215 if (size - count < bytes) 1216 break; 1217 1218 /* 1219 * Stop if the record type is not a REC, SKIP, or PASS, 1220 * which are the only types the ioctl supports. Other types 1221 * are used only by the userland protocol. 1222 * 1223 * Ignore all flags. 
1224 */ 1225 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1226 if (type != HAMMER_MREC_TYPE_PFSD && 1227 type != HAMMER_MREC_TYPE_REC && 1228 type != HAMMER_MREC_TYPE_SKIP && 1229 type != HAMMER_MREC_TYPE_PASS) { 1230 break; 1231 } 1232 1233 /* 1234 * Read the remainder and clear the pickup signature. 1235 */ 1236 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1237 i = read(fd, buf + count + n, bytes - n); 1238 if (i <= 0) 1239 break; 1240 } 1241 if (n != bytes) { 1242 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1243 exit(1); 1244 } 1245 1246 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1247 pickup->signature = 0; 1248 pickup->type = 0; 1249 mrec = (void *)(buf + count); 1250 1251 /* 1252 * Validate the completed record 1253 */ 1254 if (mrec->head.rec_crc != 1255 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1256 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1257 fprintf(stderr, "read_mrecords: malformed record " 1258 "on pipe, bad crc\n"); 1259 exit(1); 1260 } 1261 1262 /* 1263 * If its a B-Tree record validate the data crc. 1264 * 1265 * NOTE: If the VFS passes us an explicitly errorde mrec 1266 * we just pass it through. 1267 */ 1268 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1269 1270 if (type == HAMMER_MREC_TYPE_REC) { 1271 if (mrec->head.rec_size < 1272 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1273 fprintf(stderr, 1274 "read_mrecords: malformed record on " 1275 "pipe, illegal element data_len\n"); 1276 exit(1); 1277 } 1278 if (mrec->rec.leaf.data_len && 1279 mrec->rec.leaf.data_offset && 1280 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 1281 fprintf(stderr, 1282 "read_mrecords: data_crc did not " 1283 "match data! obj=%016jx key=%016jx\n", 1284 (uintmax_t)mrec->rec.leaf.base.obj_id, 1285 (uintmax_t)mrec->rec.leaf.base.key); 1286 fprintf(stderr, 1287 "continuing, but there are problems\n"); 1288 } 1289 } 1290 count += bytes; 1291 } 1292 return(count); 1293 } 1294 1295 /* 1296 * Read and return a single mrecord. 
1297 */ 1298 static 1299 hammer_ioc_mrecord_any_t 1300 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 1301 { 1302 hammer_ioc_mrecord_any_t mrec; 1303 struct hammer_ioc_mrecord_head mrechd; 1304 size_t bytes; 1305 size_t n; 1306 size_t i; 1307 1308 if (pickup && pickup->type != 0) { 1309 mrechd = *pickup; 1310 pickup->signature = 0; 1311 pickup->type = 0; 1312 n = HAMMER_MREC_HEADSIZE; 1313 } else { 1314 /* 1315 * Read in the PFSD header from the sender. 1316 */ 1317 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1318 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1319 if (i <= 0) 1320 break; 1321 } 1322 if (n == 0) { 1323 *errorp = 0; /* EOF */ 1324 return(NULL); 1325 } 1326 if (n != HAMMER_MREC_HEADSIZE) { 1327 fprintf(stderr, "short read of mrecord header\n"); 1328 *errorp = EPIPE; 1329 return(NULL); 1330 } 1331 } 1332 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1333 fprintf(stderr, "read_mrecord: bad signature\n"); 1334 *errorp = EINVAL; 1335 return(NULL); 1336 } 1337 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1338 assert(bytes >= sizeof(mrechd)); 1339 mrec = malloc(bytes); 1340 mrec->head = mrechd; 1341 1342 while (n < bytes) { 1343 i = read(fdin, (char *)mrec + n, bytes - n); 1344 if (i <= 0) 1345 break; 1346 n += i; 1347 } 1348 if (n != bytes) { 1349 fprintf(stderr, "read_mrecord: short read on payload\n"); 1350 *errorp = EPIPE; 1351 return(NULL); 1352 } 1353 if (mrec->head.rec_crc != 1354 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1355 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1356 fprintf(stderr, "read_mrecord: bad CRC\n"); 1357 *errorp = EINVAL; 1358 return(NULL); 1359 } 1360 *errorp = 0; 1361 return(mrec); 1362 } 1363 1364 static 1365 void 1366 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 1367 int bytes) 1368 { 1369 char zbuf[HAMMER_HEAD_ALIGN]; 1370 int pad; 1371 1372 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1373 1374 assert(bytes >= (int)sizeof(mrec->head)); 1375 
bzero(&mrec->head, sizeof(mrec->head)); 1376 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1377 mrec->head.type = type; 1378 mrec->head.rec_size = bytes; 1379 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1380 bytes - HAMMER_MREC_CRCOFF); 1381 if (write(fdout, mrec, bytes) != bytes) { 1382 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1383 errno, strerror(errno)); 1384 exit(1); 1385 } 1386 if (pad) { 1387 bzero(zbuf, pad); 1388 if (write(fdout, zbuf, pad) != pad) { 1389 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1390 errno, strerror(errno)); 1391 exit(1); 1392 } 1393 } 1394 } 1395 1396 /* 1397 * Generate a mirroring header with the pfs information of the 1398 * originating filesytem. 1399 */ 1400 static void 1401 generate_mrec_header(int fd, int pfs_id, 1402 union hammer_ioc_mrecord_any *mrec_tmp) 1403 { 1404 struct hammer_ioc_pseudofs_rw pfs; 1405 1406 bzero(&pfs, sizeof(pfs)); 1407 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1408 pfs.pfs_id = pfs_id; 1409 pfs.ondisk = &mrec_tmp->pfs.pfsd; 1410 pfs.bytes = sizeof(mrec_tmp->pfs.pfsd); 1411 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1412 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1413 exit(1); 1414 } 1415 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1416 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 1417 exit(1); 1418 } 1419 mrec_tmp->pfs.version = pfs.version; 1420 } 1421 1422 /* 1423 * Validate the pfs information from the originating filesystem 1424 * against the target filesystem. shared_uuid must match. 
1425 * 1426 * return -1 if we got a TERM record 1427 */ 1428 static int 1429 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1430 struct hammer_ioc_mrecord_head *pickup, 1431 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1432 { 1433 struct hammer_ioc_pseudofs_rw pfs; 1434 struct hammer_pseudofs_data pfsd; 1435 hammer_ioc_mrecord_any_t mrec; 1436 int error; 1437 1438 /* 1439 * Get the PFSD info from the target filesystem. 1440 */ 1441 bzero(&pfs, sizeof(pfs)); 1442 bzero(&pfsd, sizeof(pfsd)); 1443 pfs.pfs_id = pfs_id; 1444 pfs.ondisk = &pfsd; 1445 pfs.bytes = sizeof(pfsd); 1446 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1447 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1448 exit(1); 1449 } 1450 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1451 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 1452 exit(1); 1453 } 1454 1455 mrec = read_mrecord(fdin, &error, pickup); 1456 if (mrec == NULL) { 1457 if (error == 0) 1458 fprintf(stderr, "validate_mrec_header: short read\n"); 1459 exit(1); 1460 } 1461 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1462 free(mrec); 1463 return(-1); 1464 } 1465 1466 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1467 fprintf(stderr, "validate_mrec_header: did not get expected " 1468 "PFSD record type\n"); 1469 exit(1); 1470 } 1471 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1472 fprintf(stderr, "validate_mrec_header: unexpected payload " 1473 "size\n"); 1474 exit(1); 1475 } 1476 if (mrec->pfs.version != pfs.version) { 1477 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1478 exit(1); 1479 } 1480 1481 /* 1482 * Whew. Ok, is the read PFS info compatible with the target? 
1483 */ 1484 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1485 sizeof(pfsd.shared_uuid)) != 0) { 1486 fprintf(stderr, 1487 "mirror-write: source and target have " 1488 "different shared-uuid's!\n"); 1489 exit(1); 1490 } 1491 if (is_target && 1492 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1493 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1494 exit(1); 1495 } 1496 if (tid_begp) 1497 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1498 if (tid_endp) 1499 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1500 free(mrec); 1501 return(0); 1502 } 1503 1504 static void 1505 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1506 { 1507 struct hammer_ioc_pseudofs_rw pfs; 1508 struct hammer_pseudofs_data pfsd; 1509 1510 bzero(&pfs, sizeof(pfs)); 1511 bzero(&pfsd, sizeof(pfsd)); 1512 pfs.pfs_id = pfs_id; 1513 pfs.ondisk = &pfsd; 1514 pfs.bytes = sizeof(pfsd); 1515 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1516 perror("update_pfs_snapshot (read)"); 1517 exit(1); 1518 } 1519 if (pfsd.sync_end_tid != snapshot_tid) { 1520 pfsd.sync_end_tid = snapshot_tid; 1521 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1522 perror("update_pfs_snapshot (rewrite)"); 1523 exit(1); 1524 } 1525 if (VerboseOpt >= 2) { 1526 fprintf(stderr, 1527 "Mirror-write: Completed, updated snapshot " 1528 "to %016jx\n", 1529 (uintmax_t)snapshot_tid); 1530 fflush(stderr); 1531 } 1532 } 1533 } 1534 1535 /* 1536 * Bandwidth-limited write in chunks 1537 */ 1538 static 1539 ssize_t 1540 writebw(int fd, const void *buf, size_t nbytes, 1541 u_int64_t *bwcount, struct timeval *tv1) 1542 { 1543 struct timeval tv2; 1544 size_t n; 1545 ssize_t r; 1546 ssize_t a; 1547 int usec; 1548 1549 a = 0; 1550 r = 0; 1551 while (nbytes) { 1552 if (*bwcount + nbytes > BandwidthOpt) 1553 n = BandwidthOpt - *bwcount; 1554 else 1555 n = nbytes; 1556 if (n) 1557 r = write(fd, buf, n); 1558 if (r >= 0) { 1559 a += r; 1560 nbytes -= r; 1561 buf = (const char *)buf + r; 1562 } 1563 if 
((size_t)r != n) 1564 break; 1565 *bwcount += n; 1566 if (*bwcount >= BandwidthOpt) { 1567 gettimeofday(&tv2, NULL); 1568 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1569 (int)(tv2.tv_usec - tv1->tv_usec); 1570 if (usec >= 0 && usec < 1000000) 1571 usleep(1000000 - usec); 1572 gettimeofday(tv1, NULL); 1573 *bwcount -= BandwidthOpt; 1574 } 1575 } 1576 return(a ? a : r); 1577 } 1578 1579 /* 1580 * Get a yes or no answer from the terminal. The program may be run as 1581 * part of a two-way pipe so we cannot use stdin for this operation. 1582 */ 1583 static int 1584 getyn(void) 1585 { 1586 char buf[256]; 1587 FILE *fp; 1588 int result; 1589 1590 fp = fopen("/dev/tty", "r"); 1591 if (fp == NULL) { 1592 fprintf(stderr, "No terminal for response\n"); 1593 return(-1); 1594 } 1595 result = -1; 1596 while (fgets(buf, sizeof(buf), fp) != NULL) { 1597 if (buf[0] == 'y' || buf[0] == 'Y') { 1598 result = 1; 1599 break; 1600 } 1601 if (buf[0] == 'n' || buf[0] == 'N') { 1602 result = 0; 1603 break; 1604 } 1605 fprintf(stderr, "Response not understood\n"); 1606 break; 1607 } 1608 fclose(fp); 1609 return(result); 1610 } 1611 1612 static void 1613 mirror_usage(int code) 1614 { 1615 fprintf(stderr, 1616 "hammer mirror-read <filesystem> [begin-tid]\n" 1617 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1618 "hammer mirror-write <filesystem>\n" 1619 "hammer mirror-dump\n" 1620 "hammer mirror-copy [[user@]host:]<filesystem>" 1621 " [[user@]host:]<filesystem>\n" 1622 "hammer mirror-stream [[user@]host:]<filesystem>" 1623 " [[user@]host:]<filesystem>\n" 1624 ); 1625 exit(code); 1626 } 1627 1628