/*
 * Copyright (c) 2015-2018 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module implements the hammer2 helper thread API, including
 * the frontend/backend XOP API.
 */
#include "hammer2.h"

#define H2XOPDESCRIPTOR(label)                                          \
        hammer2_xop_desc_t hammer2_##label##_desc = {                   \
                .storage_func = hammer2_xop_##label,                    \
                .id = #label                                            \
        }

H2XOPDESCRIPTOR(ipcluster);
H2XOPDESCRIPTOR(readdir);
H2XOPDESCRIPTOR(nresolve);
H2XOPDESCRIPTOR(unlink);
H2XOPDESCRIPTOR(nrename);
H2XOPDESCRIPTOR(scanlhc);
H2XOPDESCRIPTOR(scanall);
H2XOPDESCRIPTOR(lookup);
H2XOPDESCRIPTOR(delete);
H2XOPDESCRIPTOR(inode_mkdirent);
H2XOPDESCRIPTOR(inode_create);
H2XOPDESCRIPTOR(inode_create_det);
H2XOPDESCRIPTOR(inode_create_ins);
H2XOPDESCRIPTOR(inode_destroy);
H2XOPDESCRIPTOR(inode_chain_sync);
H2XOPDESCRIPTOR(inode_unlinkall);
H2XOPDESCRIPTOR(inode_connect);
H2XOPDESCRIPTOR(inode_flush);
H2XOPDESCRIPTOR(strategy_read);
H2XOPDESCRIPTOR(strategy_write);
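
/*
 * For reference, each H2XOPDESCRIPTOR() use above expands to a statically
 * initialized descriptor pairing the backend storage function with a
 * printable id.  For example (expansion shown editorially, not part of
 * the original file):
 *
 *      H2XOPDESCRIPTOR(readdir);
 *
 * becomes:
 *
 *      hammer2_xop_desc_t hammer2_readdir_desc = {
 *              .storage_func = hammer2_xop_readdir,
 *              .id = "readdir"
 *      };
 */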

/*
 * Set flags and wakeup any waiters.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
void
hammer2_thr_signal(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                nflags = (oflags | flags) & ~HAMMER2_THREAD_WAITING;

                if (oflags & HAMMER2_THREAD_WAITING) {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                                wakeup(&thr->flags);
                                break;
                        }
                } else {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags))
                                break;
                }
        }
}

/*
 * Set and clear flags and wakeup any waiters.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
void
hammer2_thr_signal2(hammer2_thread_t *thr, uint32_t posflags, uint32_t negflags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                nflags = (oflags | posflags) &
                        ~(negflags | HAMMER2_THREAD_WAITING);
                if (oflags & HAMMER2_THREAD_WAITING) {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                                wakeup(&thr->flags);
                                break;
                        }
                } else {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags))
                                break;
                }
        }
}

/*
 * Wait until all the bits in flags are set.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
void
hammer2_thr_wait(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if ((oflags & flags) == flags)
                        break;
                nflags = oflags | HAMMER2_THREAD_WAITING;
                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
                }
        }
}

/*
 * Wait until any of the bits in flags are set, with timeout.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
int
hammer2_thr_wait_any(hammer2_thread_t *thr, uint32_t flags, int timo)
{
        uint32_t oflags;
        uint32_t nflags;
        int error;

        error = 0;
        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if (oflags & flags)
                        break;
                nflags = oflags | HAMMER2_THREAD_WAITING;
                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        error = tsleep(&thr->flags, PINTERLOCKED,
                                       "h2twait", timo);
                }
                if (error == ETIMEDOUT) {
                        error = HAMMER2_ERROR_ETIMEDOUT;
                        break;
                }
        }
        return error;
}

/*
 * Wait until the bits in flags are clear.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
void
hammer2_thr_wait_neg(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if ((oflags & flags) == 0)
                        break;
                nflags = oflags | HAMMER2_THREAD_WAITING;
                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
                }
        }
}
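
/*
 * Editorial note: the signal/wait pairs above follow the standard
 * tsleep_interlock()/atomic_cmpset_int() protocol.  A waiter arms the
 * interlock and then publishes HAMMER2_THREAD_WAITING with a cmpset, so
 * a concurrent hammer2_thr_signal() either sees the bit and issues
 * wakeup(&thr->flags), or races the cmpset and causes the waiter to
 * retry.  A minimal sketch of the waiter side (illustrative only;
 * condition_satisfied() is a hypothetical placeholder):
 *
 *      for (;;) {
 *              oflags = thr->flags;
 *              cpu_ccfence();
 *              if (condition_satisfied(oflags))
 *                      break;
 *              tsleep_interlock(&thr->flags, 0);
 *              if (atomic_cmpset_int(&thr->flags, oflags,
 *                                    oflags | HAMMER2_THREAD_WAITING)) {
 *                      tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
 *              }
 *      }
 */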

/*
 * Initialize the supplied thread structure, starting the specified
 * thread.
 *
 * NOTE: thr structure can be retained across mounts and unmounts for this
 *       pmp, so make sure the flags are in a sane state.
 */
void
hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
                   hammer2_dev_t *hmp,
                   const char *id, int clindex, int repidx,
                   void (*func)(void *arg))
{
        thr->pmp = pmp;         /* xop helpers */
        thr->hmp = hmp;         /* bulkfree */
        thr->clindex = clindex;
        thr->repidx = repidx;
        TAILQ_INIT(&thr->xopq);
        atomic_clear_int(&thr->flags, HAMMER2_THREAD_STOP |
                                      HAMMER2_THREAD_STOPPED |
                                      HAMMER2_THREAD_FREEZE |
                                      HAMMER2_THREAD_FROZEN);
        if (thr->scratch == NULL)
                thr->scratch = kmalloc(MAXPHYS, M_HAMMER2, M_WAITOK | M_ZERO);
        if (repidx >= 0) {
                lwkt_create(func, thr, &thr->td, NULL, 0, repidx % ncpus,
                            "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
        } else if (pmp) {
                lwkt_create(func, thr, &thr->td, NULL, 0, -1,
                            "%s-%s", id, pmp->pfs_names[clindex]);
        } else {
                lwkt_create(func, thr, &thr->td, NULL, 0, -1, "%s", id);
        }
}

/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);
        hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);
        thr->pmp = NULL;
        if (thr->scratch) {
                kfree(thr->scratch, M_HAMMER2);
                thr->scratch = NULL;
        }
        KKASSERT(TAILQ_EMPTY(&thr->xopq));
}

/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER);
}

void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
        hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
}

void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
        hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
}

void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_UNFREEZE);
        hammer2_thr_wait_neg(thr, HAMMER2_THREAD_FROZEN);
}

int
hammer2_thr_break(hammer2_thread_t *thr)
{
        if (thr->flags & (HAMMER2_THREAD_STOP |
                          HAMMER2_THREAD_REMASTER |
                          HAMMER2_THREAD_FREEZE)) {
                return 1;
        }
        return 0;
}
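
/*
 * Editorial usage sketch (not from the original source): a typical
 * helper-thread lifecycle as driven by the functions above.  The caller
 * owns the hammer2_thread_t storage (here assumed to be an xop_groups
 * slot, as in hammer2_xop_helper_create() below); create starts the lwkt
 * thread, freeze/unfreeze quiesce it synchronously, and delete stops it
 * and reclaims the scratch buffer.
 *
 *      hammer2_thread_t *thr = &pmp->xop_groups[j].thrs[i];
 *
 *      hammer2_thr_create(thr, pmp, NULL, "h2xop", i, j,
 *                         hammer2_primary_xops_thread);
 *      ...
 *      hammer2_thr_freeze(thr);        // returns once FROZEN is set
 *      hammer2_thr_unfreeze(thr);      // returns once FROZEN clears
 *      ...
 *      hammer2_thr_delete(thr);        // returns once STOPPED is set
 */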

/****************************************************************************
 *                            HAMMER2 XOPS API                              *
 ****************************************************************************/

/*
 * Allocate a XOP request.
 *
 * Once allocated a XOP request can be started, collected, and retired,
 * and can be retired early if desired.
 *
 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
 */
void *
hammer2_xop_alloc(hammer2_inode_t *ip, int flags)
{
        hammer2_xop_t *xop;

        xop = objcache_get(cache_xops, M_WAITOK);
        KKASSERT(xop->head.cluster.array[0].chain == NULL);

        xop->head.ip1 = ip;
        xop->head.desc = NULL;
        xop->head.flags = flags;
        xop->head.state = 0;
        xop->head.error = 0;
        xop->head.collect_key = 0;
        xop->head.focus_dio = NULL;

        if (flags & HAMMER2_XOP_MODIFYING)
                xop->head.mtid = hammer2_trans_sub(ip->pmp);
        else
                xop->head.mtid = 0;

        xop->head.cluster.nchains = ip->cluster.nchains;
        xop->head.cluster.pmp = ip->pmp;
        xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;

        /*
         * run_mask - Active thread (or frontend) associated with XOP
         */
        xop->head.run_mask = HAMMER2_XOPMASK_VOP;

        hammer2_inode_ref(ip);

        return xop;
}

void
hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
        xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name1_len = name_len;
        bcopy(name, xop->name1, name_len);
}

void
hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
        xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name2_len = name_len;
        bcopy(name, xop->name2, name_len);
}

size_t
hammer2_xop_setname_inum(hammer2_xop_head_t *xop, hammer2_key_t inum)
{
        const size_t name_len = 18;

        xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name1_len = name_len;
        ksnprintf(xop->name1, name_len + 1, "0x%016jx", (intmax_t)inum);

        return name_len;
}

void
hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
{
        xop->ip2 = ip2;
        hammer2_inode_ref(ip2);
}

void
hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
{
        xop->ip3 = ip3;
        hammer2_inode_ref(ip3);
}

void
hammer2_xop_setip4(hammer2_xop_head_t *xop, hammer2_inode_t *ip4)
{
        xop->ip4 = ip4;
        hammer2_inode_ref(ip4);
}

void
hammer2_xop_reinit(hammer2_xop_head_t *xop)
{
        xop->state = 0;
        xop->error = 0;
        xop->collect_key = 0;
        xop->run_mask = HAMMER2_XOPMASK_VOP;
}
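
/*
 * Editorial usage sketch (modeled on the frontend callers such as those
 * in hammer2_vnops.c; names here are illustrative, not verified against
 * this tree).  A frontend allocates a XOP, attaches parameters, starts
 * it, collects results, and retires its VOP mask:
 *
 *      hammer2_xop_nresolve_t *xop;
 *
 *      xop = hammer2_xop_alloc(dip, 0);
 *      hammer2_xop_setname(&xop->head, name, name_len);
 *      hammer2_xop_start(&xop->head, &hammer2_nresolve_desc);
 *      error = hammer2_xop_collect(&xop->head, 0);
 *      ...
 *      hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
 */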

/*
 * A mounted PFS needs Xops threads to support frontend operations.
 */
void
hammer2_xop_helper_create(hammer2_pfs_t *pmp)
{
        int i;
        int j;

        lockmgr(&pmp->lock, LK_EXCLUSIVE);
        pmp->has_xop_threads = 1;

        pmp->xop_groups = kmalloc(hammer2_xop_nthreads *
                                  sizeof(hammer2_xop_group_t),
                                  M_HAMMER2, M_WAITOK | M_ZERO);
        for (i = 0; i < pmp->iroot->cluster.nchains; ++i) {
                for (j = 0; j < hammer2_xop_nthreads; ++j) {
                        if (pmp->xop_groups[j].thrs[i].td)
                                continue;
                        hammer2_thr_create(&pmp->xop_groups[j].thrs[i],
                                           pmp, NULL,
                                           "h2xop", i, j,
                                           hammer2_primary_xops_thread);
                }
        }
        lockmgr(&pmp->lock, LK_RELEASE);
}

void
hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
{
        int i;
        int j;

        if (pmp->xop_groups == NULL) {
                KKASSERT(pmp->has_xop_threads == 0);
                return;
        }

        for (i = 0; i < pmp->pfs_nmasters; ++i) {
                for (j = 0; j < hammer2_xop_nthreads; ++j) {
                        if (pmp->xop_groups[j].thrs[i].td)
                                hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
                }
        }
        pmp->has_xop_threads = 0;
        kfree(pmp->xop_groups, M_HAMMER2);
        pmp->xop_groups = NULL;
}
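
/*
 * Editorial note (sketch of the layout, not from the original source):
 * the helpers form a 2-D matrix indexed by worker group and cluster
 * index.  Selecting the thread that backs node i of worker group ng
 * reduces to:
 *
 *      hammer2_thread_t *thr = &pmp->xop_groups[ng].thrs[i];
 *
 * where 0 <= ng < hammer2_xop_nthreads and i ranges over the PFS's
 * cluster indices, matching the loops above.
 */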

/*
 * Start a XOP request, queueing it to all nodes in the cluster to
 * execute the cluster op.
 *
 * XXX optimize single-target case.
 */
void
hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_desc_t *desc,
                         int notidx)
{
        hammer2_inode_t *ip1;
        hammer2_pfs_t *pmp;
        hammer2_thread_t *thr;
        int i;
        int ng;
        int nchains;

        ip1 = xop->ip1;
        pmp = ip1->pmp;
        if (pmp->has_xop_threads == 0)
                hammer2_xop_helper_create(pmp);

        /*
         * The sequencer assigns a worker thread to the XOP.
         *
         * (1) The worker threads are partitioned into two sets, one for
         *     NON-STRATEGY XOPs, and the other for STRATEGY XOPs.  This
         *     guarantees that strategy calls will always be able to make
         *     progress and will not deadlock against non-strategy calls.
         *
         * (2) If clustered, non-strategy operations to the same inode must
         *     be serialized.  This is to avoid confusion when issuing
         *     modifying operations because a XOP completes the instant a
         *     quorum is reached.
         *
         * TODO - RENAME fails here because it is potentially modifying
         *        three different inodes, but we triple-lock the inodes
         *        involved so it shouldn't create a sequencing schism.
         */
        if (xop->flags & HAMMER2_XOP_STRATEGY) {
                /*
                 * Use worker space 0 associated with the current cpu
                 * for strategy ops.
                 */
                hammer2_xop_strategy_t *xopst;
                u_int which;

                xopst = &((hammer2_xop_t *)xop)->xop_strategy;
                which = ((unsigned int)ip1->ihash +
                         ((unsigned int)xopst->lbase >> HAMMER2_PBUFRADIX)) %
                        hammer2_xop_sgroups;
                ng = mycpu->gd_cpuid % hammer2_xop_mod +
                     hammer2_xop_mod * which;
        } else if (hammer2_spread_workers == 0 && ip1->cluster.nchains == 1) {
                /*
                 * For now try to keep the work on the same cpu to reduce
                 * IPI overhead.  Several threads are assigned to each cpu,
                 * don't be very smart and select the one to use based on
                 * the inode hash.
                 */
                u_int which;

                which = (unsigned int)ip1->ihash % hammer2_xop_xgroups;
                ng = mycpu->gd_cpuid % hammer2_xop_mod +
                     (which * hammer2_xop_mod) +
                     hammer2_xop_xbase;
        } else {
                /*
                 * Hash based on inode only, must serialize inode to same
                 * thread regardless of current cpu.
                 */
                ng = (unsigned int)ip1->ihash %
                     (hammer2_xop_mod * hammer2_xop_xgroups) +
                     hammer2_xop_xbase;
        }
        xop->desc = desc;

        /*
         * The instant xop is queued another thread can pick it off.  In the
         * case of asynchronous ops, another thread might even finish and
         * deallocate it.
         */
        hammer2_spin_ex(&pmp->xop_spin);
        nchains = ip1->cluster.nchains;
        for (i = 0; i < nchains; ++i) {
                /*
                 * XXX ip1->cluster.array* not stable here.  This temporary
                 *     hack fixes basic issues in target XOPs which need to
                 *     obtain a starting chain from the inode but does not
                 *     address possible races against inode updates which
                 *     might NULL-out a chain.
                 */
                if (i != notidx && ip1->cluster.array[i].chain) {
                        thr = &pmp->xop_groups[ng].thrs[i];
                        atomic_set_64(&xop->run_mask, 1LLU << i);
                        atomic_set_64(&xop->chk_mask, 1LLU << i);
                        xop->collect[i].thr = thr;
                        TAILQ_INSERT_TAIL(&thr->xopq, xop, collect[i].entry);
                }
        }
        hammer2_spin_unex(&pmp->xop_spin);
        /* xop can become invalid at this point */

        /*
         * Each thread has its own xopq
         */
        for (i = 0; i < nchains; ++i) {
                if (i != notidx) {
                        thr = &pmp->xop_groups[ng].thrs[i];
                        hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
                }
        }
}

void
hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_desc_t *desc)
{
        hammer2_xop_start_except(xop, desc, -1);
}
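
/*
 * Editorial note on run_mask (inferred from the queueing code above and
 * from hammer2_xop_retire() below; illustrative only): the low bits hold
 * one bit per cluster index for the backend feeders, while
 * HAMMER2_XOPMASK_VOP represents the frontend.  Queueing node i does:
 *
 *      atomic_set_64(&xop->run_mask, 1LLU << i);
 *
 * and each participant later removes its own bit via
 * hammer2_xop_retire(xop, its_mask).  The XOP is only cleaned up and
 * returned to the objcache when the last participant's bit leaves
 * the mask.
 */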

/*
 * Retire a XOP.  Used by both the VOP frontend and by the XOP backend.
 */
void
hammer2_xop_retire(hammer2_xop_head_t *xop, uint64_t mask)
{
        hammer2_chain_t *chain;
        uint64_t nmask;
        int i;

        /*
         * Remove the frontend collector or remove a backend feeder.
         *
         * When removing the frontend we must wakeup any backend feeders
         * who are waiting for FIFO space.
         *
         * When removing the last backend feeder we must wakeup any waiting
         * frontend.
         */
        KKASSERT(xop->run_mask & mask);
        nmask = atomic_fetchadd_64(&xop->run_mask,
                                   -mask + HAMMER2_XOPMASK_FEED);

        /*
         * More than one entity left
         */
        if ((nmask & HAMMER2_XOPMASK_ALLDONE) != mask) {
                /*
                 * Frontend terminating, wakeup any backends waiting on
                 * fifo full.
                 *
                 * NOTE!!! The xop can get ripped out from under us at
                 *         this point, so do not reference it again.
                 *         The wakeup(xop) doesn't touch the xop and
                 *         is ok.
                 */
                if (mask == HAMMER2_XOPMASK_VOP) {
                        if (nmask & HAMMER2_XOPMASK_FIFOW)
                                wakeup(xop);
                }

                /*
                 * Wakeup frontend if the last backend is terminating.
                 */
                nmask -= mask;
                if ((nmask & HAMMER2_XOPMASK_ALLDONE) == HAMMER2_XOPMASK_VOP) {
                        if (nmask & HAMMER2_XOPMASK_WAIT)
                                wakeup(xop);
                }

                return;
        }
        /* else nobody else left, we can ignore FIFOW */

        /*
         * All collectors are gone, we can cleanup and dispose of the XOP.
         * Note that this can wind up being a frontend OR a backend.
         * Pending chains are locked shared and not owned by any thread.
         *
         * Cleanup the collection cluster.
         */
        for (i = 0; i < xop->cluster.nchains; ++i) {
                xop->cluster.array[i].flags = 0;
                chain = xop->cluster.array[i].chain;
                if (chain) {
                        xop->cluster.array[i].chain = NULL;
                        hammer2_chain_drop_unhold(chain);
                }
        }

        /*
         * Cleanup the fifos.  Since we are the only entity left on this
         * xop we don't have to worry about fifo flow control, and one
         * lfence() will do the job.
         */
        cpu_lfence();
        mask = xop->chk_mask;
        for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
                hammer2_xop_fifo_t *fifo = &xop->collect[i];
                while (fifo->ri != fifo->wi) {
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        if (chain)
                                hammer2_chain_drop_unhold(chain);
                        ++fifo->ri;
                }
                mask &= ~(1U << i);
        }

        /*
         * The inode is only held at this point, simply drop it.
         */
        if (xop->ip1) {
                hammer2_inode_drop(xop->ip1);
                xop->ip1 = NULL;
        }
        if (xop->ip2) {
                hammer2_inode_drop(xop->ip2);
                xop->ip2 = NULL;
        }
        if (xop->ip3) {
                hammer2_inode_drop(xop->ip3);
                xop->ip3 = NULL;
        }
        if (xop->ip4) {
                hammer2_inode_drop(xop->ip4);
                xop->ip4 = NULL;
        }
        if (xop->name1) {
                kfree(xop->name1, M_HAMMER2);
                xop->name1 = NULL;
                xop->name1_len = 0;
        }
        if (xop->name2) {
                kfree(xop->name2, M_HAMMER2);
                xop->name2 = NULL;
                xop->name2_len = 0;
        }

        objcache_put(cache_xops, xop);
}

/*
 * (Backend) Returns non-zero if the frontend is still attached.
 */
int
hammer2_xop_active(hammer2_xop_head_t *xop)
{
        if (xop->run_mask & HAMMER2_XOPMASK_VOP)
                return 1;
        else
                return 0;
}
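
/*
 * Editorial usage sketch of the backend feeder convention (modeled on
 * backends such as the readdir XOP; backend_next_chain() is a
 * hypothetical placeholder): a backend iterates its node's chains,
 * feeds each one, and terminates the stream by feeding a NULL chain
 * carrying the final error:
 *
 *      while ((chain = backend_next_chain(...)) != NULL) {
 *              error = hammer2_xop_feed(&xop->head, chain, clindex, 0);
 *              if (error)
 *                      break;
 *      }
 *      hammer2_xop_feed(&xop->head, NULL, clindex, error);
 */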

/*
 * (Backend) Feed chain data through the cluster validator and back to
 * the frontend.  Chains are fed from multiple nodes concurrently
 * and pipelined via per-node FIFOs in the XOP.
 *
 * The chain must be locked (either shared or exclusive).  The caller may
 * unlock and drop the chain on return.  This function will add an extra
 * ref and hold the chain's data for the pass-back.
 *
 * No xop lock is needed because we are only manipulating fields under
 * our direct control.
 *
 * Returns 0 on success and a hammer2 error code if sync is permanently
 * lost.  The caller retains a ref on the chain but by convention
 * the lock is typically inherited by the xop (caller loses lock).
 *
 * Returns non-zero on error.  In this situation the caller retains a
 * ref on the chain but loses the lock (we unlock here).
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
                 int clindex, int error)
{
        hammer2_xop_fifo_t *fifo;
        uint64_t mask;

        /*
         * Early termination (typically of xop_readdir)
         */
        if (hammer2_xop_active(xop) == 0) {
                error = HAMMER2_ERROR_ABORTED;
                goto done;
        }

        /*
         * Multi-threaded entry into the XOP collector.  We own the
         * fifo->wi for our clindex.
         */
        fifo = &xop->collect[clindex];

        if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO)
                lwkt_yield();
        while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                atomic_set_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
                mask = xop->run_mask;
                if ((mask & HAMMER2_XOPMASK_VOP) == 0) {
                        error = HAMMER2_ERROR_ABORTED;
                        goto done;
                }
                tsleep_interlock(xop, 0);
                if (atomic_cmpset_64(&xop->run_mask, mask,
                                     mask | HAMMER2_XOPMASK_FIFOW)) {
                        if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                                tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
                        }
                }
                /* retry */
        }
        atomic_clear_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
        if (chain)
                hammer2_chain_ref_hold(chain);
        if (error == 0 && chain)
                error = chain->error;
        fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
        fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
        cpu_sfence();
        ++fifo->wi;

        mask = atomic_fetchadd_64(&xop->run_mask, HAMMER2_XOPMASK_FEED);
        if (mask & HAMMER2_XOPMASK_WAIT) {
                atomic_clear_64(&xop->run_mask, HAMMER2_XOPMASK_WAIT);
                wakeup(xop);
        }
        error = 0;

        /*
         * Cleanup.  If an error occurred we eat the lock.  If no error
         * occurred the fifo inherits the lock and gains an additional ref.
         *
         * The caller's ref remains in both cases.
         */
done:
        return error;
}
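
/*
 * Editorial note on the FIFO indices (derived from the code above):
 * fifo->ri and fifo->wi increment monotonically and are masked with
 * HAMMER2_XOPFIFO_MASK only on array access, so:
 *
 *      empty:  fifo->ri == fifo->wi
 *      full:   fifo->ri == fifo->wi - HAMMER2_XOPFIFO
 *
 * The producer publishes each entry with cpu_sfence() before advancing
 * wi; the consumer in hammer2_xop_collect() pairs this with a
 * cpu_lfence() before reading the entry.
 */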

/*
 * (Frontend) collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= collect_key.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT   - Used to 'poll' a completed collection,
 *                                usually called synchronously from the
 *                                node XOPs for the strategy code to
 *                                fake the frontend collection and complete
 *                                the BIO as soon as possible.
 *
 * HAMMER2_XOP_SYNCHRONIZER     - Request synchronization with a particular
 *                                cluster index, prevents looping when that
 *                                index is out of sync so caller can act on
 *                                the out of sync element.  ESRCH and EDEADLK
 *                                can be returned if this flag is specified.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Return ENOENT on normal termination.
 * Otherwise return an error.
 *
 * WARNING! If the xop returns a cluster with a non-NULL focus, note that
 *          none of the chains in the cluster (or the focus) are either
 *          locked or I/O synchronized with the cpu.  hammer2_xop_gdata()
 *          and hammer2_xop_pdata() must be used to safely access the focus
 *          chain's content.
 *
 *          The frontend can make certain assumptions based on higher-level
 *          locking done by the frontend, but data integrity absolutely
 *          requires using the gdata/pdata API.
 */
int
hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
{
        hammer2_xop_fifo_t *fifo;
        hammer2_chain_t *chain;
        hammer2_key_t lokey;
        uint64_t mask;
        int error;
        int keynull;
        int adv;                /* advance the element */
        int i;

loop:
        /*
         * First loop tries to advance pieces of the cluster which
         * are out of sync.
         */
        lokey = HAMMER2_KEY_MAX;
        keynull = HAMMER2_CHECK_NULL;
        mask = xop->run_mask;
        cpu_lfence();

        for (i = 0; i < xop->cluster.nchains; ++i) {
                chain = xop->cluster.array[i].chain;
                if (chain == NULL) {
                        adv = 1;
                } else if (chain->bref.key < xop->collect_key) {
                        adv = 1;
                } else {
                        keynull &= ~HAMMER2_CHECK_NULL;
                        if (lokey > chain->bref.key)
                                lokey = chain->bref.key;
                        adv = 0;
                }
                if (adv == 0)
                        continue;

                /*
                 * Advance element if possible, advanced element may be NULL.
                 */
                if (chain)
                        hammer2_chain_drop_unhold(chain);

                fifo = &xop->collect[i];
                if (fifo->ri != fifo->wi) {
                        cpu_lfence();
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        error = fifo->errors[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        ++fifo->ri;
                        xop->cluster.array[i].chain = chain;
                        xop->cluster.array[i].error = error;
                        if (chain == NULL) {
                                /* XXX */
                                xop->cluster.array[i].flags |=
                                                        HAMMER2_CITEM_NULL;
                        }
                        if (fifo->wi - fifo->ri <= HAMMER2_XOPFIFO / 2) {
                                if (fifo->flags & HAMMER2_XOP_FIFO_STALL) {
                                        atomic_clear_int(&fifo->flags,
                                                    HAMMER2_XOP_FIFO_STALL);
                                        wakeup(xop);
                                        lwkt_yield();
                                }
                        }
                        --i;    /* loop on same index */
                } else {
                        /*
                         * Retain CITEM_NULL flag.  If set just repeat EOF.
                         * If not, the NULL,0 combination indicates an
                         * operation in-progress.
                         */
                        xop->cluster.array[i].chain = NULL;
                        /* retain any CITEM_NULL setting */
                }
        }

        /*
         * Determine whether the lowest collected key meets clustering
         * requirements.  Returns:
         *
         * 0             - key valid, cluster can be returned.
         *
         * ENOENT        - normal end of scan, return ENOENT.
         *
         * ESRCH         - sufficient elements collected, quorum agreement
         *                 that lokey is not a valid element and should be
         *                 skipped.
         *
         * EDEADLK       - sufficient elements collected, no quorum agreement
         *                 (and no agreement possible).  In this situation a
         *                 repair is needed, for now we loop.
         *
         * EINPROGRESS   - insufficient elements collected to resolve, wait
         *                 for event and loop.
         */
        if ((flags & HAMMER2_XOP_COLLECT_WAITALL) &&
            (mask & HAMMER2_XOPMASK_ALLDONE) != HAMMER2_XOPMASK_VOP) {
                error = HAMMER2_ERROR_EINPROGRESS;
        } else {
                error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
        }
        if (error == HAMMER2_ERROR_EINPROGRESS) {
                if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
                        goto done;
                tsleep_interlock(xop, 0);
                if (atomic_cmpset_64(&xop->run_mask,
                                     mask, mask | HAMMER2_XOPMASK_WAIT)) {
                        tsleep(xop, PINTERLOCKED, "h2coll", hz*60);
                }
                goto loop;
        }
        if (error == HAMMER2_ERROR_ESRCH) {
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = HAMMER2_ERROR_ENOENT;
        }
        if (error == HAMMER2_ERROR_EDEADLK) {
                kprintf("hammer2: no quorum possible lokey %016jx\n",
                        lokey);
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = HAMMER2_ERROR_ENOENT;
        }
        if (lokey == HAMMER2_KEY_MAX)
                xop->collect_key = lokey;
        else
                xop->collect_key = lokey + 1;
done:
        return error;
}
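
/*
 * Editorial usage sketch of a frontend collection loop (modeled on
 * iterating callers such as readdir; illustrative only):
 *
 *      for (;;) {
 *              error = hammer2_xop_collect(&xop->head, 0);
 *              if (error)
 *                      break;  // HAMMER2_ERROR_ENOENT ends the scan
 *              // consume the focus only via hammer2_xop_gdata() /
 *              // hammer2_xop_pdata(), per the WARNING above
 *      }
 *      hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
 */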

/*
 * N x M processing threads are available to handle XOPs, N per cluster
 * index x M cluster nodes.
 *
 * Locate and return the next runnable xop, or NULL if no xops are
 * present or none of the xops are currently runnable (for various reasons).
 * The xop is left on the queue and serves to block other dependent xops
 * from being run.
 *
 * Dependent xops will not be returned.
 *
 * Sets HAMMER2_XOP_FIFO_RUN on the returned xop or returns NULL.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
#define XOP_HASH_SIZE   16
#define XOP_HASH_MASK   (XOP_HASH_SIZE - 1)

static __inline
int
xop_testhash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
        uint32_t mask;
        int hv;

        hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
        mask = 1U << (hv & 31);
        hv >>= 5;

        return ((int)(hash[hv & XOP_HASH_MASK] & mask));
}

static __inline
void
xop_sethash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
        uint32_t mask;
        int hv;

        hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
        mask = 1U << (hv & 31);
        hv >>= 5;

        hash[hv & XOP_HASH_MASK] |= mask;
}

static
hammer2_xop_head_t *
hammer2_xop_next(hammer2_thread_t *thr)
{
        hammer2_pfs_t *pmp = thr->pmp;
        int clindex = thr->clindex;
        uint32_t hash[XOP_HASH_SIZE] = { 0 };
        hammer2_xop_head_t *xop;

        hammer2_spin_ex(&pmp->xop_spin);
        TAILQ_FOREACH(xop, &thr->xopq, collect[clindex].entry) {
                /*
                 * Check dependency
                 */
                if (xop_testhash(thr, xop->ip1, hash) ||
                    (xop->ip2 && xop_testhash(thr, xop->ip2, hash)) ||
                    (xop->ip3 && xop_testhash(thr, xop->ip3, hash)) ||
                    (xop->ip4 && xop_testhash(thr, xop->ip4, hash)))
                {
                        continue;
                }
                xop_sethash(thr, xop->ip1, hash);
                if (xop->ip2)
                        xop_sethash(thr, xop->ip2, hash);
                if (xop->ip3)
                        xop_sethash(thr, xop->ip3, hash);
                if (xop->ip4)
                        xop_sethash(thr, xop->ip4, hash);

                /*
                 * Check already running
                 */
                if (xop->collect[clindex].flags & HAMMER2_XOP_FIFO_RUN)
                        continue;

                /*
                 * Found a good one, return it.
                 */
                atomic_set_int(&xop->collect[clindex].flags,
                               HAMMER2_XOP_FIFO_RUN);
                break;
        }
        hammer2_spin_unex(&pmp->xop_spin);

        return xop;
}

/*
 * Remove the completed XOP from the queue, clear HAMMER2_XOP_FIFO_RUN.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
static
void
hammer2_xop_dequeue(hammer2_thread_t *thr, hammer2_xop_head_t *xop)
{
        hammer2_pfs_t *pmp = thr->pmp;
        int clindex = thr->clindex;

        hammer2_spin_ex(&pmp->xop_spin);
        TAILQ_REMOVE(&thr->xopq, xop, collect[clindex].entry);
        atomic_clear_int(&xop->collect[clindex].flags,
                         HAMMER2_XOP_FIFO_RUN);
        hammer2_spin_unex(&pmp->xop_spin);
        if (TAILQ_FIRST(&thr->xopq))
                hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
}
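
/*
 * Editorial note on the dependency hash (inferred from xop_testhash()/
 * xop_sethash() above): the per-scan hash[] acts as a small Bloom-style
 * bitmap over the inodes touched by earlier xops on the queue.  A hash
 * collision can only produce a false positive, which merely defers an
 * independent xop until a later scan; it never lets two xops sharing an
 * inode run out of order on this cluster index.
 */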

/*
 * Primary management thread for xops support.  Each node has several such
 * threads which replicate front-end operations on cluster nodes.
 *
 * XOPS threads run node operations, allowing the function to focus on a
 * single node in the cluster after validating the operation with the
 * cluster.  This is primarily what prevents dead or stalled nodes from
 * stalling the front-end.
 */
void
hammer2_primary_xops_thread(void *arg)
{
        hammer2_thread_t *thr = arg;
        hammer2_pfs_t *pmp;
        hammer2_xop_head_t *xop;
        uint64_t mask;
        uint32_t flags;
        uint32_t nflags;
        hammer2_xop_desc_t *last_desc = NULL;

        pmp = thr->pmp;
        /*xgrp = &pmp->xop_groups[thr->repidx]; not needed */
        mask = 1LLU << thr->clindex;

        for (;;) {
                flags = thr->flags;

                /*
                 * Handle stop request
                 */
                if (flags & HAMMER2_THREAD_STOP)
                        break;

                /*
                 * Handle freeze request
                 */
                if (flags & HAMMER2_THREAD_FREEZE) {
                        hammer2_thr_signal2(thr, HAMMER2_THREAD_FROZEN,
                                                 HAMMER2_THREAD_FREEZE);
                        continue;
                }

                if (flags & HAMMER2_THREAD_UNFREEZE) {
                        hammer2_thr_signal2(thr, 0,
                                                 HAMMER2_THREAD_FROZEN |
                                                 HAMMER2_THREAD_UNFREEZE);
                        continue;
                }

                /*
                 * Force idle if frozen until unfrozen or stopped.
                 */
                if (flags & HAMMER2_THREAD_FROZEN) {
                        hammer2_thr_wait_any(thr,
                                             HAMMER2_THREAD_UNFREEZE |
                                             HAMMER2_THREAD_STOP,
                                             0);
                        continue;
                }

                /*
                 * Reset state on REMASTER request
                 */
                if (flags & HAMMER2_THREAD_REMASTER) {
                        hammer2_thr_signal2(thr, 0, HAMMER2_THREAD_REMASTER);
                        /* reset state here */
                        continue;
                }

                /*
                 * Process requests.  Each request can be multi-queued.
                 *
                 * If we get behind and the frontend VOP is no longer active,
                 * we retire the request without processing it.  The callback
                 * may also abort processing if the frontend VOP becomes
                 * inactive.
                 */
                if (flags & HAMMER2_THREAD_XOPQ) {
                        nflags = flags & ~HAMMER2_THREAD_XOPQ;
                        if (!atomic_cmpset_int(&thr->flags, flags, nflags))
                                continue;
                        flags = nflags;
                        /* fall through */
                }
                while ((xop = hammer2_xop_next(thr)) != NULL) {
                        if (hammer2_xop_active(xop)) {
                                last_desc = xop->desc;
                                xop->desc->storage_func((hammer2_xop_t *)xop,
                                                        thr->scratch,
                                                        thr->clindex);
                                hammer2_xop_dequeue(thr, xop);
                                hammer2_xop_retire(xop, mask);
                        } else {
                                last_desc = xop->desc;
                                hammer2_xop_feed(xop, NULL, thr->clindex,
                                                 ECONNABORTED);
                                hammer2_xop_dequeue(thr, xop);
                                hammer2_xop_retire(xop, mask);
                        }
                }

                /*
                 * Wait for event, interlock using THREAD_WAITING and
                 * THREAD_SIGNAL.
                 *
                 * For robustness poll on a 30-second interval, but nominally
                 * expect to be woken up.
                 */
                nflags = flags | HAMMER2_THREAD_WAITING;

                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
                        tsleep(&thr->flags, PINTERLOCKED, "h2idle", hz*30);
                }
        }

#if 0
        /*
         * Cleanup / termination
         */
        while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
                kprintf("hammer2_thread: aborting xop %s\n", xop->desc->id);
                TAILQ_REMOVE(&thr->xopq, xop,
                             collect[thr->clindex].entry);
                hammer2_xop_retire(xop, mask);
        }
#endif
        thr->td = NULL;
        hammer2_thr_signal(thr, HAMMER2_THREAD_STOPPED);
        /* thr structure can go invalid after this point */
}