/*
 * Copyright (c) 2015-2018 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module implements the hammer2 helper thread API, including
 * the frontend/backend XOP API.
 */
#include "hammer2.h"

#define H2XOPDESCRIPTOR(label)                                  \
        hammer2_xop_desc_t hammer2_##label##_desc = {           \
                .storage_func = hammer2_xop_##label,            \
                .id = #label                                    \
        }

H2XOPDESCRIPTOR(ipcluster);
H2XOPDESCRIPTOR(readdir);
H2XOPDESCRIPTOR(nresolve);
H2XOPDESCRIPTOR(unlink);
H2XOPDESCRIPTOR(nrename);
H2XOPDESCRIPTOR(scanlhc);
H2XOPDESCRIPTOR(scanall);
H2XOPDESCRIPTOR(lookup);
H2XOPDESCRIPTOR(delete);
H2XOPDESCRIPTOR(inode_mkdirent);
H2XOPDESCRIPTOR(inode_create);
H2XOPDESCRIPTOR(inode_create_det);
H2XOPDESCRIPTOR(inode_create_ins);
H2XOPDESCRIPTOR(inode_destroy);
H2XOPDESCRIPTOR(inode_chain_sync);
H2XOPDESCRIPTOR(inode_unlinkall);
H2XOPDESCRIPTOR(inode_connect);
H2XOPDESCRIPTOR(inode_flush);
H2XOPDESCRIPTOR(strategy_read);
H2XOPDESCRIPTOR(strategy_write);

struct objcache *cache_xops;

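/*
 * For reference, each H2XOPDESCRIPTOR() use above expands to a global
 * descriptor binding a backend storage function to a human-readable id.
 * For example, H2XOPDESCRIPTOR(readdir) expands to:
 *
 *      hammer2_xop_desc_t hammer2_readdir_desc = {
 *              .storage_func = hammer2_xop_readdir,
 *              .id = "readdir"
 *      };
 */
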
/*
 * Set flags and wakeup any waiters.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
void
hammer2_thr_signal(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                nflags = (oflags | flags) & ~HAMMER2_THREAD_WAITING;

                if (oflags & HAMMER2_THREAD_WAITING) {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                                wakeup(&thr->flags);
                                break;
                        }
                } else {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags))
                                break;
                }
        }
}

/*
 * Set and clear flags and wakeup any waiters.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
void
hammer2_thr_signal2(hammer2_thread_t *thr, uint32_t posflags, uint32_t negflags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                nflags = (oflags | posflags) &
                        ~(negflags | HAMMER2_THREAD_WAITING);
                if (oflags & HAMMER2_THREAD_WAITING) {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                                wakeup(&thr->flags);
                                break;
                        }
                } else {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags))
                                break;
                }
        }
}

/*
 * Wait until all the bits in flags are set.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
void
hammer2_thr_wait(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if ((oflags & flags) == flags)
                        break;
                nflags = oflags | HAMMER2_THREAD_WAITING;
                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
                }
        }
}

/*
 * Wait until any of the bits in flags are set, with timeout.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
int
hammer2_thr_wait_any(hammer2_thread_t *thr, uint32_t flags, int timo)
{
        uint32_t oflags;
        uint32_t nflags;
        int error;

        error = 0;
        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if (oflags & flags)
                        break;
                nflags = oflags | HAMMER2_THREAD_WAITING;
                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        error = tsleep(&thr->flags, PINTERLOCKED,
                                       "h2twait", timo);
                }
                if (error == ETIMEDOUT) {
                        error = HAMMER2_ERROR_ETIMEDOUT;
                        break;
                }
        }
        return error;
}

/*
 * Wait until the bits in flags are clear.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
void
hammer2_thr_wait_neg(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if ((oflags & flags) == 0)
                        break;
                nflags = oflags | HAMMER2_THREAD_WAITING;
                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
                }
        }
}

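/*
 * These primitives pair up across threads.  For example, the freeze
 * handshake implemented later in this file is built directly on them:
 * the requester runs
 *
 *      hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
 *      hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
 *
 * while the target thread's main loop acknowledges with
 *
 *      hammer2_thr_signal2(thr, HAMMER2_THREAD_FROZEN,
 *                          HAMMER2_THREAD_FREEZE);
 *
 * HAMMER2_THREAD_WAITING is managed entirely inside these primitives;
 * callers never set or clear it directly.
 */
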
/*
 * Initialize the supplied thread structure, starting the specified
 * thread.
 *
 * NOTE: thr structure can be retained across mounts and unmounts for this
 *       pmp, so make sure the flags are in a sane state.
 */
void
hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
                   hammer2_dev_t *hmp,
                   const char *id, int clindex, int repidx,
                   void (*func)(void *arg))
{
        thr->pmp = pmp;         /* xop helpers */
        thr->hmp = hmp;         /* bulkfree */
        thr->clindex = clindex;
        thr->repidx = repidx;
        TAILQ_INIT(&thr->xopq);
        atomic_clear_int(&thr->flags, HAMMER2_THREAD_STOP |
                                      HAMMER2_THREAD_STOPPED |
                                      HAMMER2_THREAD_FREEZE |
                                      HAMMER2_THREAD_FROZEN);
        if (thr->scratch == NULL)
                thr->scratch = kmalloc(MAXPHYS, M_HAMMER2, M_WAITOK | M_ZERO);
        if (repidx >= 0) {
                lwkt_create(func, thr, &thr->td, NULL, 0, repidx % ncpus,
                            "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
        } else if (pmp) {
                lwkt_create(func, thr, &thr->td, NULL, 0, -1,
                            "%s-%s", id, pmp->pfs_names[clindex]);
        } else {
                lwkt_create(func, thr, &thr->td, NULL, 0, -1, "%s", id);
        }
}

/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);
        hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);
        thr->pmp = NULL;
        if (thr->scratch) {
                kfree(thr->scratch, M_HAMMER2);
                thr->scratch = NULL;
        }
        KKASSERT(TAILQ_EMPTY(&thr->xopq));
}

/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER);
}

void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
        hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
}

void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
        hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
}

void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_UNFREEZE);
        hammer2_thr_wait_neg(thr, HAMMER2_THREAD_FROZEN);
}

int
hammer2_thr_break(hammer2_thread_t *thr)
{
        if (thr->flags & (HAMMER2_THREAD_STOP |
                          HAMMER2_THREAD_REMASTER |
                          HAMMER2_THREAD_FREEZE)) {
                return 1;
        }
        return 0;
}

/****************************************************************************
 *                          HAMMER2 XOPS API                                *
 ****************************************************************************/

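/*
 * Overview: a frontend (VOP) typically drives a XOP through four steps:
 * allocate, start, collect, retire.  A minimal sketch, with an
 * illustrative descriptor name (hammer2_somexop_desc is hypothetical):
 *
 *      hammer2_xop_t *xop;
 *      int error;
 *
 *      xop = hammer2_xop_alloc(ip, HAMMER2_XOP_MODIFYING);
 *      hammer2_xop_start(&xop->head, &hammer2_somexop_desc);
 *      error = hammer2_xop_collect(&xop->head, 0);
 *      hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
 *
 * The backend half of the same XOP runs in the per-node helper threads
 * created below and feeds results back via hammer2_xop_feed().
 */
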
/*
 * Allocate a XOP request.
 *
 * Once allocated a XOP request can be started, collected, and retired,
 * and can be retired early if desired.
 *
 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
 */
void *
hammer2_xop_alloc(hammer2_inode_t *ip, int flags)
{
        hammer2_xop_t *xop;

        xop = objcache_get(cache_xops, M_WAITOK);
        KKASSERT(xop->head.cluster.array[0].chain == NULL);

        xop->head.ip1 = ip;
        xop->head.desc = NULL;
        xop->head.flags = flags;
        xop->head.state = 0;
        xop->head.error = 0;
        xop->head.collect_key = 0;
        xop->head.focus_dio = NULL;

        if (flags & HAMMER2_XOP_MODIFYING)
                xop->head.mtid = hammer2_trans_sub(ip->pmp);
        else
                xop->head.mtid = 0;

        xop->head.cluster.nchains = ip->cluster.nchains;
        xop->head.cluster.pmp = ip->pmp;
        xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;

        /*
         * run_mask - Active thread (or frontend) associated with XOP
         */
        xop->head.run_mask = HAMMER2_XOPMASK_VOP;

        hammer2_inode_ref(ip);

        return xop;
}

void
hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
        xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name1_len = name_len;
        bcopy(name, xop->name1, name_len);
}

void
hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
        xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name2_len = name_len;
        bcopy(name, xop->name2, name_len);
}

size_t
hammer2_xop_setname_inum(hammer2_xop_head_t *xop, hammer2_key_t inum)
{
        const size_t name_len = 18;

        xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name1_len = name_len;
        ksnprintf(xop->name1, name_len + 1, "0x%016jx", (intmax_t)inum);

        return name_len;
}

void
hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
{
        xop->ip2 = ip2;
        hammer2_inode_ref(ip2);
}

void
hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
{
        xop->ip3 = ip3;
        hammer2_inode_ref(ip3);
}

void
hammer2_xop_setip4(hammer2_xop_head_t *xop, hammer2_inode_t *ip4)
{
        xop->ip4 = ip4;
        hammer2_inode_ref(ip4);
}

void
hammer2_xop_reinit(hammer2_xop_head_t *xop)
{
        xop->state = 0;
        xop->error = 0;
        xop->collect_key = 0;
        xop->run_mask = HAMMER2_XOPMASK_VOP;
}

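/*
 * Note on hammer2_xop_setname_inum() above: the generated name is the
 * inode number rendered via "0x%016jx", i.e. "0x" plus 16 hex digits,
 * which is exactly the fixed name_len of 18.  For example, inum 0x1234
 * produces the name "0x0000000000001234".
 */
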
/*
 * A mounted PFS needs Xops threads to support frontend operations.
 */
void
hammer2_xop_helper_create(hammer2_pfs_t *pmp)
{
        int i;
        int j;

        lockmgr(&pmp->lock, LK_EXCLUSIVE);
        pmp->has_xop_threads = 1;

        pmp->xop_groups = kmalloc(hammer2_xop_nthreads *
                                  sizeof(hammer2_xop_group_t),
                                  M_HAMMER2, M_WAITOK | M_ZERO);
        for (i = 0; i < pmp->iroot->cluster.nchains; ++i) {
                for (j = 0; j < hammer2_xop_nthreads; ++j) {
                        if (pmp->xop_groups[j].thrs[i].td)
                                continue;
                        hammer2_thr_create(&pmp->xop_groups[j].thrs[i],
                                           pmp, NULL,
                                           "h2xop", i, j,
                                           hammer2_primary_xops_thread);
                }
        }
        lockmgr(&pmp->lock, LK_RELEASE);
}

void
hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
{
        int i;
        int j;

        if (pmp->xop_groups == NULL) {
                KKASSERT(pmp->has_xop_threads == 0);
                return;
        }

        for (i = 0; i < pmp->pfs_nmasters; ++i) {
                for (j = 0; j < hammer2_xop_nthreads; ++j) {
                        if (pmp->xop_groups[j].thrs[i].td)
                                hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
                }
        }
        pmp->has_xop_threads = 0;
        kfree(pmp->xop_groups, M_HAMMER2);
        pmp->xop_groups = NULL;
}

/*
 * Start a XOP request, queueing it to all nodes in the cluster to
 * execute the cluster op.
 *
 * XXX optimize single-target case.
 */
void
hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_desc_t *desc,
                         int notidx)
{
        hammer2_inode_t *ip1;
        hammer2_pfs_t *pmp;
        hammer2_thread_t *thr;
        int i;
        int ng;
        int nchains;

        ip1 = xop->ip1;
        pmp = ip1->pmp;
        if (pmp->has_xop_threads == 0)
                hammer2_xop_helper_create(pmp);

        /*
         * The sequencer assigns a worker thread to the XOP.
         *
         * (1) The worker threads are partitioned into two sets, one for
         *     NON-STRATEGY XOPs, and the other for STRATEGY XOPs.  This
         *     guarantees that strategy calls will always be able to make
         *     progress and will not deadlock against non-strategy calls.
         *
         * (2) If clustered, non-strategy operations to the same inode must
         *     be serialized.  This is to avoid confusion when issuing
         *     modifying operations because a XOP completes the instant a
         *     quorum is reached.
         *
         * TODO - RENAME fails here because it is potentially modifying
         *        three different inodes, but we triple-lock the inodes
         *        involved so it shouldn't create a sequencing schism.
         */
        if (xop->flags & HAMMER2_XOP_STRATEGY) {
                /*
                 * Use worker space 0 associated with the current cpu
                 * for strategy ops.
                 */
                hammer2_xop_strategy_t *xopst;
                u_int which;

                xopst = &((hammer2_xop_t *)xop)->xop_strategy;
                which = ((unsigned int)ip1->ihash +
                         ((unsigned int)xopst->lbase >> HAMMER2_PBUFRADIX)) %
                        hammer2_xop_sgroups;
                ng = mycpu->gd_cpuid % hammer2_xop_mod +
                     hammer2_xop_mod * which;
        } else if (hammer2_spread_workers == 0 && ip1->cluster.nchains == 1) {
                /*
                 * For now try to keep the work on the same cpu to reduce
                 * IPI overhead.  Several threads are assigned to each cpu,
                 * don't be very smart and select the one to use based on
                 * the inode hash.
                 */
                u_int which;

                which = (unsigned int)ip1->ihash % hammer2_xop_xgroups;
                ng = mycpu->gd_cpuid % hammer2_xop_mod +
                     (which * hammer2_xop_mod) +
                     hammer2_xop_xbase;
        } else {
                /*
                 * Hash based on inode only, must serialize inode to same
                 * thread regardless of current cpu.
                 */
                ng = (unsigned int)ip1->ihash %
                     (hammer2_xop_mod * hammer2_xop_xgroups) +
                     hammer2_xop_xbase;
        }
        xop->desc = desc;

        /*
         * The instant xop is queued another thread can pick it off.  In the
         * case of asynchronous ops, another thread might even finish and
         * deallocate it.
         */
        hammer2_spin_ex(&pmp->xop_spin);
        nchains = ip1->cluster.nchains;
        for (i = 0; i < nchains; ++i) {
                /*
                 * XXX ip1->cluster.array* not stable here.  This temporary
                 *     hack fixes basic issues in target XOPs which need to
                 *     obtain a starting chain from the inode but does not
                 *     address possible races against inode updates which
                 *     might NULL-out a chain.
                 */
                if (i != notidx && ip1->cluster.array[i].chain) {
                        thr = &pmp->xop_groups[ng].thrs[i];
                        atomic_set_64(&xop->run_mask, 1LLU << i);
                        atomic_set_64(&xop->chk_mask, 1LLU << i);
                        xop->collect[i].thr = thr;
                        TAILQ_INSERT_TAIL(&thr->xopq, xop, collect[i].entry);
                }
        }
        hammer2_spin_unex(&pmp->xop_spin);
        /* xop can become invalid at this point */

        /*
         * Each thread has its own xopq
         */
        for (i = 0; i < nchains; ++i) {
                if (i != notidx) {
                        thr = &pmp->xop_groups[ng].thrs[i];
                        hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
                }
        }
}

void
hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_desc_t *desc)
{
        hammer2_xop_start_except(xop, desc, -1);
}

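/*
 * A worked example of the strategy-group selection above, assuming
 * hypothetical tunable values hammer2_xop_mod = 4 and
 * hammer2_xop_sgroups = 2, on a cpu with gd_cpuid = 5:
 *
 *      which = 1:  ng = 5 % 4 + 4 * 1 = 5
 *      which = 0:  ng = 5 % 4 + 4 * 0 = 1
 *
 * i.e. strategy XOPs land in groups [0, mod * sgroups), while the
 * non-strategy cases are offset by hammer2_xop_xbase, so the two classes
 * never share a worker group and cannot deadlock against each other.
 */
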
/*
 * Retire a XOP.  Used by both the VOP frontend and by the XOP backend.
 */
void
hammer2_xop_retire(hammer2_xop_head_t *xop, uint64_t mask)
{
        hammer2_chain_t *chain;
        uint64_t nmask;
        int i;

        /*
         * Remove the frontend collector or remove a backend feeder.
         *
         * When removing the frontend we must wakeup any backend feeders
         * who are waiting for FIFO space.
         *
         * When removing the last backend feeder we must wakeup any waiting
         * frontend.
         */
        KKASSERT(xop->run_mask & mask);
        nmask = atomic_fetchadd_64(&xop->run_mask,
                                   -mask + HAMMER2_XOPMASK_FEED);

        /*
         * More than one entity left
         */
        if ((nmask & HAMMER2_XOPMASK_ALLDONE) != mask) {
                /*
                 * Frontend terminating, wakeup any backends waiting on
                 * fifo full.
                 *
                 * NOTE!!! The xop can get ripped out from under us at
                 *         this point, so do not reference it again.
                 *         The wakeup(xop) doesn't touch the xop and
                 *         is ok.
                 */
                if (mask == HAMMER2_XOPMASK_VOP) {
                        if (nmask & HAMMER2_XOPMASK_FIFOW)
                                wakeup(xop);
                }

                /*
                 * Wakeup frontend if the last backend is terminating.
                 */
                nmask -= mask;
                if ((nmask & HAMMER2_XOPMASK_ALLDONE) == HAMMER2_XOPMASK_VOP) {
                        if (nmask & HAMMER2_XOPMASK_WAIT)
                                wakeup(xop);
                }

                return;
        }
        /* else nobody else left, we can ignore FIFOW */

        /*
         * All collectors are gone, we can cleanup and dispose of the XOP.
         * Note that this can wind up being a frontend OR a backend.
         * Pending chains are locked shared and not owned by any thread.
         */

        /*
         * Cleanup the xop's cluster.  If there is an inode reference,
         * cache the cluster chains in the inode to improve performance,
         * preventing the chains from being recursively destroyed on
         * lastdrop.
         *
         * Note that ip->ccache[i] does NOT necessarily represent usable
         * chains or chains that are related to the inode.  The chains are
         * simply held to prevent bottom-up lastdrop destruction of
         * potentially valuable resolved chain data.
         */
        if (xop->ip1) {
                /*
                 * Cache cluster chains in a convenient inode.  The chains
                 * are cache ref'd but not held.  The inode simply serves
                 * as a place to cache the chains to prevent the chains
                 * from being cleaned up.
                 */
                hammer2_chain_t *dropch[HAMMER2_MAXCLUSTER];
                hammer2_inode_t *ip;
                int prior_nchains;

                ip = xop->ip1;
                hammer2_spin_ex(&ip->cluster_spin);
                prior_nchains = ip->ccache_nchains;
                for (i = 0; i < prior_nchains; ++i) {
                        dropch[i] = ip->ccache[i].chain;
                        ip->ccache[i].chain = NULL;
                }
                for (i = 0; i < xop->cluster.nchains; ++i) {
                        ip->ccache[i] = xop->cluster.array[i];
                        if (ip->ccache[i].chain)
                                hammer2_chain_ref(ip->ccache[i].chain);
                }
                ip->ccache_nchains = i;
                hammer2_spin_unex(&ip->cluster_spin);

                /*
                 * Drop prior cache
                 */
                for (i = 0; i < prior_nchains; ++i) {
                        chain = dropch[i];
                        if (chain)
                                hammer2_chain_drop(chain);
                }
        }

        /*
         * Drop and unhold chains in xop cluster
         */
        for (i = 0; i < xop->cluster.nchains; ++i) {
                xop->cluster.array[i].flags = 0;
                chain = xop->cluster.array[i].chain;
                if (chain) {
                        xop->cluster.array[i].chain = NULL;
                        hammer2_chain_drop_unhold(chain);
                }
        }

        /*
         * Cleanup the fifos.  Since we are the only entity left on this
         * xop we don't have to worry about fifo flow control, and one
         * lfence() will do the job.
         */
        cpu_lfence();
        mask = xop->chk_mask;
        for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
                hammer2_xop_fifo_t *fifo = &xop->collect[i];
                while (fifo->ri != fifo->wi) {
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        if (chain)
                                hammer2_chain_drop_unhold(chain);
                        ++fifo->ri;
                }
                mask &= ~(1U << i);
        }

        /*
         * The inode is only held at this point, simply drop it.
         */
        if (xop->ip1) {
                hammer2_inode_drop(xop->ip1);
                xop->ip1 = NULL;
        }
        if (xop->ip2) {
                hammer2_inode_drop(xop->ip2);
                xop->ip2 = NULL;
        }
        if (xop->ip3) {
                hammer2_inode_drop(xop->ip3);
                xop->ip3 = NULL;
        }
        if (xop->ip4) {
                hammer2_inode_drop(xop->ip4);
                xop->ip4 = NULL;
        }
        if (xop->name1) {
                kfree(xop->name1, M_HAMMER2);
                xop->name1 = NULL;
                xop->name1_len = 0;
        }
        if (xop->name2) {
                kfree(xop->name2, M_HAMMER2);
                xop->name2 = NULL;
                xop->name2_len = 0;
        }

        objcache_put(cache_xops, xop);
}

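/*
 * Note on the run_mask accounting in hammer2_xop_retire() above: run_mask
 * packs per-node participation bits (1LLU << clindex) and the frontend's
 * HAMMER2_XOPMASK_VOP bit together with what is effectively a feed
 * counter advanced by HAMMER2_XOPMASK_FEED.  The single
 * atomic_fetchadd_64() of (-mask + HAMMER2_XOPMASK_FEED) simultaneously
 * drops the caller's participation bit and bumps the counter, so a
 * waiter's cmpset-based sleep interlock on run_mask fails (and retries)
 * whenever any participant made progress in the meantime.
 */
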
/*
 * (Backend) Returns non-zero if the frontend is still attached.
 */
int
hammer2_xop_active(hammer2_xop_head_t *xop)
{
        if (xop->run_mask & HAMMER2_XOPMASK_VOP)
                return 1;
        else
                return 0;
}

/*
 * (Backend) Feed chain data through the cluster validator and back to
 * the frontend.  Chains are fed from multiple nodes concurrently
 * and pipelined via per-node FIFOs in the XOP.
 *
 * The chain must be locked (either shared or exclusive).  The caller may
 * unlock and drop the chain on return.  This function will add an extra
 * ref and hold the chain's data for the pass-back.
 *
 * No xop lock is needed because we are only manipulating fields under
 * our direct control.
 *
 * Returns 0 on success and a hammer2 error code if sync is permanently
 * lost.  The caller retains a ref on the chain but by convention
 * the lock is typically inherited by the xop (caller loses lock).
 *
 * Returns non-zero on error.  In this situation the caller retains a
 * ref on the chain but loses the lock (we unlock here).
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
                 int clindex, int error)
{
        hammer2_xop_fifo_t *fifo;
        uint64_t mask;

        /*
         * Early termination (typically of xop_readdir)
         */
        if (hammer2_xop_active(xop) == 0) {
                error = HAMMER2_ERROR_ABORTED;
                goto done;
        }

        /*
         * Multi-threaded entry into the XOP collector.  We own the
         * fifo->wi for our clindex.
         */
        fifo = &xop->collect[clindex];

        if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO)
                lwkt_yield();
        while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                atomic_set_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
                mask = xop->run_mask;
                if ((mask & HAMMER2_XOPMASK_VOP) == 0) {
                        error = HAMMER2_ERROR_ABORTED;
                        goto done;
                }
                tsleep_interlock(xop, 0);
                if (atomic_cmpset_64(&xop->run_mask, mask,
                                     mask | HAMMER2_XOPMASK_FIFOW)) {
                        if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                                tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
                        }
                }
                /* retry */
        }
        atomic_clear_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
        if (chain)
                hammer2_chain_ref_hold(chain);
        if (error == 0 && chain)
                error = chain->error;
        fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
        fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
        cpu_sfence();
        ++fifo->wi;

        mask = atomic_fetchadd_64(&xop->run_mask, HAMMER2_XOPMASK_FEED);
        if (mask & HAMMER2_XOPMASK_WAIT) {
                atomic_clear_64(&xop->run_mask, HAMMER2_XOPMASK_WAIT);
                wakeup(xop);
        }
        error = 0;

        /*
         * Cleanup.  If an error is returned the chain was not queued;
         * if no error occurred the fifo inherits the lock and gains an
         * additional ref.
         *
         * The caller's ref remains in both cases.
         */
done:
        return error;
}

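/*
 * A typical backend feeder (running in a helper thread) scans chains and
 * pushes each one through hammer2_xop_feed(), then terminates the feed
 * with a NULL chain.  A minimal sketch, assuming a scan loop in the
 * style of the readdir/scanall backends (the chain iteration itself is
 * elided):
 *
 *      while ((chain = ...next chain...) != NULL) {
 *              error = hammer2_xop_feed(&xop->head, chain, clindex, 0);
 *              if (error)
 *                      break;
 *      }
 *      hammer2_xop_feed(&xop->head, NULL, clindex, error);
 */
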
/*
 * (Frontend) collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= collect_key.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT   - Used to 'poll' a completed collection,
 *                                usually called synchronously from the
 *                                node XOPs for the strategy code to
 *                                fake the frontend collection and complete
 *                                the BIO as soon as possible.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Returns ENOENT on normal termination.
 * Otherwise returns an error.
 *
 * WARNING! If the xop returns a cluster with a non-NULL focus, note that
 *          none of the chains in the cluster (or the focus) are either
 *          locked or I/O synchronized with the cpu.  hammer2_xop_gdata()
 *          and hammer2_xop_pdata() must be used to safely access the focus
 *          chain's content.
 *
 *          The frontend can make certain assumptions based on higher-level
 *          locking done by the frontend, but data integrity absolutely
 *          requires using the gdata/pdata API.
 */
int
hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
{
        hammer2_xop_fifo_t *fifo;
        hammer2_chain_t *chain;
        hammer2_key_t lokey;
        uint64_t mask;
        int error;
        int keynull;
        int adv;                /* advance the element */
        int i;

loop:
        /*
         * First loop tries to advance pieces of the cluster which
         * are out of sync.
         */
        lokey = HAMMER2_KEY_MAX;
        keynull = HAMMER2_CHECK_NULL;
        mask = xop->run_mask;
        cpu_lfence();

        for (i = 0; i < xop->cluster.nchains; ++i) {
                chain = xop->cluster.array[i].chain;
                if (chain == NULL) {
                        adv = 1;
                } else if (chain->bref.key < xop->collect_key) {
                        adv = 1;
                } else {
                        keynull &= ~HAMMER2_CHECK_NULL;
                        if (lokey > chain->bref.key)
                                lokey = chain->bref.key;
                        adv = 0;
                }
                if (adv == 0)
                        continue;

                /*
                 * Advance element if possible, advanced element may be NULL.
                 */
                if (chain)
                        hammer2_chain_drop_unhold(chain);

                fifo = &xop->collect[i];
                if (fifo->ri != fifo->wi) {
                        cpu_lfence();
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        error = fifo->errors[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        ++fifo->ri;
                        xop->cluster.array[i].chain = chain;
                        xop->cluster.array[i].error = error;
                        if (chain == NULL) {
                                /* XXX */
                                xop->cluster.array[i].flags |=
                                                        HAMMER2_CITEM_NULL;
                        }
                        if (fifo->wi - fifo->ri <= HAMMER2_XOPFIFO / 2) {
                                if (fifo->flags & HAMMER2_XOP_FIFO_STALL) {
                                        atomic_clear_int(&fifo->flags,
                                                    HAMMER2_XOP_FIFO_STALL);
                                        wakeup(xop);
                                        lwkt_yield();
                                }
                        }
                        --i;            /* loop on same index */
                } else {
                        /*
                         * Retain CITEM_NULL flag.  If set just repeat EOF.
                         * If not, the NULL,0 combination indicates an
                         * operation in-progress.
                         */
                        xop->cluster.array[i].chain = NULL;
                        /* retain any CITEM_NULL setting */
                }
        }

        /*
         * Determine whether the lowest collected key meets clustering
         * requirements.  Returns HAMMER2_ERROR_*:
         *
         * 0        - key valid, cluster can be returned.
         *
         * ENOENT   - normal end of scan, return ENOENT.
         *
         * ESRCH    - sufficient elements collected, quorum agreement
         *            that lokey is not a valid element and should be
         *            skipped.
         *
         * EDEADLK  - sufficient elements collected, no quorum agreement
         *            (and no agreement possible).  In this situation a
         *            repair is needed, for now we loop.
         *
         * EINPROGRESS - insufficient elements collected to resolve, wait
         *               for event and loop.
         *
         * EIO      - IO error or CRC check error from
         *            hammer2_cluster_check()
         */
        if ((flags & HAMMER2_XOP_COLLECT_WAITALL) &&
            (mask & HAMMER2_XOPMASK_ALLDONE) != HAMMER2_XOPMASK_VOP) {
                error = HAMMER2_ERROR_EINPROGRESS;
        } else {
                error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
        }
        if (error == HAMMER2_ERROR_EINPROGRESS) {
                if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
                        goto done;
                tsleep_interlock(xop, 0);
                if (atomic_cmpset_64(&xop->run_mask,
                                     mask, mask | HAMMER2_XOPMASK_WAIT)) {
                        tsleep(xop, PINTERLOCKED, "h2coll", hz*60);
                }
                goto loop;
        }
        if (error == HAMMER2_ERROR_ESRCH) {
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = HAMMER2_ERROR_ENOENT;
        }
        if (error == HAMMER2_ERROR_EDEADLK) {
                kprintf("hammer2: no quorum possible lokey %016jx\n",
                        lokey);
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = HAMMER2_ERROR_ENOENT;
        }
        if (lokey == HAMMER2_KEY_MAX)
                xop->collect_key = lokey;
        else
                xop->collect_key = lokey + 1;
done:
        return error;
}

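/*
 * A typical frontend consumes a scanning XOP with a collect loop,
 * touching the focus chain's media data only through the gdata/pdata
 * accessors referenced in the WARNING above.  A minimal sketch (error
 * handling abbreviated, data type per the gdata accessor):
 *
 *      const hammer2_media_data_t *data;
 *
 *      for (;;) {
 *              error = hammer2_xop_collect(&xop->head, 0);
 *              if (error)
 *                      break;  // HAMMER2_ERROR_ENOENT at end of scan
 *              data = hammer2_xop_gdata(&xop->head);
 *              // ... inspect data / xop->head.cluster.focus ...
 *              hammer2_xop_pdata(&xop->head);
 *      }
 *      hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
 */
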
/*
 * N x M processing threads are available to handle XOPs, N per cluster
 * index x M cluster nodes.
 *
 * Locate and return the next runnable xop, or NULL if no xops are
 * present or none of the xops are currently runnable (for various reasons).
 * The xop is left on the queue and serves to block other dependent xops
 * from being run.
 *
 * Dependent xops will not be returned.
 *
 * Sets HAMMER2_XOP_FIFO_RUN on the returned xop or returns NULL.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
#define XOP_HASH_SIZE   16
#define XOP_HASH_MASK   (XOP_HASH_SIZE - 1)

static __inline
int
xop_testhash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
        uint32_t mask;
        int hv;

        hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
        mask = 1U << (hv & 31);
        hv >>= 5;

        return ((int)(hash[hv & XOP_HASH_MASK] & mask));
}

static __inline
void
xop_sethash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
        uint32_t mask;
        int hv;

        hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
        mask = 1U << (hv & 31);
        hv >>= 5;

        hash[hv & XOP_HASH_MASK] |= mask;
}

static
hammer2_xop_head_t *
hammer2_xop_next(hammer2_thread_t *thr)
{
        hammer2_pfs_t *pmp = thr->pmp;
        int clindex = thr->clindex;
        uint32_t hash[XOP_HASH_SIZE] = { 0 };
        hammer2_xop_head_t *xop;

        hammer2_spin_ex(&pmp->xop_spin);
        TAILQ_FOREACH(xop, &thr->xopq, collect[clindex].entry) {
                /*
                 * Check dependency
                 */
                if (xop_testhash(thr, xop->ip1, hash) ||
                    (xop->ip2 && xop_testhash(thr, xop->ip2, hash)) ||
                    (xop->ip3 && xop_testhash(thr, xop->ip3, hash)) ||
                    (xop->ip4 && xop_testhash(thr, xop->ip4, hash)))
                {
                        continue;
                }
                xop_sethash(thr, xop->ip1, hash);
                if (xop->ip2)
                        xop_sethash(thr, xop->ip2, hash);
                if (xop->ip3)
                        xop_sethash(thr, xop->ip3, hash);
                if (xop->ip4)
                        xop_sethash(thr, xop->ip4, hash);

                /*
                 * Check already running
                 */
                if (xop->collect[clindex].flags & HAMMER2_XOP_FIFO_RUN)
                        continue;

                /*
                 * Found a good one, return it.
                 */
                atomic_set_int(&xop->collect[clindex].flags,
                               HAMMER2_XOP_FIFO_RUN);
                break;
        }
        hammer2_spin_unex(&pmp->xop_spin);

        return xop;
}

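/*
 * Note on the dependency check above: hash[] acts as a tiny Bloom-style
 * filter over (ip, thr) pairs -- 16 words x 32 bits = 512 bits total.
 * A hit may be a false positive (two inodes hashing to the same bit),
 * which merely delays an independent xop until a later scan; a miss is
 * always accurate, so dependent xops are never run out of order.
 */
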
/*
 * Remove the completed XOP from the queue, clear HAMMER2_XOP_FIFO_RUN.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
static
void
hammer2_xop_dequeue(hammer2_thread_t *thr, hammer2_xop_head_t *xop)
{
        hammer2_pfs_t *pmp = thr->pmp;
        int clindex = thr->clindex;

        hammer2_spin_ex(&pmp->xop_spin);
        TAILQ_REMOVE(&thr->xopq, xop, collect[clindex].entry);
        atomic_clear_int(&xop->collect[clindex].flags,
                         HAMMER2_XOP_FIFO_RUN);
        hammer2_spin_unex(&pmp->xop_spin);
        if (TAILQ_FIRST(&thr->xopq))
                hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
}

/*
 * Primary management thread for xops support.  Each node has several such
 * threads which replicate front-end operations on cluster nodes.
 *
 * XOPS threads run node-local operations, allowing each function to focus
 * on a single node in the cluster after the operation has been validated
 * against the cluster.  This is primarily what prevents dead or stalled
 * nodes from stalling the front-end.
 */
void
hammer2_primary_xops_thread(void *arg)
{
        hammer2_thread_t *thr = arg;
        hammer2_xop_head_t *xop;
        uint64_t mask;
        uint32_t flags;
        uint32_t nflags;

        mask = 1LLU << thr->clindex;

        for (;;) {
                flags = thr->flags;

                /*
                 * Handle stop request
                 */
                if (flags & HAMMER2_THREAD_STOP)
                        break;

                /*
                 * Handle freeze request
                 */
                if (flags & HAMMER2_THREAD_FREEZE) {
                        hammer2_thr_signal2(thr, HAMMER2_THREAD_FROZEN,
                                                 HAMMER2_THREAD_FREEZE);
                        continue;
                }

                if (flags & HAMMER2_THREAD_UNFREEZE) {
                        hammer2_thr_signal2(thr, 0,
                                                 HAMMER2_THREAD_FROZEN |
                                                 HAMMER2_THREAD_UNFREEZE);
                        continue;
                }

                /*
                 * Force idle if frozen until unfrozen or stopped.
                 */
                if (flags & HAMMER2_THREAD_FROZEN) {
                        hammer2_thr_wait_any(thr,
                                             HAMMER2_THREAD_UNFREEZE |
                                             HAMMER2_THREAD_STOP,
                                             0);
                        continue;
                }

                /*
                 * Reset state on REMASTER request
                 */
                if (flags & HAMMER2_THREAD_REMASTER) {
                        hammer2_thr_signal2(thr, 0, HAMMER2_THREAD_REMASTER);
                        /* reset state here */
                        continue;
                }

                /*
                 * Process requests.  Each request can be multi-queued.
                 *
                 * If we get behind and the frontend VOP is no longer active,
                 * we retire the request without processing it.  The callback
                 * may also abort processing if the frontend VOP becomes
                 * inactive.
                 */
                if (flags & HAMMER2_THREAD_XOPQ) {
                        nflags = flags & ~HAMMER2_THREAD_XOPQ;
                        if (!atomic_cmpset_int(&thr->flags, flags, nflags))
                                continue;
                        flags = nflags;
                        /* fall through */
                }
                while ((xop = hammer2_xop_next(thr)) != NULL) {
                        if (hammer2_xop_active(xop)) {
                                xop->desc->storage_func((hammer2_xop_t *)xop,
                                                        thr->scratch,
                                                        thr->clindex);
                                hammer2_xop_dequeue(thr, xop);
                                hammer2_xop_retire(xop, mask);
                        } else {
                                hammer2_xop_feed(xop, NULL, thr->clindex,
                                                 ECONNABORTED);
                                hammer2_xop_dequeue(thr, xop);
                                hammer2_xop_retire(xop, mask);
                        }
                }

                /*
                 * Wait for event, interlock using THREAD_WAITING and
                 * THREAD_SIGNAL.
                 *
                 * For robustness poll on a 30-second interval, but nominally
                 * expect to be woken up.
                 */
                nflags = flags | HAMMER2_THREAD_WAITING;

                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
                        tsleep(&thr->flags, PINTERLOCKED, "h2idle", hz*30);
                }
        }

#if 0
        /*
         * Cleanup / termination
         */
        while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
                kprintf("hammer2_thread: aborting xop %s\n", xop->desc->id);
                TAILQ_REMOVE(&thr->xopq, xop,
                             collect[thr->clindex].entry);
                hammer2_xop_retire(xop, mask);
        }
#endif

        thr->td = NULL;
        hammer2_thr_signal(thr, HAMMER2_THREAD_STOPPED);
        /* thr structure can go invalid after this point */
}