1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * $DragonFly: src/sys/vfs/hammer/hammer_mirror.c,v 1.17 2008/07/31 22:30:33 dillon Exp $ 35 */ 36 /* 37 * HAMMER mirroring ioctls - serialize and deserialize modifications made 38 * to a filesystem. 
39 */ 40 41 #include "hammer.h" 42 43 static int hammer_mirror_check(hammer_cursor_t cursor, 44 struct hammer_ioc_mrecord_rec *mrec); 45 static int hammer_mirror_update(hammer_cursor_t cursor, 46 struct hammer_ioc_mrecord_rec *mrec); 47 static int hammer_mirror_write(hammer_cursor_t cursor, 48 struct hammer_ioc_mrecord_rec *mrec, 49 char *udata); 50 static int hammer_ioc_mirror_write_rec(hammer_cursor_t cursor, 51 struct hammer_ioc_mrecord_rec *mrec, 52 struct hammer_ioc_mirror_rw *mirror, 53 u_int32_t localization, 54 char *uptr); 55 static int hammer_ioc_mirror_write_pass(hammer_cursor_t cursor, 56 struct hammer_ioc_mrecord_rec *mrec, 57 struct hammer_ioc_mirror_rw *mirror, 58 u_int32_t localization); 59 static int hammer_ioc_mirror_write_skip(hammer_cursor_t cursor, 60 struct hammer_ioc_mrecord_skip *mrec, 61 struct hammer_ioc_mirror_rw *mirror, 62 u_int32_t localization); 63 static int hammer_mirror_delete_to(hammer_cursor_t cursor, 64 struct hammer_ioc_mirror_rw *mirror); 65 static int hammer_mirror_localize_data(hammer_data_ondisk_t data, 66 hammer_btree_leaf_elm_t leaf); 67 68 /* 69 * All B-Tree records within the specified key range which also conform 70 * to the transaction id range are returned. Mirroring code keeps track 71 * of the last transaction id fully scanned and can efficiently pick up 72 * where it left off if interrupted. 73 * 74 * The PFS is identified in the mirror structure. The passed ip is just 75 * some directory in the overall HAMMER filesystem and has nothing to 76 * do with the PFS. 
 */
int
hammer_ioc_mirror_read(hammer_transaction_t trans, hammer_inode_t ip,
		       struct hammer_ioc_mirror_rw *mirror)
{
	struct hammer_cmirror cmirror;		/* skip-range state shared with the cursor */
	struct hammer_cursor cursor;
	union hammer_ioc_mrecord_any mrec;	/* staging area for one output record */
	hammer_btree_leaf_elm_t elm;
	const int crc_start = HAMMER_MREC_CRCOFF;
	char *uptr;				/* current output position in user buffer */
	int error;
	int data_len;
	int bytes;
	int eatdisk;				/* 1 = skip current element on next iterate */
	u_int32_t localization;
	u_int32_t rec_crc;

	/*
	 * The PFS id forms the upper 16 bits of the B-Tree localization
	 * field.
	 */
	localization = (u_int32_t)mirror->pfs_id << 16;

	/*
	 * The caller's key range may not carry its own pseudofs bits and
	 * must be properly ordered.
	 */
	if ((mirror->key_beg.localization | mirror->key_end.localization) &
	    HAMMER_LOCALIZE_PSEUDOFS_MASK) {
		return(EINVAL);
	}
	if (hammer_btree_cmp(&mirror->key_beg, &mirror->key_end) > 0)
		return(EINVAL);

	/*
	 * key_cur tracks scan progress so an interrupted scan can be
	 * resumed.  Relocalize it for the target PFS.
	 */
	mirror->key_cur = mirror->key_beg;
	mirror->key_cur.localization &= HAMMER_LOCALIZE_MASK;
	mirror->key_cur.localization += localization;
	bzero(&mrec, sizeof(mrec));
	bzero(&cmirror, sizeof(cmirror));

retry:
	error = hammer_init_cursor(trans, &cursor, NULL, NULL);
	if (error) {
		hammer_done_cursor(&cursor);
		goto failed;
	}
	cursor.key_beg = mirror->key_cur;
	cursor.key_end = mirror->key_end;
	cursor.key_end.localization &= HAMMER_LOCALIZE_MASK;
	cursor.key_end.localization += localization;

