1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 */ 34 /* 35 * HAMMER mirroring ioctls - serialize and deserialize modifications made 36 * to a filesystem. 
37 */ 38 39 #include "hammer.h" 40 41 static int hammer_mirror_check(hammer_cursor_t cursor, 42 struct hammer_ioc_mrecord_rec *mrec); 43 static int hammer_mirror_update(hammer_cursor_t cursor, 44 struct hammer_ioc_mrecord_rec *mrec); 45 static int hammer_ioc_mirror_write_rec(hammer_cursor_t cursor, 46 struct hammer_ioc_mrecord_rec *mrec, 47 struct hammer_ioc_mirror_rw *mirror, 48 uint32_t localization, 49 char *uptr); 50 static int hammer_ioc_mirror_write_pass(hammer_cursor_t cursor, 51 struct hammer_ioc_mrecord_rec *mrec, 52 struct hammer_ioc_mirror_rw *mirror, 53 uint32_t localization); 54 static int hammer_ioc_mirror_write_skip(hammer_cursor_t cursor, 55 struct hammer_ioc_mrecord_skip *mrec, 56 struct hammer_ioc_mirror_rw *mirror, 57 uint32_t localization); 58 static int hammer_mirror_delete_to(hammer_cursor_t cursor, 59 struct hammer_ioc_mirror_rw *mirror); 60 static int hammer_mirror_nomirror(hammer_base_elm_t base); 61 62 /* 63 * All B-Tree records within the specified key range which also conform 64 * to the transaction id range are returned. Mirroring code keeps track 65 * of the last transaction id fully scanned and can efficiently pick up 66 * where it left off if interrupted. 67 * 68 * The PFS is identified in the mirror structure. The passed ip is just 69 * some directory in the overall HAMMER filesystem and has nothing to 70 * do with the PFS. 
 */
int
hammer_ioc_mirror_read(hammer_transaction_t trans, hammer_inode_t ip,
		       struct hammer_ioc_mirror_rw *mirror)
{
	struct hammer_cmirror cmirror;
	struct hammer_cursor cursor;
	union hammer_ioc_mrecord_any mrec;
	hammer_btree_leaf_elm_t elm;
	const int crc_start = HAMMER_MREC_CRCOFF;
	char *uptr;
	int error;
	int data_len;
	int bytes;
	int eatdisk;
	int mrec_flags;
	uint32_t localization;
	uint32_t rec_crc;

	localization = pfs_to_lo(mirror->pfs_id);

	/*
	 * The caller may not encode pseudofs bits directly in the key
	 * range; the localization is derived from the PFS id instead.
	 */
	if ((mirror->key_beg.localization | mirror->key_end.localization) &
	    HAMMER_LOCALIZE_PSEUDOFS_MASK) {
		return(EINVAL);
	}
	if (hammer_btree_cmp(&mirror->key_beg, &mirror->key_end) > 0)
		return(EINVAL);

	mirror->key_cur = mirror->key_beg;
	mirror->key_cur.localization &= HAMMER_LOCALIZE_MASK;
	mirror->key_cur.localization |= localization;
	bzero(&mrec, sizeof(mrec));
	bzero(&cmirror, sizeof(cmirror));

	/*
	 * Make CRC errors non-fatal (at least on data), causing an EDOM
	 * error instead of EIO.
	 */
	trans->flags |= HAMMER_TRANSF_CRCDOM;

retry:
	error = hammer_init_cursor(trans, &cursor, NULL, NULL);
	if (error) {
		hammer_done_cursor(&cursor);
		goto failed;
	}
	cursor.key_beg = mirror->key_cur;
	cursor.key_end = mirror->key_end;
	cursor.key_end.localization &= HAMMER_LOCALIZE_MASK;
	cursor.key_end.localization |= localization;

	cursor.flags |= HAMMER_CURSOR_END_INCLUSIVE;
	cursor.flags |= HAMMER_CURSOR_BACKEND;

	/*
	 * This flag filters the search to only return elements whose create
	 * or delete TID is >= mirror_tid.  The B-Tree uses the mirror_tid
	 * field stored with internal and leaf nodes to shortcut the scan.
	 */
	cursor.flags |= HAMMER_CURSOR_MIRROR_FILTERED;
	cursor.cmirror = &cmirror;
	cmirror.mirror_tid = mirror->tid_beg;

