1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.16 2008/11/09 05:22:56 dillon Exp $ 35 */ 36 37 #include "hammer.h" 38 39 #define SERIALBUF_SIZE (512 * 1024) 40 41 typedef struct histogram { 42 hammer_tid_t tid; 43 u_int64_t bytes; 44 } *histogram_t; 45 46 static int read_mrecords(int fd, char *buf, u_int size, 47 hammer_ioc_mrecord_head_t pickup); 48 static int generate_histogram(int fd, const char *filesystem, 49 histogram_t *histogram_ary, 50 struct hammer_ioc_mirror_rw *mirror_base, 51 int *repeatp); 52 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 53 hammer_ioc_mrecord_head_t pickup); 54 static void write_mrecord(int fdout, u_int32_t type, 55 hammer_ioc_mrecord_any_t mrec, int bytes); 56 static void generate_mrec_header(int fd, int pfs_id, 57 union hammer_ioc_mrecord_any *mrec_tmp); 58 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 59 struct hammer_ioc_mrecord_head *pickup, 60 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 61 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 62 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 63 u_int64_t *bwcount, struct timeval *tv1); 64 static int getyn(void); 65 static void mirror_usage(int code); 66 67 /* 68 * Generate a mirroring data stream from the specific source over the 69 * entire key range, but restricted to the specified transaction range. 70 * 71 * The HAMMER VFS does most of the work, we add a few new mrecord 72 * types to negotiate the TID ranges and verify that the entire 73 * stream made it to the destination. 74 * 75 * streaming will be 0 for mirror-read, 1 for mirror-stream. The code will 76 * set up a fake value of -1 when running the histogram for mirror-read. 77 */ 78 void 79 hammer_cmd_mirror_read(char **av, int ac, int streaming) 80 { 81 struct hammer_ioc_mirror_rw mirror; 82 struct hammer_ioc_pseudofs_rw pfs; 83 union hammer_ioc_mrecord_any mrec_tmp; 84 struct hammer_ioc_mrecord_head pickup; 85 hammer_ioc_mrecord_any_t mrec; 86 hammer_tid_t sync_tid; 87 histogram_t histogram_ary; 88 const char *filesystem; 89 char *buf = malloc(SERIALBUF_SIZE); 90 int interrupted = 0; 91 int error; 92 int fd; 93 int n; 94 int didwork; 95 int histogram; 96 int histindex; 97 int histmax; 98 int repeat = 0; 99 int sameline; 100 int64_t total_bytes; 101 time_t base_t = time(NULL); 102 struct timeval bwtv; 103 u_int64_t bwcount; 104 u_int64_t estbytes; 105 106 if (ac == 0 || ac > 2) 107 mirror_usage(1); 108 filesystem = av[0]; 109 110 pickup.signature = 0; 111 pickup.type = 0; 112 histogram = 0; 113 histindex = 0; 114 histmax = 0; 115 histogram_ary = NULL; 116 sameline = 0; 117 118 again: 119 bzero(&mirror, sizeof(mirror)); 120 hammer_key_beg_init(&mirror.key_beg); 121 hammer_key_end_init(&mirror.key_end); 122 123 fd = getpfs(&pfs, filesystem); 124 125 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 126 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 127 fflush(stderr); 128 sameline = 1; 129 } 130 sameline = 1; 131 total_bytes = 0; 132 gettimeofday(&bwtv, NULL); 133 bwcount = 0; 134 135 /* 136 * Send initial header for the purpose of determining the 137 * shared-uuid. 138 */ 139 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 140 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 141 &mrec_tmp, sizeof(mrec_tmp.pfs)); 142 143 /* 144 * In 2-way mode the target will send us a PFS info packet 145 * first. Use the target's current snapshot TID as our default 146 * begin TID. 147 */ 148 if (TwoWayPipeOpt) { 149 mirror.tid_beg = 0; 150 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 151 NULL, &mirror.tid_beg); 152 if (n < 0) { /* got TERM record */ 153 relpfs(fd, &pfs); 154 return; 155 } 156 ++mirror.tid_beg; 157 } else if (streaming && histogram) { 158 mirror.tid_beg = histogram_ary[histindex].tid + 1; 159 } else { 160 mirror.tid_beg = 0; 161 } 162 163 /* 164 * Write out the PFS header, tid_beg will be updated if our PFS 165 * has a larger begin sync. tid_end is set to the latest source 166 * TID whos flush cycle has completed. 167 */ 168 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 169 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 170 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 171 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 172 mirror.ubuf = buf; 173 mirror.size = SERIALBUF_SIZE; 174 mirror.pfs_id = pfs.pfs_id; 175 mirror.shared_uuid = pfs.ondisk->shared_uuid; 176 177 /* 178 * XXX If the histogram is exhausted and the TID delta is large 179 * the stream might have been offline for a while and is 180 * now picking it up again. Do another histogram. 181 */ 182 #if 0 183 if (streaming && histogram && histindex == histend) { 184 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 185 histogram = 0; 186 } 187 #endif 188 189 /* 190 * Initial bulk startup control, try to do some incremental 191 * mirroring in order to allow the stream to be killed and 192 * restarted without having to start over. 193 */ 194 if (histogram == 0 && BulkOpt == 0) { 195 if (VerboseOpt && repeat == 0) { 196 fprintf(stderr, "\n"); 197 sameline = 0; 198 } 199 histmax = generate_histogram(fd, filesystem, 200 &histogram_ary, &mirror, 201 &repeat); 202 histindex = 0; 203 histogram = 1; 204 205 /* 206 * Just stream the histogram, then stop 207 */ 208 if (streaming == 0) 209 streaming = -1; 210 } 211 212 if (streaming && histogram) { 213 ++histindex; 214 mirror.tid_end = histogram_ary[histindex].tid; 215 estbytes = histogram_ary[histindex-1].bytes; 216 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 217 } else { 218 estbytes = 0; 219 } 220 221 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 222 &mrec_tmp, sizeof(mrec_tmp.pfs)); 223 224 /* 225 * A cycle file overrides the beginning TID only if we are 226 * not operating in two-way or histogram mode. 227 */ 228 if (TwoWayPipeOpt == 0 && histogram == 0) { 229 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 230 } 231 232 /* 233 * An additional argument overrides the beginning TID regardless 234 * of what mode we are in. This is not recommending if operating 235 * in two-way mode. 236 */ 237 if (ac == 2) 238 mirror.tid_beg = strtoull(av[1], NULL, 0); 239 240 if (streaming == 0 || VerboseOpt >= 2) { 241 fprintf(stderr, 242 "Mirror-read: Mirror %016jx to %016jx", 243 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 244 if (histogram) 245 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 246 fprintf(stderr, "\n"); 247 fflush(stderr); 248 } 249 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 250 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 251 (uintmax_t)mirror.key_beg.obj_id); 252 } 253 254 /* 255 * Nothing to do if begin equals end. 256 */ 257 if (mirror.tid_beg >= mirror.tid_end) { 258 if (streaming == 0 || VerboseOpt >= 2) 259 fprintf(stderr, "Mirror-read: No work to do\n"); 260 sleep(DelayOpt); 261 didwork = 0; 262 histogram = 0; 263 goto done; 264 } 265 didwork = 1; 266 267 /* 268 * Write out bulk records 269 */ 270 mirror.ubuf = buf; 271 mirror.size = SERIALBUF_SIZE; 272 273 do { 274 mirror.count = 0; 275 mirror.pfs_id = pfs.pfs_id; 276 mirror.shared_uuid = pfs.ondisk->shared_uuid; 277 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 278 fprintf(stderr, "Mirror-read %s failed: %s\n", 279 filesystem, strerror(errno)); 280 exit(1); 281 } 282 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 283 fprintf(stderr, 284 "Mirror-read %s fatal error %d\n", 285 filesystem, mirror.head.error); 286 exit(1); 287 } 288 if (mirror.count) { 289 if (BandwidthOpt) { 290 n = writebw(1, mirror.ubuf, mirror.count, 291 &bwcount, &bwtv); 292 } else { 293 n = write(1, mirror.ubuf, mirror.count); 294 } 295 if (n != mirror.count) { 296 fprintf(stderr, "Mirror-read %s failed: " 297 "short write\n", 298 filesystem); 299 exit(1); 300 } 301 } 302 total_bytes += mirror.count; 303 if (streaming && VerboseOpt) { 304 fprintf(stderr, 305 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 306 (uintmax_t)mirror.key_cur.obj_id, 307 (uintmax_t)mirror.tid_beg, 308 (uintmax_t)mirror.tid_end, 309 (intmax_t)total_bytes); 310 fflush(stderr); 311 sameline = 0; 312 } 313 mirror.key_beg = mirror.key_cur; 314 315 /* 316 * Deal with time limit option 317 */ 318 if (TimeoutOpt && 319 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 320 fprintf(stderr, 321 "Mirror-read %s interrupted by timer at" 322 " %016jx\n", 323 filesystem, 324 (uintmax_t)mirror.key_cur.obj_id); 325 interrupted = 1; 326 break; 327 } 328 } while (mirror.count != 0); 329 330 done: 331 if (streaming && VerboseOpt && sameline == 0) { 332 fprintf(stderr, "\n"); 333 fflush(stderr); 334 sameline = 1; 335 } 336 337 /* 338 * Write out the termination sync record - only if not interrupted 339 */ 340 if (interrupted == 0) { 341 if (didwork) { 342 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 343 &mrec_tmp, sizeof(mrec_tmp.sync)); 344 } else { 345 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 346 &mrec_tmp, sizeof(mrec_tmp.sync)); 347 } 348 } 349 350 /* 351 * If the -2 option was given (automatic when doing mirror-copy), 352 * a two-way pipe is assumed and we expect a response mrec from 353 * the target. 354 */ 355 if (TwoWayPipeOpt) { 356 mrec = read_mrecord(0, &error, &pickup); 357 if (mrec == NULL || 358 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 359 mrec->head.rec_size != sizeof(mrec->update)) { 360 fprintf(stderr, "mirror_read: Did not get final " 361 "acknowledgement packet from target\n"); 362 exit(1); 363 } 364 if (interrupted) { 365 if (CyclePath) { 366 hammer_set_cycle(&mirror.key_cur, 367 mirror.tid_beg); 368 fprintf(stderr, "Cyclefile %s updated for " 369 "continuation\n", CyclePath); 370 } 371 } else { 372 sync_tid = mrec->update.tid; 373 if (CyclePath) { 374 hammer_key_beg_init(&mirror.key_beg); 375 hammer_set_cycle(&mirror.key_beg, sync_tid); 376 fprintf(stderr, 377 "Cyclefile %s updated to 0x%016jx\n", 378 CyclePath, (uintmax_t)sync_tid); 379 } 380 } 381 } else if (CyclePath) { 382 /* NOTE! mirror.tid_beg cannot be updated */ 383 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 384 "fully updated unless you use mirror-copy\n"); 385 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 386 } 387 if (streaming && interrupted == 0) { 388 time_t t1 = time(NULL); 389 time_t t2; 390 391 /* 392 * Try to break down large bulk transfers into smaller ones 393 * so it can sync the transaction id on the slave. This 394 * way if we get interrupted a restart doesn't have to 395 * start from scratch. 396 */ 397 if (streaming && histogram) { 398 if (histindex != histmax) { 399 if (VerboseOpt && VerboseOpt < 2 && 400 streaming >= 0) { 401 fprintf(stderr, " (bulk incremental)"); 402 } 403 relpfs(fd, &pfs); 404 goto again; 405 } 406 } 407 408 if (VerboseOpt && streaming >= 0) { 409 fprintf(stderr, " W"); 410 fflush(stderr); 411 } 412 pfs.ondisk->sync_end_tid = mirror.tid_end; 413 if (streaming < 0) { 414 /* 415 * Fake streaming mode when using a histogram to 416 * break up a mirror-read, do not wait on source. 417 */ 418 streaming = 0; 419 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 420 fprintf(stderr, "Mirror-read %s: cannot stream: %s\n", 421 filesystem, strerror(errno)); 422 } else { 423 t2 = time(NULL) - t1; 424 if (t2 >= 0 && t2 < DelayOpt) { 425 if (VerboseOpt) { 426 fprintf(stderr, "\bD"); 427 fflush(stderr); 428 } 429 sleep(DelayOpt - t2); 430 } 431 if (VerboseOpt) { 432 fprintf(stderr, "\b "); 433 fflush(stderr); 434 } 435 relpfs(fd, &pfs); 436 goto again; 437 } 438 } 439 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 440 &mrec_tmp, sizeof(mrec_tmp.sync)); 441 relpfs(fd, &pfs); 442 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 443 } 444 445 /* 446 * What we are trying to do here is figure out how much data is 447 * going to be sent for the TID range and to break the TID range 448 * down into reasonably-sized slices (from the point of view of 449 * data sent) so a lost connection can restart at a reasonable 450 * place and not all the way back at the beginning. 451 * 452 * An entry's TID serves as the end_tid for the prior entry 453 * So we have to offset the calculation by 1 so that TID falls into 454 * the previous entry when populating entries. 455 * 456 * Because the transaction id space is bursty we need a relatively 457 * large number of buckets (like a million) to do a reasonable job 458 * for things like an initial bulk mirrors on a very large filesystem. 459 */ 460 #define HIST_COUNT (1024 * 1024) 461 462 static int 463 generate_histogram(int fd, const char *filesystem, 464 histogram_t *histogram_ary, 465 struct hammer_ioc_mirror_rw *mirror_base, 466 int *repeatp) 467 { 468 struct hammer_ioc_mirror_rw mirror; 469 union hammer_ioc_mrecord_any *mrec; 470 hammer_tid_t tid_beg; 471 hammer_tid_t tid_end; 472 hammer_tid_t tid; 473 hammer_tid_t tidx; 474 u_int64_t *tid_bytes; 475 u_int64_t total; 476 u_int64_t accum; 477 int i; 478 int res; 479 int off; 480 int len; 481 482 mirror = *mirror_base; 483 tid_beg = mirror.tid_beg; 484 tid_end = mirror.tid_end; 485 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 486 487 if (*histogram_ary == NULL) { 488 *histogram_ary = malloc(sizeof(struct histogram) * 489 (HIST_COUNT + 2)); 490 } 491 if (tid_beg >= tid_end) 492 return(0); 493 494 /* needs 2 extra */ 495 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 496 bzero(tid_bytes, sizeof(tid_bytes)); 497 498 if (*repeatp == 0) { 499 fprintf(stderr, "Prescan to break up bulk transfer"); 500 if (VerboseOpt > 1) 501 fprintf(stderr, " (%juMB chunks)", 502 (uintmax_t)(SplitupOpt / (1024 * 1024))); 503 fprintf(stderr, "\n"); 504 } 505 506 /* 507 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 508 * 509 * Note: Estimates can be off when the mirror is way behind due 510 * to skips. 511 */ 512 total = 0; 513 accum = 0; 514 for (;;) { 515 mirror.count = 0; 516 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 517 fprintf(stderr, "Mirror-read %s failed: %s\n", 518 filesystem, strerror(errno)); 519 exit(1); 520 } 521 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 522 fprintf(stderr, 523 "Mirror-read %s fatal error %d\n", 524 filesystem, mirror.head.error); 525 exit(1); 526 } 527 for (off = 0; 528 off < mirror.count; 529 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size) 530 ) { 531 mrec = (void *)((char *)mirror.ubuf + off); 532 533 /* 534 * We only care about general RECs and PASS 535 * records. We ignore SKIPs. 536 */ 537 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 538 case HAMMER_MREC_TYPE_REC: 539 case HAMMER_MREC_TYPE_PASS: 540 break; 541 default: 542 continue; 543 } 544 545 /* 546 * Calculate for two indices, create_tid and 547 * delete_tid. Record data only applies to 548 * the create_tid. 549 * 550 * When tid is exactly on the boundary it really 551 * belongs to the previous entry because scans 552 * are inclusive of the ending entry. 553 */ 554 tid = mrec->rec.leaf.base.delete_tid; 555 if (tid && tid >= tid_beg && tid <= tid_end) { 556 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 557 if (mrec->head.type == 558 HAMMER_MREC_TYPE_REC) { 559 len -= HAMMER_HEAD_DOALIGN( 560 mrec->rec.leaf.data_len); 561 assert(len > 0); 562 } 563 i = (tid - tid_beg) * HIST_COUNT / 564 (tid_end - tid_beg); 565 tidx = tid_beg + i * (tid_end - tid_beg) / 566 HIST_COUNT; 567 if (tid == tidx && i) 568 --i; 569 assert(i >= 0 && i < HIST_COUNT); 570 tid_bytes[i] += len; 571 total += len; 572 accum += len; 573 } 574 575 tid = mrec->rec.leaf.base.create_tid; 576 if (tid && tid >= tid_beg && tid <= tid_end) { 577 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 578 if (mrec->head.type == 579 HAMMER_MREC_TYPE_REC_NODATA) { 580 len += HAMMER_HEAD_DOALIGN( 581 mrec->rec.leaf.data_len); 582 } 583 i = (tid - tid_beg) * HIST_COUNT / 584 (tid_end - tid_beg); 585 tidx = tid_beg + i * (tid_end - tid_beg) / 586 HIST_COUNT; 587 if (tid == tidx && i) 588 --i; 589 assert(i >= 0 && i < HIST_COUNT); 590 tid_bytes[i] += len; 591 total += len; 592 accum += len; 593 } 594 } 595 if (VerboseOpt > 1) { 596 if (*repeatp == 0 && accum > SplitupOpt) { 597 fprintf(stderr, "."); 598 fflush(stderr); 599 accum = 0; 600 } 601 } 602 if (mirror.count == 0) 603 break; 604 mirror.key_beg = mirror.key_cur; 605 } 606 607 /* 608 * Reduce to SplitupOpt (default 100MB) chunks. This code may 609 * use up to two additional elements. Do the array in-place. 610 * 611 * Inefficient degenerate cases can occur if we do not accumulate 612 * at least the requested split amount, so error on the side of 613 * going over a bit. 614 */ 615 res = 0; 616 (*histogram_ary)[res].tid = tid_beg; 617 (*histogram_ary)[res].bytes = tid_bytes[0]; 618 for (i = 1; i < HIST_COUNT; ++i) { 619 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 620 ++res; 621 (*histogram_ary)[res].tid = tid_beg + 622 i * (tid_end - tid_beg) / 623 HIST_COUNT; 624 (*histogram_ary)[res].bytes = 0; 625 626 } 627 (*histogram_ary)[res].bytes += tid_bytes[i]; 628 } 629 ++res; 630 (*histogram_ary)[res].tid = tid_end; 631 (*histogram_ary)[res].bytes = -1; 632 633 if (*repeatp == 0) { 634 if (VerboseOpt > 1) 635 fprintf(stderr, "\n"); /* newline after ... */ 636 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 637 res, (uintmax_t)total / (1024 * 1024)); 638 for (i = 0; i < res && i < 3; ++i) { 639 if (i) 640 fprintf(stderr, ", "); 641 fprintf(stderr, "%ju", 642 (uintmax_t)(*histogram_ary)[i].bytes); 643 } 644 if (i < res) 645 fprintf(stderr, ", ..."); 646 fprintf(stderr, ")\n"); 647 } 648 assert(res <= HIST_COUNT); 649 *repeatp = 1; 650 651 free(tid_bytes); 652 return(res); 653 } 654 655 static void 656 create_pfs(const char *filesystem, uuid_t *s_uuid) 657 { 658 if (ForceYesOpt == 1) { 659 fprintf(stderr, "PFS slave %s does not exist. " 660 "Auto create new slave PFS!\n", filesystem); 661 662 } else { 663 fprintf(stderr, "PFS slave %s does not exist.\n" 664 "Do you want to create a new slave PFS? (yes|no) ", 665 filesystem); 666 fflush(stderr); 667 if (getyn() != 1) { 668 fprintf(stderr, "Aborting operation\n"); 669 exit(1); 670 } 671 } 672 673 u_int32_t status; 674 char *shared_uuid = NULL; 675 uuid_to_string(s_uuid, &shared_uuid, &status); 676 677 char *cmd = NULL; 678 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 679 filesystem, shared_uuid); 680 free(shared_uuid); 681 682 if (cmd == NULL) { 683 fprintf(stderr, "Failed to alloc memory\n"); 684 exit(1); 685 } 686 if (system(cmd) != 0) { 687 fprintf(stderr, "Failed to create PFS\n"); 688 } 689 free(cmd); 690 } 691 692 /* 693 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 694 * some additional packet types to negotiate TID ranges and to verify 695 * completion. The HAMMER VFS does most of the work. 696 * 697 * It is important to note that the mirror.key_{beg,end} range must 698 * match the ranged used by the original. For now both sides use 699 * range the entire key space. 700 * 701 * It is even more important that the records in the stream conform 702 * to the TID range also supplied in the stream. The HAMMER VFS will 703 * use the REC, PASS, and SKIP record types to track the portions of 704 * the B-Tree being scanned in order to be able to proactively delete 705 * records on the target within those active areas that are not mentioned 706 * by the source. 707 * 708 * The mirror.key_cur field is used by the VFS to do this tracking. It 709 * must be initialized to key_beg but then is persistently updated by 710 * the HAMMER VFS on each successive ioctl() call. If you blow up this 711 * field you will blow up the mirror target, possibly to the point of 712 * deleting everything. As a safety measure the HAMMER VFS simply marks 713 * the records that the source has destroyed as deleted on the target, 714 * and normal pruning operations will deal with their final disposition 715 * at some later time. 716 */ 717 void 718 hammer_cmd_mirror_write(char **av, int ac) 719 { 720 struct hammer_ioc_mirror_rw mirror; 721 const char *filesystem; 722 char *buf = malloc(SERIALBUF_SIZE); 723 struct hammer_ioc_pseudofs_rw pfs; 724 struct hammer_ioc_mrecord_head pickup; 725 struct hammer_ioc_synctid synctid; 726 union hammer_ioc_mrecord_any mrec_tmp; 727 hammer_ioc_mrecord_any_t mrec; 728 struct stat st; 729 int error; 730 int fd; 731 int n; 732 733 if (ac != 1) 734 mirror_usage(1); 735 filesystem = av[0]; 736 737 pickup.signature = 0; 738 pickup.type = 0; 739 740 again: 741 bzero(&mirror, sizeof(mirror)); 742 hammer_key_beg_init(&mirror.key_beg); 743 hammer_key_end_init(&mirror.key_end); 744 mirror.key_end = mirror.key_beg; 745 746 /* 747 * Read initial packet 748 */ 749 mrec = read_mrecord(0, &error, &pickup); 750 if (mrec == NULL) { 751 if (error == 0) 752 fprintf(stderr, "validate_mrec_header: short read\n"); 753 exit(1); 754 } 755 /* 756 * Validate packet 757 */ 758 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 759 return; 760 } 761 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 762 fprintf(stderr, "validate_mrec_header: did not get expected " 763 "PFSD record type\n"); 764 exit(1); 765 } 766 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 767 fprintf(stderr, "validate_mrec_header: unexpected payload " 768 "size\n"); 769 exit(1); 770 } 771 772 /* 773 * Create slave PFS if it doesn't yet exist 774 */ 775 if (lstat(filesystem, &st) != 0) { 776 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 777 } 778 free(mrec); 779 mrec = NULL; 780 781 fd = getpfs(&pfs, filesystem); 782 783 /* 784 * In two-way mode the target writes out a PFS packet first. 785 * The source uses our tid_end as its tid_beg by default, 786 * picking up where it left off. 787 */ 788 mirror.tid_beg = 0; 789 if (TwoWayPipeOpt) { 790 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 791 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 792 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 793 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 794 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 795 &mrec_tmp, sizeof(mrec_tmp.pfs)); 796 } 797 798 /* 799 * Read and process the PFS header. The source informs us of 800 * the TID range the stream represents. 801 */ 802 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 803 &mirror.tid_beg, &mirror.tid_end); 804 if (n < 0) { /* got TERM record */ 805 relpfs(fd, &pfs); 806 return; 807 } 808 809 mirror.ubuf = buf; 810 mirror.size = SERIALBUF_SIZE; 811 812 /* 813 * Read and process bulk records (REC, PASS, and SKIP types). 814 * 815 * On your life, do NOT mess with mirror.key_cur or your mirror 816 * target may become history. 817 */ 818 for (;;) { 819 mirror.count = 0; 820 mirror.pfs_id = pfs.pfs_id; 821 mirror.shared_uuid = pfs.ondisk->shared_uuid; 822 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 823 if (mirror.size <= 0) 824 break; 825 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 826 fprintf(stderr, "Mirror-write %s failed: %s\n", 827 filesystem, strerror(errno)); 828 exit(1); 829 } 830 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 831 fprintf(stderr, 832 "Mirror-write %s fatal error %d\n", 833 filesystem, mirror.head.error); 834 exit(1); 835 } 836 #if 0 837 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 838 fprintf(stderr, 839 "Mirror-write %s interrupted by timer at" 840 " %016llx\n", 841 filesystem, 842 mirror.key_cur.obj_id); 843 exit(0); 844 } 845 #endif 846 } 847 848 /* 849 * Read and process the termination sync record. 850 */ 851 mrec = read_mrecord(0, &error, &pickup); 852 853 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 854 fprintf(stderr, "Mirror-write: received termination request\n"); 855 free(mrec); 856 return; 857 } 858 859 if (mrec == NULL || 860 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 861 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 862 mrec->head.rec_size != sizeof(mrec->sync)) { 863 fprintf(stderr, "Mirror-write %s: Did not get termination " 864 "sync record, or rec_size is wrong rt=%d\n", 865 filesystem, mrec->head.type); 866 exit(1); 867 } 868 869 /* 870 * Update the PFS info on the target so the user has visibility 871 * into the new snapshot, and sync the target filesystem. 872 */ 873 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 874 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 875 876 bzero(&synctid, sizeof(synctid)); 877 synctid.op = HAMMER_SYNCTID_SYNC2; 878 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 879 880 if (VerboseOpt >= 2) { 881 fprintf(stderr, "Mirror-write %s: succeeded\n", 882 filesystem); 883 } 884 } 885 886 free(mrec); 887 mrec = NULL; 888 889 /* 890 * Report back to the originator. 891 */ 892 if (TwoWayPipeOpt) { 893 mrec_tmp.update.tid = mirror.tid_end; 894 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 895 &mrec_tmp, sizeof(mrec_tmp.update)); 896 } else { 897 printf("Source can update synctid to 0x%016jx\n", 898 (uintmax_t)mirror.tid_end); 899 } 900 relpfs(fd, &pfs); 901 goto again; 902 } 903 904 void 905 hammer_cmd_mirror_dump(void) 906 { 907 char *buf = malloc(SERIALBUF_SIZE); 908 struct hammer_ioc_mrecord_head pickup; 909 hammer_ioc_mrecord_any_t mrec; 910 int error; 911 int size; 912 int offset; 913 int bytes; 914 915 /* 916 * Read and process the PFS header 917 */ 918 pickup.signature = 0; 919 pickup.type = 0; 920 921 mrec = read_mrecord(0, &error, &pickup); 922 923 again: 924 /* 925 * Read and process bulk records 926 */ 927 for (;;) { 928 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 929 if (size <= 0) 930 break; 931 offset = 0; 932 while (offset < size) { 933 mrec = (void *)((char *)buf + offset); 934 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 935 if (offset + bytes > size) { 936 fprintf(stderr, "Misaligned record\n"); 937 exit(1); 938 } 939 940 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 941 case HAMMER_MREC_TYPE_REC_BADCRC: 942 case HAMMER_MREC_TYPE_REC: 943 printf("Record obj=%016jx key=%016jx " 944 "rt=%02x ot=%02x", 945 (uintmax_t)mrec->rec.leaf.base.obj_id, 946 (uintmax_t)mrec->rec.leaf.base.key, 947 mrec->rec.leaf.base.rec_type, 948 mrec->rec.leaf.base.obj_type); 949 if (mrec->head.type == 950 HAMMER_MREC_TYPE_REC_BADCRC) { 951 printf(" (BAD CRC)"); 952 } 953 printf("\n"); 954 printf(" tids %016jx:%016jx data=%d\n", 955 (uintmax_t)mrec->rec.leaf.base.create_tid, 956 (uintmax_t)mrec->rec.leaf.base.delete_tid, 957 mrec->rec.leaf.data_len); 958 break; 959 case HAMMER_MREC_TYPE_PASS: 960 printf("Pass obj=%016jx key=%016jx " 961 "rt=%02x ot=%02x\n", 962 (uintmax_t)mrec->rec.leaf.base.obj_id, 963 (uintmax_t)mrec->rec.leaf.base.key, 964 mrec->rec.leaf.base.rec_type, 965 mrec->rec.leaf.base.obj_type); 966 printf(" tids %016jx:%016jx data=%d\n", 967 (uintmax_t)mrec->rec.leaf.base.create_tid, 968 (uintmax_t)mrec->rec.leaf.base.delete_tid, 969 mrec->rec.leaf.data_len); 970 break; 971 case HAMMER_MREC_TYPE_SKIP: 972 printf("Skip obj=%016jx key=%016jx rt=%02x to\n" 973 " obj=%016jx key=%016jx rt=%02x\n", 974 (uintmax_t)mrec->skip.skip_beg.obj_id, 975 (uintmax_t)mrec->skip.skip_beg.key, 976 mrec->skip.skip_beg.rec_type, 977 (uintmax_t)mrec->skip.skip_end.obj_id, 978 (uintmax_t)mrec->skip.skip_end.key, 979 mrec->skip.skip_end.rec_type); 980 default: 981 break; 982 } 983 offset += bytes; 984 } 985 } 986 987 /* 988 * Read and process the termination sync record. 989 */ 990 mrec = read_mrecord(0, &error, &pickup); 991 if (mrec == NULL || 992 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 993 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 994 ) { 995 fprintf(stderr, "Mirror-dump: Did not get termination " 996 "sync record\n"); 997 } 998 999 /* 1000 * Continue with more batches until EOF. 1001 */ 1002 mrec = read_mrecord(0, &error, &pickup); 1003 if (mrec) 1004 goto again; 1005 } 1006 1007 void 1008 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1009 { 1010 pid_t pid1; 1011 pid_t pid2; 1012 int fds[2]; 1013 const char *xav[16]; 1014 char tbuf[16]; 1015 char *ptr; 1016 int xac; 1017 1018 if (ac != 2) 1019 mirror_usage(1); 1020 1021 TwoWayPipeOpt = 1; 1022 signal(SIGPIPE, SIG_IGN); 1023 1024 again: 1025 if (pipe(fds) < 0) { 1026 perror("pipe"); 1027 exit(1); 1028 } 1029 1030 /* 1031 * Source 1032 */ 1033 if ((pid1 = fork()) == 0) { 1034 signal(SIGPIPE, SIG_DFL); 1035 dup2(fds[0], 0); 1036 dup2(fds[0], 1); 1037 close(fds[0]); 1038 close(fds[1]); 1039 if ((ptr = strchr(av[0], ':')) != NULL) { 1040 *ptr++ = 0; 1041 xac = 0; 1042 xav[xac++] = "ssh"; 1043 if (CompressOpt) 1044 xav[xac++] = "-C"; 1045 if (SshPort) { 1046 xav[xac++] = "-p"; 1047 xav[xac++] = SshPort; 1048 } 1049 xav[xac++] = av[0]; 1050 xav[xac++] = "hammer"; 1051 1052 switch(VerboseOpt) { 1053 case 0: 1054 break; 1055 case 1: 1056 xav[xac++] = "-v"; 1057 break; 1058 case 2: 1059 xav[xac++] = "-vv"; 1060 break; 1061 default: 1062 xav[xac++] = "-vvv"; 1063 break; 1064 } 1065 if (ForceYesOpt) { 1066 xav[xac++] = "-y"; 1067 } 1068 xav[xac++] = "-2"; 1069 if (TimeoutOpt) { 1070 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1071 xav[xac++] = "-t"; 1072 xav[xac++] = tbuf; 1073 } 1074 if (streaming) 1075 xav[xac++] = "mirror-read-stream"; 1076 else 1077 xav[xac++] = "mirror-read"; 1078 xav[xac++] = ptr; 1079 xav[xac++] = NULL; 1080 execv("/usr/bin/ssh", (void *)xav); 1081 } else { 1082 hammer_cmd_mirror_read(av, 1, streaming); 1083 fflush(stdout); 1084 fflush(stderr); 1085 } 1086 _exit(1); 1087 } 1088 1089 /* 1090 * Target 1091 */ 1092 if ((pid2 = fork()) == 0) { 1093 signal(SIGPIPE, SIG_DFL); 1094 dup2(fds[1], 0); 1095 dup2(fds[1], 1); 1096 close(fds[0]); 1097 close(fds[1]); 1098 if ((ptr = strchr(av[1], ':')) != NULL) { 1099 *ptr++ = 0; 1100 xac = 0; 1101 xav[xac++] = "ssh"; 1102 if (CompressOpt) 1103 xav[xac++] = "-C"; 1104 if (SshPort) { 1105 xav[xac++] = "-p"; 1106 xav[xac++] = SshPort; 1107 } 1108 xav[xac++] = av[1]; 1109 xav[xac++] = "hammer"; 1110 1111 switch(VerboseOpt) { 1112 case 0: 1113 break; 1114 case 1: 1115 xav[xac++] = "-v"; 1116 break; 1117 case 2: 1118 xav[xac++] = "-vv"; 1119 break; 1120 default: 1121 xav[xac++] = "-vvv"; 1122 break; 1123 } 1124 if (ForceYesOpt) { 1125 xav[xac++] = "-y"; 1126 } 1127 xav[xac++] = "-2"; 1128 xav[xac++] = "mirror-write"; 1129 xav[xac++] = ptr; 1130 xav[xac++] = NULL; 1131 execv("/usr/bin/ssh", (void *)xav); 1132 } else { 1133 hammer_cmd_mirror_write(av + 1, 1); 1134 fflush(stdout); 1135 fflush(stderr); 1136 } 1137 _exit(1); 1138 } 1139 close(fds[0]); 1140 close(fds[1]); 1141 1142 while (waitpid(pid1, NULL, 0) <= 0) 1143 ; 1144 while (waitpid(pid2, NULL, 0) <= 0) 1145 ; 1146 1147 /* 1148 * If the link is lost restart 1149 */ 1150 if (streaming) { 1151 if (VerboseOpt) { 1152 fprintf(stderr, "\nLost Link\n"); 1153 fflush(stderr); 1154 } 1155 sleep(15 + DelayOpt); 1156 goto again; 1157 } 1158 1159 } 1160 1161 /* 1162 * Read and return multiple mrecords 1163 */ 1164 static int 1165 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 1166 { 1167 hammer_ioc_mrecord_any_t mrec; 1168 u_int count; 1169 size_t n; 1170 size_t i; 1171 size_t bytes; 1172 int type; 1173 1174 count = 0; 1175 while (size - count >= HAMMER_MREC_HEADSIZE) { 1176 /* 1177 * Cached the record header in case we run out of buffer 1178 * space. 1179 */ 1180 fflush(stdout); 1181 if (pickup->signature == 0) { 1182 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1183 i = read(fd, (char *)pickup + n, 1184 HAMMER_MREC_HEADSIZE - n); 1185 if (i <= 0) 1186 break; 1187 } 1188 if (n == 0) 1189 break; 1190 if (n != HAMMER_MREC_HEADSIZE) { 1191 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1192 exit(1); 1193 } 1194 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1195 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1196 "bad signature\n"); 1197 exit(1); 1198 } 1199 } 1200 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1201 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 1202 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1203 "illegal rec_size\n"); 1204 exit(1); 1205 } 1206 1207 /* 1208 * Stop if we have insufficient space for the record and data. 1209 */ 1210 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1211 if (size - count < bytes) 1212 break; 1213 1214 /* 1215 * Stop if the record type is not a REC, SKIP, or PASS, 1216 * which are the only types the ioctl supports. Other types 1217 * are used only by the userland protocol. 1218 * 1219 * Ignore all flags. 1220 */ 1221 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1222 if (type != HAMMER_MREC_TYPE_PFSD && 1223 type != HAMMER_MREC_TYPE_REC && 1224 type != HAMMER_MREC_TYPE_SKIP && 1225 type != HAMMER_MREC_TYPE_PASS) { 1226 break; 1227 } 1228 1229 /* 1230 * Read the remainder and clear the pickup signature. 1231 */ 1232 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1233 i = read(fd, buf + count + n, bytes - n); 1234 if (i <= 0) 1235 break; 1236 } 1237 if (n != bytes) { 1238 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1239 exit(1); 1240 } 1241 1242 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1243 pickup->signature = 0; 1244 pickup->type = 0; 1245 mrec = (void *)(buf + count); 1246 1247 /* 1248 * Validate the completed record 1249 */ 1250 if (mrec->head.rec_crc != 1251 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1252 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1253 fprintf(stderr, "read_mrecords: malformed record " 1254 "on pipe, bad crc\n"); 1255 exit(1); 1256 } 1257 1258 /* 1259 * If its a B-Tree record validate the data crc. 1260 * 1261 * NOTE: If the VFS passes us an explicitly errorde mrec 1262 * we just pass it through. 1263 */ 1264 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1265 1266 if (type == HAMMER_MREC_TYPE_REC) { 1267 if (mrec->head.rec_size < 1268 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1269 fprintf(stderr, 1270 "read_mrecords: malformed record on " 1271 "pipe, illegal element data_len\n"); 1272 exit(1); 1273 } 1274 if (mrec->rec.leaf.data_len && 1275 mrec->rec.leaf.data_offset && 1276 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 1277 fprintf(stderr, 1278 "read_mrecords: data_crc did not " 1279 "match data! obj=%016jx key=%016jx\n", 1280 (uintmax_t)mrec->rec.leaf.base.obj_id, 1281 (uintmax_t)mrec->rec.leaf.base.key); 1282 fprintf(stderr, 1283 "continuing, but there are problems\n"); 1284 } 1285 } 1286 count += bytes; 1287 } 1288 return(count); 1289 } 1290 1291 /* 1292 * Read and return a single mrecord. 1293 */ 1294 static 1295 hammer_ioc_mrecord_any_t 1296 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 1297 { 1298 hammer_ioc_mrecord_any_t mrec; 1299 struct hammer_ioc_mrecord_head mrechd; 1300 size_t bytes; 1301 size_t n; 1302 size_t i; 1303 1304 if (pickup && pickup->type != 0) { 1305 mrechd = *pickup; 1306 pickup->signature = 0; 1307 pickup->type = 0; 1308 n = HAMMER_MREC_HEADSIZE; 1309 } else { 1310 /* 1311 * Read in the PFSD header from the sender. 1312 */ 1313 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1314 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1315 if (i <= 0) 1316 break; 1317 } 1318 if (n == 0) { 1319 *errorp = 0; /* EOF */ 1320 return(NULL); 1321 } 1322 if (n != HAMMER_MREC_HEADSIZE) { 1323 fprintf(stderr, "short read of mrecord header\n"); 1324 *errorp = EPIPE; 1325 return(NULL); 1326 } 1327 } 1328 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1329 fprintf(stderr, "read_mrecord: bad signature\n"); 1330 *errorp = EINVAL; 1331 return(NULL); 1332 } 1333 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1334 assert(bytes >= sizeof(mrechd)); 1335 mrec = malloc(bytes); 1336 mrec->head = mrechd; 1337 1338 while (n < bytes) { 1339 i = read(fdin, (char *)mrec + n, bytes - n); 1340 if (i <= 0) 1341 break; 1342 n += i; 1343 } 1344 if (n != bytes) { 1345 fprintf(stderr, "read_mrecord: short read on payload\n"); 1346 *errorp = EPIPE; 1347 return(NULL); 1348 } 1349 if (mrec->head.rec_crc != 1350 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1351 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1352 fprintf(stderr, "read_mrecord: bad CRC\n"); 1353 *errorp = EINVAL; 1354 return(NULL); 1355 } 1356 *errorp = 0; 1357 return(mrec); 1358 } 1359 1360 static 1361 void 1362 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 1363 int bytes) 1364 { 1365 char zbuf[HAMMER_HEAD_ALIGN]; 1366 int pad; 1367 1368 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1369 1370 assert(bytes >= (int)sizeof(mrec->head)); 1371 bzero(&mrec->head, sizeof(mrec->head)); 1372 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1373 mrec->head.type = type; 1374 mrec->head.rec_size = bytes; 1375 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1376 bytes - HAMMER_MREC_CRCOFF); 1377 if (write(fdout, mrec, bytes) != bytes) { 1378 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1379 errno, strerror(errno)); 1380 exit(1); 1381 } 1382 if (pad) { 1383 bzero(zbuf, pad); 1384 if (write(fdout, zbuf, pad) != pad) { 1385 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1386 errno, strerror(errno)); 1387 exit(1); 1388 } 1389 } 1390 } 1391 1392 /* 1393 * Generate a mirroring header with the pfs information of the 1394 * originating filesytem. 1395 */ 1396 static void 1397 generate_mrec_header(int fd, int pfs_id, 1398 union hammer_ioc_mrecord_any *mrec_tmp) 1399 { 1400 struct hammer_ioc_pseudofs_rw pfs; 1401 1402 bzero(&pfs, sizeof(pfs)); 1403 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1404 pfs.pfs_id = pfs_id; 1405 pfs.ondisk = &mrec_tmp->pfs.pfsd; 1406 pfs.bytes = sizeof(mrec_tmp->pfs.pfsd); 1407 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1408 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1409 exit(1); 1410 } 1411 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1412 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 1413 exit(1); 1414 } 1415 mrec_tmp->pfs.version = pfs.version; 1416 } 1417 1418 /* 1419 * Validate the pfs information from the originating filesystem 1420 * against the target filesystem. shared_uuid must match. 1421 * 1422 * return -1 if we got a TERM record 1423 */ 1424 static int 1425 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1426 struct hammer_ioc_mrecord_head *pickup, 1427 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1428 { 1429 struct hammer_ioc_pseudofs_rw pfs; 1430 struct hammer_pseudofs_data pfsd; 1431 hammer_ioc_mrecord_any_t mrec; 1432 int error; 1433 1434 /* 1435 * Get the PFSD info from the target filesystem. 1436 */ 1437 bzero(&pfs, sizeof(pfs)); 1438 bzero(&pfsd, sizeof(pfsd)); 1439 pfs.pfs_id = pfs_id; 1440 pfs.ondisk = &pfsd; 1441 pfs.bytes = sizeof(pfsd); 1442 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1443 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1444 exit(1); 1445 } 1446 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1447 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 1448 exit(1); 1449 } 1450 1451 mrec = read_mrecord(fdin, &error, pickup); 1452 if (mrec == NULL) { 1453 if (error == 0) 1454 fprintf(stderr, "validate_mrec_header: short read\n"); 1455 exit(1); 1456 } 1457 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1458 free(mrec); 1459 return(-1); 1460 } 1461 1462 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1463 fprintf(stderr, "validate_mrec_header: did not get expected " 1464 "PFSD record type\n"); 1465 exit(1); 1466 } 1467 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1468 fprintf(stderr, "validate_mrec_header: unexpected payload " 1469 "size\n"); 1470 exit(1); 1471 } 1472 if (mrec->pfs.version != pfs.version) { 1473 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1474 exit(1); 1475 } 1476 1477 /* 1478 * Whew. Ok, is the read PFS info compatible with the target? 1479 */ 1480 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1481 sizeof(pfsd.shared_uuid)) != 0) { 1482 fprintf(stderr, 1483 "mirror-write: source and target have " 1484 "different shared-uuid's!\n"); 1485 exit(1); 1486 } 1487 if (is_target && 1488 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1489 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1490 exit(1); 1491 } 1492 if (tid_begp) 1493 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1494 if (tid_endp) 1495 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1496 free(mrec); 1497 return(0); 1498 } 1499 1500 static void 1501 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1502 { 1503 struct hammer_ioc_pseudofs_rw pfs; 1504 struct hammer_pseudofs_data pfsd; 1505 1506 bzero(&pfs, sizeof(pfs)); 1507 bzero(&pfsd, sizeof(pfsd)); 1508 pfs.pfs_id = pfs_id; 1509 pfs.ondisk = &pfsd; 1510 pfs.bytes = sizeof(pfsd); 1511 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1512 perror("update_pfs_snapshot (read)"); 1513 exit(1); 1514 } 1515 if (pfsd.sync_end_tid != snapshot_tid) { 1516 pfsd.sync_end_tid = snapshot_tid; 1517 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1518 perror("update_pfs_snapshot (rewrite)"); 1519 exit(1); 1520 } 1521 if (VerboseOpt >= 2) { 1522 fprintf(stderr, 1523 "Mirror-write: Completed, updated snapshot " 1524 "to %016jx\n", 1525 (uintmax_t)snapshot_tid); 1526 fflush(stderr); 1527 } 1528 } 1529 } 1530 1531 /* 1532 * Bandwidth-limited write in chunks 1533 */ 1534 static 1535 ssize_t 1536 writebw(int fd, const void *buf, size_t nbytes, 1537 u_int64_t *bwcount, struct timeval *tv1) 1538 { 1539 struct timeval tv2; 1540 size_t n; 1541 ssize_t r; 1542 ssize_t a; 1543 int usec; 1544 1545 a = 0; 1546 r = 0; 1547 while (nbytes) { 1548 if (*bwcount + nbytes > BandwidthOpt) 1549 n = BandwidthOpt - *bwcount; 1550 else 1551 n = nbytes; 1552 if (n) 1553 r = write(fd, buf, n); 1554 if (r >= 0) { 1555 a += r; 1556 nbytes -= r; 1557 buf = (const char *)buf + r; 1558 } 1559 if ((size_t)r != n) 1560 break; 1561 *bwcount += n; 1562 if (*bwcount >= BandwidthOpt) { 1563 gettimeofday(&tv2, NULL); 1564 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1565 (int)(tv2.tv_usec - tv1->tv_usec); 1566 if (usec >= 0 && usec < 1000000) 1567 usleep(1000000 - usec); 1568 gettimeofday(tv1, NULL); 1569 *bwcount -= BandwidthOpt; 1570 } 1571 } 1572 return(a ? a : r); 1573 } 1574 1575 /* 1576 * Get a yes or no answer from the terminal. The program may be run as 1577 * part of a two-way pipe so we cannot use stdin for this operation. 1578 */ 1579 static int 1580 getyn(void) 1581 { 1582 char buf[256]; 1583 FILE *fp; 1584 int result; 1585 1586 fp = fopen("/dev/tty", "r"); 1587 if (fp == NULL) { 1588 fprintf(stderr, "No terminal for response\n"); 1589 return(-1); 1590 } 1591 result = -1; 1592 while (fgets(buf, sizeof(buf), fp) != NULL) { 1593 if (buf[0] == 'y' || buf[0] == 'Y') { 1594 result = 1; 1595 break; 1596 } 1597 if (buf[0] == 'n' || buf[0] == 'N') { 1598 result = 0; 1599 break; 1600 } 1601 fprintf(stderr, "Response not understood\n"); 1602 break; 1603 } 1604 fclose(fp); 1605 return(result); 1606 } 1607 1608 static void 1609 mirror_usage(int code) 1610 { 1611 fprintf(stderr, 1612 "hammer mirror-read <filesystem> [begin-tid]\n" 1613 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1614 "hammer mirror-write <filesystem>\n" 1615 "hammer mirror-dump\n" 1616 "hammer mirror-copy [[user@]host:]<filesystem>" 1617 " [[user@]host:]<filesystem>\n" 1618 "hammer mirror-stream [[user@]host:]<filesystem>" 1619 " [[user@]host:]<filesystem>\n" 1620 ); 1621 exit(code); 1622 } 1623 1624