	cursor.flags |= HAMMER_CURSOR_END_INCLUSIVE;
	cursor.flags |= HAMMER_CURSOR_BACKEND;

	/*
	 * This flag filters the search to only return elements whose create
	 * or delete TID is >= mirror_tid.  The B-Tree uses the mirror_tid
	 * field stored with internal and leaf nodes to shortcut the scan.
	 */
	cursor.flags |= HAMMER_CURSOR_MIRROR_FILTERED;
	cursor.cmirror = &cmirror;
	cmirror.mirror_tid = mirror->tid_beg;

	error = hammer_btree_first(&cursor);
	while (error == 0) {
		/*
		 * Yield to more important tasks.  (error is always 0
		 * here; the check is kept defensively.)
		 */
		if (error == 0) {
			error = hammer_signal_check(trans->hmp);
			if (error)
				break;
		}

		/*
		 * An internal node can be returned in mirror-filtered
		 * mode and indicates that the scan is returning a skip
		 * range in the cursor->cmirror structure.
		 */
		uptr = (char *)mirror->ubuf + mirror->count;
		if (cursor.node->ondisk->type == HAMMER_BTREE_TYPE_INTERNAL) {
			/*
			 * Check space in the user buffer (aligned size).
			 */
			mirror->key_cur = cmirror.skip_beg;
			bytes = sizeof(mrec.skip);
			if (mirror->count + HAMMER_HEAD_DOALIGN(bytes) >
			    mirror->size) {
				break;
			}

			/*
			 * Fill mrec with a SKIP record covering the range
			 * the B-Tree shortcut elided.
			 */
			mrec.head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
			mrec.head.type = HAMMER_MREC_TYPE_SKIP;
			mrec.head.rec_size = bytes;
			mrec.skip.skip_beg = cmirror.skip_beg;
			mrec.skip.skip_end = cmirror.skip_end;
			mrec.head.rec_crc = crc32(&mrec.head.rec_size,
						  bytes - crc_start);
			error = copyout(&mrec, uptr, bytes);
			eatdisk = 0;
			goto didwrite;
		}

		/*
		 * Leaf node.  In full-history mode we could filter out
		 * elements modified outside the user-requested TID range.
		 *
		 * However, such elements must be returned so the writer
		 * can compare them against the target to determine what
		 * needs to be deleted on the target, particularly for
		 * no-history mirrors.
		 */
		KKASSERT(cursor.node->ondisk->type == HAMMER_BTREE_TYPE_LEAF);
		elm = &cursor.node->ondisk->elms[cursor.index].leaf;
		mirror->key_cur = elm->base;

		/*
		 * Determine if we should generate a PASS or a REC.  PASS
		 * records are records without any data payload.  Such
		 * records will be generated if the target is already expected
		 * to have the record, allowing it to delete the gaps.
		 *
		 * A PASS record is also used to perform deletions on the
		 * target.
		 *
		 * Such deletions are needed if the master or files on the
		 * master are no-history, or if the slave is so far behind
		 * the master has already been pruned.
		 */
		if (elm->base.create_tid < mirror->tid_beg ||
		    elm->base.create_tid > mirror->tid_end) {
			bytes = sizeof(mrec.rec);
			if (mirror->count + HAMMER_HEAD_DOALIGN(bytes) >
			    mirror->size) {
				break;
			}

			/*
			 * Fill mrec (PASS: leaf element only, no data).
			 */
			mrec.head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
			mrec.head.type = HAMMER_MREC_TYPE_PASS;
			mrec.head.rec_size = bytes;
			mrec.rec.leaf = *elm;
			mrec.head.rec_crc = crc32(&mrec.head.rec_size,
						  bytes - crc_start);
			error = copyout(&mrec, uptr, bytes);
			eatdisk = 1;
			goto didwrite;

		}

		/*
		 * The core code exports the data to userland.
		 */
		data_len = (elm->data_offset) ? elm->data_len : 0;
		if (data_len) {
			error = hammer_btree_extract(&cursor,
						     HAMMER_CURSOR_GET_DATA);
			if (error)
				break;
		}

		bytes = sizeof(mrec.rec) + data_len;
		if (mirror->count + HAMMER_HEAD_DOALIGN(bytes) > mirror->size)
			break;

		/*
		 * Construct the record for userland and copyout.
		 *
		 * The user is asking for a snapshot, if the record was
		 * deleted beyond the user-requested ending tid, the record
		 * is not considered deleted from the point of view of
		 * userland and delete_tid is cleared.
		 */
		mrec.head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
		mrec.head.type = HAMMER_MREC_TYPE_REC;
		mrec.head.rec_size = bytes;
		mrec.rec.leaf = *elm;
		if (elm->base.delete_tid > mirror->tid_end)
			mrec.rec.leaf.base.delete_tid = 0;
		/* CRC covers the header tail plus the data payload */
		rec_crc = crc32(&mrec.head.rec_size,
				sizeof(mrec.rec) - crc_start);
		if (data_len)
			rec_crc = crc32_ext(cursor.data, data_len, rec_crc);
		mrec.head.rec_crc = rec_crc;
		error = copyout(&mrec, uptr, sizeof(mrec.rec));
		if (data_len && error == 0) {
			error = copyout(cursor.data, uptr + sizeof(mrec.rec),
					data_len);
		}
		eatdisk = 1;

		/*
		 * eatdisk controls whether we skip the current cursor
		 * position on the next scan or not.  If doing a SKIP
		 * the cursor is already positioned properly for the next
		 * scan and eatdisk will be 0.
		 */
didwrite:
		if (error == 0) {
			mirror->count += HAMMER_HEAD_DOALIGN(bytes);
			if (eatdisk)
				cursor.flags |= HAMMER_CURSOR_ATEDISK;
			else
				cursor.flags &= ~HAMMER_CURSOR_ATEDISK;
			error = hammer_btree_iterate(&cursor);
		}
	}
	/*
	 * ENOENT from the iteration means the scan completed cleanly.
	 */
	if (error == ENOENT) {
		mirror->key_cur = mirror->key_end;
		error = 0;
	}
	hammer_done_cursor(&cursor);
	if (error == EDEADLK)
		goto retry;
	if (error == EINTR) {
		mirror->head.flags |= HAMMER_IOC_HEAD_INTR;
		error = 0;
	}
failed:
	/* strip the PFS bits again before returning key_cur to userland */
	mirror->key_cur.localization &= HAMMER_LOCALIZE_MASK;
	return(error);
}