	error = hammer_btree_first(&cursor);
	while (error == 0) {
		/*
		 * Yield to more important tasks
		 */
		if (error == 0) {
			error = hammer_signal_check(trans->hmp);
			if (error)
				break;
		}

		/*
		 * An internal node can be returned in mirror-filtered
		 * mode and indicates that the scan is returning a skip
		 * range in the cursor->cmirror structure.
		 */
		uptr = (char *)mirror->ubuf + mirror->count;
		if (cursor.node->ondisk->type == HAMMER_BTREE_TYPE_INTERNAL) {
			/*
			 * Check space remaining in the user buffer before
			 * generating the SKIP record.
			 */
			mirror->key_cur = cmirror.skip_beg;
			bytes = sizeof(mrec.skip);
			if (mirror->count + HAMMER_HEAD_DOALIGN(bytes) >
			    mirror->size) {
				break;
			}

			/*
			 * Fill mrec
			 */
			mrec.head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
			mrec.head.type = HAMMER_MREC_TYPE_SKIP;
			mrec.head.rec_size = bytes;
			mrec.skip.skip_beg = cmirror.skip_beg;
			mrec.skip.skip_end = cmirror.skip_end;
			mrec.head.rec_crc = crc32(&mrec.head.rec_size,
						  bytes - crc_start);
			error = copyout(&mrec, uptr, bytes);
			eatdisk = 0;
			goto didwrite;
		}

		/*
		 * Leaf node.  In full-history mode we could filter out
		 * elements modified outside the user-requested TID range.
		 *
		 * However, such elements must be returned so the writer
		 * can compare them against the target to determine what
		 * needs to be deleted on the target, particularly for
		 * no-history mirrors.
		 */
		KKASSERT(cursor.node->ondisk->type == HAMMER_BTREE_TYPE_LEAF);
		elm = &cursor.node->ondisk->elms[cursor.index].leaf;
		mirror->key_cur = elm->base;

		/*
		 * If the record was created after our end point we just
		 * ignore it.
		 */
		if (elm->base.create_tid > mirror->tid_end) {
			error = 0;
			bytes = 0;
			eatdisk = 1;
			goto didwrite;
		}

		/*
		 * Determine if we should generate a PASS or a REC.  PASS
		 * records are records without any data payload.  Such
		 * records will be generated if the target is already expected
		 * to have the record, allowing it to delete the gaps.
		 *
		 * A PASS record is also used to perform deletions on the
		 * target.
		 *
		 * Such deletions are needed if the master or files on the
		 * master are no-history, or if the slave is so far behind
		 * the master has already been pruned.
		 */
		if (elm->base.create_tid < mirror->tid_beg) {
			bytes = sizeof(mrec.rec);
			if (mirror->count + HAMMER_HEAD_DOALIGN(bytes) >
			    mirror->size) {
				break;
			}

			/*
			 * Fill mrec.
			 */
			mrec.head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
			mrec.head.type = HAMMER_MREC_TYPE_PASS;
			mrec.head.rec_size = bytes;
			mrec.rec.leaf = *elm;
			mrec.head.rec_crc = crc32(&mrec.head.rec_size,
						  bytes - crc_start);
			error = copyout(&mrec, uptr, bytes);
			eatdisk = 1;
			goto didwrite;
		}

		/*
		 * The core code exports the data to userland.
		 *
		 * CRC errors on data are reported but passed through,
		 * but the data must be washed by the user program.
		 *
		 * If userland just wants the btree records it can
		 * request that bulk data not be returned.  This is
		 * used during mirror-stream histogram generation.
		 */
		mrec_flags = 0;
		data_len = (elm->data_offset) ? elm->data_len : 0;
		if (data_len &&
		    (mirror->head.flags & HAMMER_IOC_MIRROR_NODATA)) {
			data_len = 0;
			mrec_flags |= HAMMER_MRECF_NODATA;
		}
		if (data_len) {
			error = hammer_btree_extract_data(&cursor);
			if (error) {
				if (error != EDOM)
					break;
				/* EDOM: pass the record through flagged */
				mrec_flags |= HAMMER_MRECF_CRC_ERROR |
					      HAMMER_MRECF_DATA_CRC_BAD;
			}
		}

		bytes = sizeof(mrec.rec) + data_len;
		if (mirror->count + HAMMER_HEAD_DOALIGN(bytes) > mirror->size)
			break;

		/*
		 * Construct the record for userland and copyout.
		 *
		 * The user is asking for a snapshot, if the record was
		 * deleted beyond the user-requested ending tid, the record
		 * is not considered deleted from the point of view of
		 * userland and delete_tid is cleared.
		 */
		mrec.head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
		mrec.head.type = HAMMER_MREC_TYPE_REC | mrec_flags;
		mrec.head.rec_size = bytes;
		mrec.rec.leaf = *elm;

		if (elm->base.delete_tid > mirror->tid_end)
			mrec.rec.leaf.base.delete_tid = 0;
		rec_crc = crc32(&mrec.head.rec_size,
				sizeof(mrec.rec) - crc_start);
		if (data_len)
			rec_crc = crc32_ext(cursor.data, data_len, rec_crc);
		mrec.head.rec_crc = rec_crc;
		error = copyout(&mrec, uptr, sizeof(mrec.rec));
		if (data_len && error == 0) {
			error = copyout(cursor.data, uptr + sizeof(mrec.rec),
					data_len);
		}
		eatdisk = 1;

		/*
		 * eatdisk controls whether we skip the current cursor
		 * position on the next scan or not.  If doing a SKIP
		 * the cursor is already positioned properly for the next
		 * scan and eatdisk will be 0.
		 */
didwrite:
		if (error == 0) {
			mirror->count += HAMMER_HEAD_DOALIGN(bytes);
			if (eatdisk)
				cursor.flags |= HAMMER_CURSOR_ATEDISK;
			else
				cursor.flags &= ~HAMMER_CURSOR_ATEDISK;
			error = hammer_btree_iterate(&cursor);
		}
	}
	if (error == ENOENT) {
		/* normal termination: the entire range was scanned */
		mirror->key_cur = mirror->key_end;
		error = 0;
	}
	hammer_done_cursor(&cursor);
	if (error == EDEADLK)
		goto retry;
	if (error == EINTR) {
		mirror->head.flags |= HAMMER_IOC_HEAD_INTR;
		error = 0;
	}
failed:
	mirror->key_cur.localization &= HAMMER_LOCALIZE_MASK;
	return(error);
}

/*
 * Copy records from userland to the target mirror.
 *
 * The PFS is identified in the mirror structure.  The passed ip is just
 * some directory in the overall HAMMER filesystem and has nothing to
 * do with the PFS.  In fact, there might not even be a root directory for
 * the PFS yet!
 */
int
hammer_ioc_mirror_write(hammer_transaction_t trans, hammer_inode_t ip,
			struct hammer_ioc_mirror_rw *mirror)
{
	union hammer_ioc_mrecord_any mrec;
	struct hammer_cursor cursor;
	uint32_t localization;
	int checkspace_count = 0;
	int error;
	int bytes;
	char *uptr;
	int seq;

	localization = pfs_to_lo(mirror->pfs_id);
	seq = trans->hmp->flusher.done;

	/*
	 * Validate the mirror structure and relocalize the tracking keys.
	 */
	if (mirror->size < 0 || mirror->size > 0x70000000)
		return(EINVAL);
	mirror->key_beg.localization &= HAMMER_LOCALIZE_MASK;
	mirror->key_beg.localization |= localization;
	mirror->key_end.localization &= HAMMER_LOCALIZE_MASK;
	mirror->key_end.localization |= localization;
	mirror->key_cur.localization &= HAMMER_LOCALIZE_MASK;
	mirror->key_cur.localization |= localization;