/*
 * Copy records from userland to the target mirror.
 *
 * The PFS is identified in the mirror structure.  The passed ip is just
 * some directory in the overall HAMMER filesystem and has nothing to
 * do with the PFS.  In fact, there might not even be a root directory for
 * the PFS yet!
 */
int
hammer_ioc_mirror_write(hammer_transaction_t trans, hammer_inode_t ip,
			struct hammer_ioc_mirror_rw *mirror)
{
	union hammer_ioc_mrecord_any mrec;	/* staging area for one input record */
	struct hammer_cursor cursor;
	u_int32_t localization;
	int checkspace_count = 0;		/* flush retries before giving up with ENOSPC */
	int error;
	int bytes;
	char *uptr;
	int seq;				/* flusher sequence we wait on */

	localization = (u_int32_t)mirror->pfs_id << 16;
	seq = trans->hmp->flusher.act;

	/*
	 * Validate the mirror structure and relocalize the tracking keys
	 * for the target PFS.
	 */
	if (mirror->size < 0 || mirror->size > 0x70000000)
		return(EINVAL);
	mirror->key_beg.localization &= HAMMER_LOCALIZE_MASK;
	mirror->key_beg.localization += localization;
	mirror->key_end.localization &= HAMMER_LOCALIZE_MASK;
	mirror->key_end.localization += localization;
	mirror->key_cur.localization &= HAMMER_LOCALIZE_MASK;
	mirror->key_cur.localization += localization;

	/*
	 * Set up our tracking cursor for the loop.  The tracking cursor
	 * is used to delete records that are no longer present on the
	 * master.  The last handled record at key_cur must be skipped.
	 *
	 * NOTE(review): the hammer_init_cursor() return value is
	 * overwritten by hammer_btree_first() below without being
	 * checked -- confirm an init failure cannot leave the cursor
	 * in an unusable state here.
	 */
	error = hammer_init_cursor(trans, &cursor, NULL, NULL);

	cursor.key_beg = mirror->key_cur;
	cursor.key_end = mirror->key_end;
	cursor.flags |= HAMMER_CURSOR_BACKEND;
	error = hammer_btree_first(&cursor);
	if (error == 0)
		cursor.flags |= HAMMER_CURSOR_ATEDISK;
	if (error == ENOENT)
		error = 0;

	/*
	 * Loop until our input buffer has been exhausted.
	 */
	while (error == 0 &&
	       mirror->count + sizeof(mrec.head) <= mirror->size) {

		/*
		 * Don't blow out the buffer cache.  Leave room for frontend
		 * cache as well.  The cursor is unlocked while we wait on
		 * the flusher so other operations can proceed.
		 */
		while (hammer_flusher_meta_halflimit(trans->hmp) ||
		       hammer_flusher_undo_exhausted(trans, 2)) {
			hammer_unlock_cursor(&cursor);
			hammer_flusher_wait(trans->hmp, seq);
			hammer_lock_cursor(&cursor);
			seq = hammer_flusher_async_one(trans->hmp);
		}

		/*
		 * If there is insufficient free space it may be due to
		 * reserved bigblocks, which flushing might fix.  Give up
		 * with ENOSPC after 10 flush cycles.
		 */
		if (hammer_checkspace(trans->hmp, HAMMER_CHKSPC_MIRROR)) {
			if (++checkspace_count == 10) {
				error = ENOSPC;
				break;
			}
			hammer_unlock_cursor(&cursor);
			hammer_flusher_wait(trans->hmp, seq);
			hammer_lock_cursor(&cursor);
			seq = hammer_flusher_async(trans->hmp, NULL);
		}


		/*
		 * Acquire and validate header.  The copyin may grab more
		 * than one record's worth; rec_size tells us how much of
		 * it belongs to this record.
		 */
		if ((bytes = mirror->size - mirror->count) > sizeof(mrec))
			bytes = sizeof(mrec);
		uptr = (char *)mirror->ubuf + mirror->count;
		error = copyin(uptr, &mrec, bytes);
		if (error)
			break;
		if (mrec.head.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
			error = EINVAL;
			break;
		}
		if (mrec.head.rec_size < sizeof(mrec.head) ||
		    mrec.head.rec_size > sizeof(mrec) + HAMMER_XBUFSIZE ||
		    mirror->count + mrec.head.rec_size > mirror->size) {
			error = EINVAL;
			break;
		}

		/*
		 * Dispatch on record type.  Each handler advances
		 * mirror->key_cur when it succeeds.
		 */
		switch(mrec.head.type) {
		case HAMMER_MREC_TYPE_SKIP:
			if (mrec.head.rec_size != sizeof(mrec.skip))
				error = EINVAL;
			if (error == 0)
				error = hammer_ioc_mirror_write_skip(&cursor,
						&mrec.skip, mirror,
						localization);
			break;
		case HAMMER_MREC_TYPE_REC:
			if (mrec.head.rec_size < sizeof(mrec.rec))
				error = EINVAL;
			if (error == 0)
				error = hammer_ioc_mirror_write_rec(&cursor,
						&mrec.rec, mirror,
						localization,
						uptr + sizeof(mrec.rec));
			break;
		case HAMMER_MREC_TYPE_PASS:
			if (mrec.head.rec_size != sizeof(mrec.rec))
				error = EINVAL;
			if (error == 0)
				error = hammer_ioc_mirror_write_pass(&cursor,
						&mrec.rec, mirror,
						localization);
			break;
		default:
			error = EINVAL;
			break;
		}

		/*
		 * Retry the current record on deadlock, otherwise setup
		 * for the next loop.  EALREADY (record already present)
		 * is not an error.
		 */
		if (error == EDEADLK) {
			while (error == EDEADLK) {
				hammer_recover_cursor(&cursor);
				error = hammer_cursor_upgrade(&cursor);
			}
		} else {
			if (error == EALREADY)
				error = 0;
			if (error == 0) {
				mirror->count +=
				    HAMMER_HEAD_DOALIGN(mrec.head.rec_size);
			}
		}
	}
	hammer_done_cursor(&cursor);