	/*
	 * Set up our tracking cursor for the loop.  The tracking cursor
	 * is used to delete records that are no longer present on the
	 * master.  The last handled record at key_cur must be skipped.
	 *
	 * NOTE(review): the error returned by hammer_init_cursor() is
	 * overwritten by hammer_btree_first() below before it is tested;
	 * presumably init cannot fail with NULL cache/ip arguments --
	 * confirm.
	 */
	error = hammer_init_cursor(trans, &cursor, NULL, NULL);

	cursor.key_beg = mirror->key_cur;
	cursor.key_end = mirror->key_end;
	cursor.flags |= HAMMER_CURSOR_BACKEND;
	error = hammer_btree_first(&cursor);
	if (error == 0)
		cursor.flags |= HAMMER_CURSOR_ATEDISK;
	if (error == ENOENT)
		error = 0;

	/*
	 * Loop until our input buffer has been exhausted.
	 */
	while (error == 0 &&
	       mirror->count + sizeof(mrec.head) <= mirror->size) {

		/*
		 * Don't blow out the buffer cache.  Leave room for frontend
		 * cache as well.
		 *
		 * WARNING: See warnings in hammer_unlock_cursor() function.
		 */
		while (hammer_flusher_meta_halflimit(trans->hmp) ||
		       hammer_flusher_undo_exhausted(trans, 2)) {
			hammer_unlock_cursor(&cursor);
			hammer_flusher_wait(trans->hmp, seq);
			hammer_lock_cursor(&cursor);
			seq = hammer_flusher_async_one(trans->hmp);
		}

		/*
		 * If there is insufficient free space it may be due to
		 * reserved big-blocks, which flushing might fix.
		 */
		if (hammer_checkspace(trans->hmp, HAMMER_CHKSPC_MIRROR)) {
			if (++checkspace_count == 10) {
				error = ENOSPC;
				break;
			}
			hammer_unlock_cursor(&cursor);
			hammer_flusher_wait(trans->hmp, seq);
			hammer_lock_cursor(&cursor);
			seq = hammer_flusher_async(trans->hmp, NULL);
		}


		/*
		 * Acquire and validate header
		 */
		if ((bytes = mirror->size - mirror->count) > sizeof(mrec))
			bytes = sizeof(mrec);
		uptr = (char *)mirror->ubuf + mirror->count;
		error = copyin(uptr, &mrec, bytes);
		if (error)
			break;
		if (mrec.head.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
			error = EINVAL;
			break;
		}
		if (mrec.head.rec_size < sizeof(mrec.head) ||
		    mrec.head.rec_size > sizeof(mrec) + HAMMER_XBUFSIZE ||
		    mirror->count + mrec.head.rec_size > mirror->size) {
			error = EINVAL;
			break;
		}

		switch(mrec.head.type & HAMMER_MRECF_TYPE_MASK) {
		case HAMMER_MREC_TYPE_SKIP:
			if (mrec.head.rec_size != sizeof(mrec.skip))
				error = EINVAL;
			if (error == 0)
				error = hammer_ioc_mirror_write_skip(&cursor, &mrec.skip, mirror, localization);
			break;
		case HAMMER_MREC_TYPE_REC:
			if (mrec.head.rec_size < sizeof(mrec.rec))
				error = EINVAL;
			if (error == 0)
				error = hammer_ioc_mirror_write_rec(&cursor, &mrec.rec, mirror, localization, uptr + sizeof(mrec.rec));
			break;
		case HAMMER_MREC_TYPE_REC_NODATA:
		case HAMMER_MREC_TYPE_REC_BADCRC:
			/*
			 * Records with bad data payloads are ignored XXX.
			 * Records with no data payload have to be skipped
			 * (they shouldn't have been written in the first
			 * place).
			 */
			if (mrec.head.rec_size < sizeof(mrec.rec))
				error = EINVAL;
			break;
		case HAMMER_MREC_TYPE_PASS:
			if (mrec.head.rec_size != sizeof(mrec.rec))
				error = EINVAL;
			if (error == 0)
				error = hammer_ioc_mirror_write_pass(&cursor, &mrec.rec, mirror, localization);
			break;
		default:
			error = EINVAL;
			break;
		}

		/*
		 * Retry the current record on deadlock, otherwise setup
		 * for the next loop.
		 */
		if (error == EDEADLK) {
			while (error == EDEADLK) {
				hammer_sync_lock_sh(trans);
				hammer_recover_cursor(&cursor);
				error = hammer_cursor_upgrade(&cursor);
				hammer_sync_unlock(trans);
			}
		} else {
			if (error == EALREADY)
				error = 0;
			if (error == 0) {
				mirror->count +=
					HAMMER_HEAD_DOALIGN(mrec.head.rec_size);
			}
		}
	}
	hammer_done_cursor(&cursor);