	/*
	 * cumulative error
	 */
	if (error) {
		mirror->head.flags |= HAMMER_IOC_HEAD_ERROR;
		mirror->head.error = error;
	}

	/*
	 * ioctls don't update the RW data structure if an error is returned,
	 * always return 0.
	 */
	return(0);
}

/*
 * Handle skip records.
 *
 * We must iterate from the last resolved record position at mirror->key_cur
 * to skip_beg and delete any records encountered.
 *
 * mirror->key_cur must be carefully set when we succeed in processing
 * this mrec.
 */
static int
hammer_ioc_mirror_write_skip(hammer_cursor_t cursor,
			     struct hammer_ioc_mrecord_skip *mrec,
			     struct hammer_ioc_mirror_rw *mirror,
			     u_int32_t localization)
{
	int error;

	/*
	 * Relocalize the skip range for the target PFS.
	 */
	mrec->skip_beg.localization &= HAMMER_LOCALIZE_MASK;
	mrec->skip_beg.localization += localization;
	mrec->skip_end.localization &= HAMMER_LOCALIZE_MASK;
	mrec->skip_end.localization += localization;

	/*
	 * Iterate from current position to skip_beg, deleting any records
	 * we encounter.
	 */
	cursor->key_end = mrec->skip_beg;
	cursor->flags |= HAMMER_CURSOR_BACKEND;
	error = hammer_mirror_delete_to(cursor, mirror);

	/*
	 * Now skip past the skip (which is the whole point of having a
	 * skip record).  The sender has not sent us any records for the
	 * skip area so we wouldn't know what to keep and what to delete
	 * anyway.
	 *
	 * Clear ATEDISK because skip_end is non-inclusive, so we can't
	 * count an exact match if we happened to get one.
	 */
	if (error == 0) {
		mirror->key_cur = mrec->skip_end;
		cursor->key_beg = mrec->skip_end;
		error = hammer_btree_lookup(cursor);
		cursor->flags &= ~HAMMER_CURSOR_ATEDISK;
		if (error == ENOENT)
			error = 0;
	}
	return(error);
}

/*
 * Handle B-Tree records.
 *
 * We must iterate to mrec->base.key (non-inclusively), and then process
 * the record.  We are allowed to write a new record or delete an existing
 * record, but cannot replace an existing record.
 *
 * mirror->key_cur must be carefully set when we succeed in processing
 * this mrec.
 */
static int
hammer_ioc_mirror_write_rec(hammer_cursor_t cursor,
			    struct hammer_ioc_mrecord_rec *mrec,
			    struct hammer_ioc_mirror_rw *mirror,
			    u_int32_t localization,
			    char *uptr)
{
	hammer_transaction_t trans;
	u_int32_t rec_crc;
	int error;

	trans = cursor->trans;
	rec_crc = crc32(mrec, sizeof(*mrec));

	/*
	 * Sanity check the data payload length claimed by the record
	 * against the overall record size.
	 */
	if (mrec->leaf.data_len < 0 ||
	    mrec->leaf.data_len > HAMMER_XBUFSIZE ||
	    mrec->leaf.data_len + sizeof(*mrec) > mrec->head.rec_size) {
		return(EINVAL);
	}

	/*
	 * Re-localize for target.  relocalization of data is handled
	 * by hammer_mirror_write().
	 */
	mrec->leaf.base.localization &= HAMMER_LOCALIZE_MASK;
	mrec->leaf.base.localization += localization;

	/*
	 * Delete records through until we reach (non-inclusively) the
	 * target record.
	 *
	 * NOTE(review): the error returned by hammer_mirror_delete_to()
	 * is overwritten by the hammer_btree_lookup() below without
	 * being checked -- confirm a deletion failure is reliably
	 * re-detected by the lookup.
	 */
	cursor->key_end = mrec->leaf.base;
	cursor->flags &= ~HAMMER_CURSOR_END_INCLUSIVE;
	cursor->flags |= HAMMER_CURSOR_BACKEND;
	error = hammer_mirror_delete_to(cursor, mirror);

	/*
	 * Locate the record.
	 *
	 * If the record exists only the delete_tid may be updated.
	 *
	 * If the record does not exist we can create it only if the
	 * create_tid is not too old.  If the create_tid is too old
	 * it may have already been destroyed on the slave from pruning.
	 *
	 * Note that mirror operations are effectively as-of operations
	 * and delete_tid can be 0 for mirroring purposes even if it is
	 * not actually 0 at the originator.
	 *
	 * These functions can return EDEADLK
	 */
	cursor->key_beg = mrec->leaf.base;
	cursor->flags |= HAMMER_CURSOR_BACKEND;
	cursor->flags &= ~HAMMER_CURSOR_INSERT;
	error = hammer_btree_lookup(cursor);

	if (error == 0 && hammer_mirror_check(cursor, mrec)) {
		error = hammer_mirror_update(cursor, mrec);
	} else if (error == ENOENT) {
		if (mrec->leaf.base.create_tid >= mirror->tid_beg)
			error = hammer_mirror_write(cursor, mrec, uptr);
		else
			error = 0;
	}
	if (error == 0 || error == EALREADY)
		mirror->key_cur = mrec->leaf.base;
	return(error);
}

/*
 * This works like write_rec but no write or update is necessary,
 * and no data payload is included so we couldn't do a write even
 * if we wanted to.
 *
 * We must still iterate for deletions, and we can validate the
 * record header which is a good way to test for corrupted mirror
 * targets XXX.
 *
 * mirror->key_cur must be carefully set when we succeed in processing
 * this mrec.
 */
static
int
hammer_ioc_mirror_write_pass(hammer_cursor_t cursor,
			     struct hammer_ioc_mrecord_rec *mrec,
			     struct hammer_ioc_mirror_rw *mirror,
			     u_int32_t localization)
{
	hammer_transaction_t trans;
	u_int32_t rec_crc;
	int error;

	trans = cursor->trans;
	rec_crc = crc32(mrec, sizeof(*mrec));

	/*
	 * Re-localize for target.  Relocalization of data is handled
	 * by hammer_mirror_write().
	 */
	mrec->leaf.base.localization &= HAMMER_LOCALIZE_MASK;
	mrec->leaf.base.localization += localization;

	/*
	 * Delete records through until we reach (non-inclusively) the
	 * target record.
	 */
	cursor->key_end = mrec->leaf.base;
	cursor->flags &= ~HAMMER_CURSOR_END_INCLUSIVE;
	cursor->flags |= HAMMER_CURSOR_BACKEND;

	error = hammer_mirror_delete_to(cursor, mirror);

	/*
	 * Locate the record and get past it by setting ATEDISK.  Perform
	 * any necessary deletions.  We have no data payload and cannot
	 * create a new record.
	 */
	if (error == 0) {
		mirror->key_cur = mrec->leaf.base;
		cursor->key_beg = mrec->leaf.base;
		cursor->flags |= HAMMER_CURSOR_BACKEND;
		cursor->flags &= ~HAMMER_CURSOR_INSERT;
		error = hammer_btree_lookup(cursor);
		if (error == 0) {
			if (hammer_mirror_check(cursor, mrec))
				error = hammer_mirror_update(cursor, mrec);
			cursor->flags |= HAMMER_CURSOR_ATEDISK;
		} else {
			cursor->flags &= ~HAMMER_CURSOR_ATEDISK;
		}
		if (error == ENOENT)
			error = 0;
	}
	return(error);
}