	/*
	 * cumulative error
	 */
	if (error) {
		mirror->head.flags |= HAMMER_IOC_HEAD_ERROR;
		mirror->head.error = error;
	}

	/*
	 * ioctls don't update the RW data structure if an error is returned,
	 * always return 0.
	 */
	return(0);
}

/*
 * Handle skip records.
 *
 * We must iterate from the last resolved record position at mirror->key_cur
 * to skip_beg non-inclusive and delete any records encountered.
 *
 * mirror->key_cur must be carefully set when we succeed in processing
 * this mrec.
 */
static int
hammer_ioc_mirror_write_skip(hammer_cursor_t cursor,
			     struct hammer_ioc_mrecord_skip *mrec,
			     struct hammer_ioc_mirror_rw *mirror,
			     uint32_t localization)
{
	int error;

	/*
	 * Relocalize the skip range
	 */
	mrec->skip_beg.localization &= HAMMER_LOCALIZE_MASK;
	mrec->skip_beg.localization |= localization;
	mrec->skip_end.localization &= HAMMER_LOCALIZE_MASK;
	mrec->skip_end.localization |= localization;

	/*
	 * Iterate from current position to skip_beg, deleting any records
	 * we encounter.  The record at skip_beg is not included (it is
	 * skipped).
	 */
	cursor->key_end = mrec->skip_beg;
	cursor->flags &= ~HAMMER_CURSOR_END_INCLUSIVE;
	cursor->flags |= HAMMER_CURSOR_BACKEND;
	error = hammer_mirror_delete_to(cursor, mirror);

	/*
	 * Now skip past the skip (which is the whole point of
	 * having a skip record).  The sender has not sent us any records
	 * for the skip area so we wouldn't know what to keep and what
	 * to delete anyway.
	 *
	 * Clear ATEDISK because skip_end is non-inclusive, so we can't
	 * count an exact match if we happened to get one.
	 */
	if (error == 0) {
		mirror->key_cur = mrec->skip_end;
		cursor->key_beg = mrec->skip_end;
		error = hammer_btree_lookup(cursor);
		cursor->flags &= ~HAMMER_CURSOR_ATEDISK;
		if (error == ENOENT)
			error = 0;
	}
	return(error);
}

/*
 * Handle B-Tree records.
 *
 * We must iterate to mrec->base.key (non-inclusively), and then process
 * the record.  We are allowed to write a new record or delete an existing
 * record, but cannot replace an existing record.
 *
 * mirror->key_cur must be carefully set when we succeed in processing
 * this mrec.
 */
static
int
hammer_ioc_mirror_write_rec(hammer_cursor_t cursor,
			    struct hammer_ioc_mrecord_rec *mrec,
			    struct hammer_ioc_mirror_rw *mirror,
			    uint32_t localization,
			    char *uptr)
{
	int error;

	/*
	 * Sanity check the payload length against the record header
	 * before touching anything else.
	 */
	if (mrec->leaf.data_len < 0 ||
	    mrec->leaf.data_len > HAMMER_XBUFSIZE ||
	    mrec->leaf.data_len + sizeof(*mrec) > mrec->head.rec_size) {
		return(EINVAL);
	}

	/*
	 * Re-localize for target.  relocalization of data is handled
	 * by hammer_create_at_cursor().
	 */
	mrec->leaf.base.localization &= HAMMER_LOCALIZE_MASK;
	mrec->leaf.base.localization |= localization;

	/*
	 * Delete records through until we reach (non-inclusively) the
	 * target record.
	 */
	cursor->key_end = mrec->leaf.base;
	cursor->flags &= ~HAMMER_CURSOR_END_INCLUSIVE;
	cursor->flags |= HAMMER_CURSOR_BACKEND;
	error = hammer_mirror_delete_to(cursor, mirror);

	/*
	 * Certain records are not part of the mirroring operation
	 */
	if (error == 0 && hammer_mirror_nomirror(&mrec->leaf.base))
		return(0);

	/*
	 * Locate the record.
	 *
	 * If the record exists only the delete_tid may be updated.
	 *
	 * If the record does not exist we can create it only if the
	 * create_tid is not too old.  If the create_tid is too old
	 * it may have already been destroyed on the slave from pruning.
	 *
	 * Note that mirror operations are effectively as-of operations
	 * and delete_tid can be 0 for mirroring purposes even if it is
	 * not actually 0 at the originator.
	 *
	 * These functions can return EDEADLK
	 */
	if (error == 0) {
		cursor->key_beg = mrec->leaf.base;
		cursor->flags |= HAMMER_CURSOR_BACKEND;
		cursor->flags &= ~HAMMER_CURSOR_INSERT;
		error = hammer_btree_lookup(cursor);
	}

	if (error == 0 && hammer_mirror_check(cursor, mrec)) {
		error = hammer_mirror_update(cursor, mrec);
	} else if (error == ENOENT) {
		if (mrec->leaf.base.create_tid >= mirror->tid_beg) {
			error = hammer_create_at_cursor(
					cursor, &mrec->leaf,
					uptr, HAMMER_CREATE_MODE_UMIRROR);
		} else {
			/* too old to exist on the slave; nothing to do */
			error = 0;
		}
	}
	if (error == 0 || error == EALREADY)
		mirror->key_cur = mrec->leaf.base;
	return(error);
}

/*
 * This works like write_rec but no write or update is necessary,
 * and no data payload is included so we couldn't do a write even
 * if we wanted to.
 *
 * We must still iterate for deletions, and we can validate the
 * record header which is a good way to test for corrupted mirror
 * targets XXX.
 *
 * mirror->key_cur must be carefully set when we succeed in processing
 * this mrec.
 */
static
int
hammer_ioc_mirror_write_pass(hammer_cursor_t cursor,
			     struct hammer_ioc_mrecord_rec *mrec,
			     struct hammer_ioc_mirror_rw *mirror,
			     uint32_t localization)
{
	int error;

	/*
	 * Re-localize for target.  Relocalization of data is handled
	 * by hammer_create_at_cursor().
	 */
	mrec->leaf.base.localization &= HAMMER_LOCALIZE_MASK;
	mrec->leaf.base.localization |= localization;

	/*
	 * Delete records through until we reach (non-inclusively) the
	 * target record.
	 */
	cursor->key_end = mrec->leaf.base;
	cursor->flags &= ~HAMMER_CURSOR_END_INCLUSIVE;
	cursor->flags |= HAMMER_CURSOR_BACKEND;
	error = hammer_mirror_delete_to(cursor, mirror);

	/*
	 * Certain records are not part of the mirroring operation.
	 *
	 * NOTE(review): unlike hammer_ioc_mirror_write_rec(), this check
	 * does not test the error from the delete_to pass first, so a
	 * delete_to failure is discarded for nomirror records -- confirm
	 * this asymmetry is intentional.
	 */
	if (hammer_mirror_nomirror(&mrec->leaf.base))
		return(0);

	/*
	 * Locate the record and get past it by setting ATEDISK.  Perform
	 * any necessary deletions.  We have no data payload and cannot
	 * create a new record.
	 */
	if (error == 0) {
		mirror->key_cur = mrec->leaf.base;
		cursor->key_beg = mrec->leaf.base;
		cursor->flags |= HAMMER_CURSOR_BACKEND;
		cursor->flags &= ~HAMMER_CURSOR_INSERT;
		error = hammer_btree_lookup(cursor);
		if (error == 0) {
			if (hammer_mirror_check(cursor, mrec))
				error = hammer_mirror_update(cursor, mrec);
			cursor->flags |= HAMMER_CURSOR_ATEDISK;
		} else {
			cursor->flags &= ~HAMMER_CURSOR_ATEDISK;
		}
		if (error == ENOENT)
			error = 0;
	}
	return(error);
}