/*
 * As part of the mirror write we iterate across swaths of records
 * on the target which no longer exist on the source, and mark them
 * deleted.
 *
 * The caller has indexed the cursor and set up key_end.  We iterate
 * through to key_end.
 */
static
int
hammer_mirror_delete_to(hammer_cursor_t cursor,
		       struct hammer_ioc_mirror_rw *mirror)
{
	hammer_btree_leaf_elm_t elm;
	int error;

	error = hammer_btree_iterate(cursor);
	while (error == 0) {
		elm = &cursor->node->ondisk->elms[cursor->index].leaf;
		KKASSERT(elm->base.btype == HAMMER_BTREE_TYPE_RECORD);
		cursor->flags |= HAMMER_CURSOR_ATEDISK;
		/*
		 * Only live records (delete_tid == 0) are marked deleted,
		 * using the mirror's ending TID as the deletion TID.
		 */
		if (elm->base.delete_tid == 0) {
			error = hammer_delete_at_cursor(cursor,
							HAMMER_DELETE_ADJUST,
							mirror->tid_end,
							time_second,
							1, NULL);
		}
		if (error == 0)
			error = hammer_btree_iterate(cursor);
	}
	/* running off the end of the range is the normal termination */
	if (error == ENOENT)
		error = 0;
	return(error);
}