/*
 * As part of the mirror write we iterate across swaths of records
 * on the target which no longer exist on the source, and mark them
 * deleted.
 *
 * The caller has indexed the cursor and set up key_end.  We iterate
 * through to key_end.
 *
 * There is an edge case where the master has deleted a record whose
 * create_tid exactly matches our end_tid.
 * We cannot delete this record on the slave yet because we cannot assign
 * delete_tid == create_tid.  The deletion should be picked up on the next
 * sequence since in order to have been deleted on the master a transaction
 * must have occurred with a TID greater than the create_tid of the record.
 *
 * To support incremental re-mirroring, just for robustness, we do not
 * touch any records created beyond (or equal to) mirror->tid_end.
 */
static
int
hammer_mirror_delete_to(hammer_cursor_t cursor,
			struct hammer_ioc_mirror_rw *mirror)
{
	hammer_btree_leaf_elm_t elm;
	int error;

	error = hammer_btree_iterate(cursor);
	while (error == 0) {
		elm = &cursor->node->ondisk->elms[cursor->index].leaf;
		KKASSERT(elm->base.btype == HAMMER_BTREE_TYPE_RECORD);
		cursor->flags |= HAMMER_CURSOR_ATEDISK;

		/*
		 * Certain records are not part of the mirroring operation
		 */
		if (hammer_mirror_nomirror(&elm->base)) {
			error = hammer_btree_iterate(cursor);
			continue;
		}

		/*
		 * Note: Must still delete records with create_tid < tid_beg,
		 * as record may have been pruned-away on source.
		 */
		if (elm->base.delete_tid == 0 &&
		    elm->base.create_tid < mirror->tid_end) {
			error = hammer_delete_at_cursor(cursor,
							HAMMER_DELETE_ADJUST,
							mirror->tid_end,
							time_second,
							1, NULL);
		}
		if (error == 0)
			error = hammer_btree_iterate(cursor);
	}
	/* ENOENT simply marks the end of the iteration range */
	if (error == ENOENT)
		error = 0;
	return(error);
}

/*
 * Check whether an update is needed in the case where a match already
 * exists on the target.  The only type of update allowed in this case
 * is an update of the delete_tid.
 *
 * Return non-zero if the update should proceed.
778 */ 779 static 780 int 781 hammer_mirror_check(hammer_cursor_t cursor, struct hammer_ioc_mrecord_rec *mrec) 782 { 783 hammer_btree_leaf_elm_t leaf = cursor->leaf; 784 785 if (leaf->base.delete_tid != mrec->leaf.base.delete_tid) { 786 if (mrec->leaf.base.delete_tid != 0) 787 return(1); 788 } 789 return(0); 790 } 791 792 /* 793 * Filter out records which are never mirrored, such as configuration space 794 * records (for hammer cleanup). 795 * 796 * NOTE: We currently allow HAMMER_RECTYPE_SNAPSHOT records to be mirrored. 797 */ 798 static 799 int 800 hammer_mirror_nomirror(hammer_base_elm_t base) 801 { 802 /* 803 * Certain types of records are never updated when mirroring. 804 * Slaves have their own configuration space. 805 */ 806 if (base->rec_type == HAMMER_RECTYPE_CONFIG) 807 return(1); 808 return(0); 809 } 810 811 812 /* 813 * Update a record in-place. Only the delete_tid can change, and 814 * only from zero to non-zero. 815 */ 816 static 817 int 818 hammer_mirror_update(hammer_cursor_t cursor, 819 struct hammer_ioc_mrecord_rec *mrec) 820 { 821 int error; 822 823 /* 824 * This case shouldn't occur. 825 */ 826 if (mrec->leaf.base.delete_tid == 0) 827 return(0); 828 829 /* 830 * Mark the record deleted on the mirror target. 831 */ 832 error = hammer_delete_at_cursor(cursor, HAMMER_DELETE_ADJUST, 833 mrec->leaf.base.delete_tid, 834 mrec->leaf.delete_ts, 835 1, NULL); 836 cursor->flags |= HAMMER_CURSOR_ATEDISK; 837 return(error); 838 } 839