/*
 * Check whether an update is needed in the case where a match already
 * exists on the target.
The only type of update allowed in this case 701 * is an update of the delete_tid. 702 * 703 * Return non-zero if the update should proceed. 704 */ 705 static 706 int 707 hammer_mirror_check(hammer_cursor_t cursor, struct hammer_ioc_mrecord_rec *mrec) 708 { 709 hammer_btree_leaf_elm_t leaf = cursor->leaf; 710 711 if (leaf->base.delete_tid != mrec->leaf.base.delete_tid) { 712 if (mrec->leaf.base.delete_tid != 0) 713 return(1); 714 } 715 return(0); 716 } 717 718 /* 719 * Update a record in-place. Only the delete_tid can change, and 720 * only from zero to non-zero. 721 */ 722 static 723 int 724 hammer_mirror_update(hammer_cursor_t cursor, 725 struct hammer_ioc_mrecord_rec *mrec) 726 { 727 int error; 728 729 /* 730 * This case shouldn't occur. 731 */ 732 if (mrec->leaf.base.delete_tid == 0) 733 return(0); 734 735 /* 736 * Mark the record deleted on the mirror target. 737 */ 738 error = hammer_delete_at_cursor(cursor, HAMMER_DELETE_ADJUST, 739 mrec->leaf.base.delete_tid, 740 mrec->leaf.delete_ts, 741 1, NULL); 742 cursor->flags |= HAMMER_CURSOR_ATEDISK; 743 return(error); 744 } 745 746 /* 747 * Write out a new record. 
748 */ 749 static 750 int 751 hammer_mirror_write(hammer_cursor_t cursor, 752 struct hammer_ioc_mrecord_rec *mrec, 753 char *udata) 754 { 755 hammer_transaction_t trans; 756 hammer_buffer_t data_buffer; 757 hammer_off_t ndata_offset; 758 hammer_tid_t high_tid; 759 void *ndata; 760 int error; 761 int doprop; 762 763 trans = cursor->trans; 764 data_buffer = NULL; 765 766 /* 767 * Get the sync lock so the whole mess is atomic 768 */ 769 hammer_sync_lock_sh(trans); 770 771 /* 772 * Allocate and adjust data 773 */ 774 if (mrec->leaf.data_len && mrec->leaf.data_offset) { 775 ndata = hammer_alloc_data(trans, mrec->leaf.data_len, 776 mrec->leaf.base.rec_type, 777 &ndata_offset, &data_buffer, &error); 778 if (ndata == NULL) 779 return(error); 780 mrec->leaf.data_offset = ndata_offset; 781 hammer_modify_buffer(trans, data_buffer, NULL, 0); 782 error = copyin(udata, ndata, mrec->leaf.data_len); 783 if (error == 0) { 784 if (hammer_crc_test_leaf(ndata, &mrec->leaf) == 0) { 785 kprintf("data crc mismatch on pipe\n"); 786 error = EINVAL; 787 } else { 788 error = hammer_mirror_localize_data( 789 ndata, &mrec->leaf); 790 } 791 } 792 hammer_modify_buffer_done(data_buffer); 793 } else { 794 mrec->leaf.data_offset = 0; 795 error = 0; 796 ndata = NULL; 797 } 798 if (error) 799 goto failed; 800 801 /* 802 * Do the insertion. This can fail with a EDEADLK or EALREADY 803 */ 804 cursor->flags |= HAMMER_CURSOR_INSERT; 805 error = hammer_btree_lookup(cursor); 806 if (error != ENOENT) { 807 if (error == 0) 808 error = EALREADY; 809 goto failed; 810 } 811 812 error = hammer_btree_insert(cursor, &mrec->leaf, &doprop); 813 814 /* 815 * Cursor is left on the current element, we want to skip it now. 816 */ 817 cursor->flags |= HAMMER_CURSOR_ATEDISK; 818 cursor->flags &= ~HAMMER_CURSOR_INSERT; 819 820 /* 821 * Track a count of active inodes. 
822 */ 823 if (error == 0 && 824 mrec->leaf.base.rec_type == HAMMER_RECTYPE_INODE && 825 mrec->leaf.base.delete_tid == 0) { 826 hammer_modify_volume_field(trans, 827 trans->rootvol, 828 vol0_stat_inodes); 829 ++trans->hmp->rootvol->ondisk->vol0_stat_inodes; 830 hammer_modify_volume_done(trans->rootvol); 831 } 832 833 /* 834 * vol0_next_tid must track the highest TID stored in the filesystem. 835 * We do not need to generate undo for this update. 836 */ 837 high_tid = mrec->leaf.base.create_tid; 838 if (high_tid < mrec->leaf.base.delete_tid) 839 high_tid = mrec->leaf.base.delete_tid; 840 if (trans->rootvol->ondisk->vol0_next_tid < high_tid) { 841 hammer_modify_volume(trans, trans->rootvol, NULL, 0); 842 trans->rootvol->ondisk->vol0_next_tid = high_tid; 843 hammer_modify_volume_done(trans->rootvol); 844 } 845 846 if (error == 0 && doprop) 847 hammer_btree_do_propagation(cursor, NULL, &mrec->leaf); 848 849 failed: 850 /* 851 * Cleanup 852 */ 853 if (error && mrec->leaf.data_offset) { 854 hammer_blockmap_free(cursor->trans, 855 mrec->leaf.data_offset, 856 mrec->leaf.data_len); 857 } 858 hammer_sync_unlock(trans); 859 if (data_buffer) 860 hammer_rel_buffer(data_buffer, 0); 861 return(error); 862 } 863 864 /* 865 * Localize the data payload. Directory entries may need their 866 * localization adjusted. 867 * 868 * PFS directory entries must be skipped entirely (return EALREADY). 869 */ 870 static 871 int 872 hammer_mirror_localize_data(hammer_data_ondisk_t data, 873 hammer_btree_leaf_elm_t leaf) 874 { 875 u_int32_t localization; 876 877 if (leaf->base.rec_type == HAMMER_RECTYPE_DIRENTRY) { 878 if (data->entry.obj_id == HAMMER_OBJID_ROOT) 879 return(EALREADY); 880 localization = leaf->base.localization & 881 HAMMER_LOCALIZE_PSEUDOFS_MASK; 882 if (data->entry.localization != localization) { 883 data->entry.localization = localization; 884 hammer_crc_set_leaf(data, leaf); 885 } 886 } 887 return(0); 888 } 889 890