/*
 * Copyright (c) 2015 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module implements the hammer2 helper thread API, including
 * the frontend/backend XOP API.
 */
#include "hammer2.h"

/*
 * Signal that the thread has work.
 */
void
hammer2_thr_signal(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if (oflags & HAMMER2_THREAD_WAITING) {
			if (atomic_cmpset_int(&thr->flags, oflags,
				  (oflags | flags) & ~HAMMER2_THREAD_WAITING)) {
				wakeup(&thr->flags);
				break;
			}
		} else {
			if (atomic_cmpset_int(&thr->flags, oflags,
					      oflags | flags)) {
				break;
			}
		}
	}
}

/*
 * Return status to waiting client(s).
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_return(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		nflags = (oflags | flags) & ~HAMMER2_THREAD_CLIENTWAIT;

		if (oflags & HAMMER2_THREAD_CLIENTWAIT) {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
				wakeup(&thr->flags);
				break;
			}
		} else {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags))
				break;
		}
	}
}
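
/*
 * Illustrative sketch, not part of the original source: how the two
 * primitives above pair up.  A client posts a command bit with
 * hammer2_thr_signal() and blocks in hammer2_thr_wait() (below) until
 * the thread acknowledges via hammer2_thr_return().  The WAITING and
 * CLIENTWAIT bits interlock the wakeups against the atomic flag
 * updates so no transition can be lost.  This is exactly the shape of
 * the STOP/STOPPED handshake used by hammer2_thr_delete().
 */
#if 0
	/* client side */
	hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);
	hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);

	/* thread side, on its way out */
	hammer2_thr_return(thr, HAMMER2_THREAD_STOPPED);
#endif
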
/*
 * Wait until the bits in flags are set.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_wait(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if ((oflags & flags) == flags)
			break;
		nflags = oflags | HAMMER2_THREAD_CLIENTWAIT;
		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
		}
	}
}

/*
 * Wait until the bits in flags are clear.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_wait_neg(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if ((oflags & flags) == 0)
			break;
		nflags = oflags | HAMMER2_THREAD_CLIENTWAIT;
		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
		}
	}
}

/*
 * Initialize the supplied thread structure, starting the specified
 * thread.
 *
 * NOTE: thr structure can be retained across mounts and unmounts for this
 *	 pmp, so make sure the flags are in a sane state.
 */
void
hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
		   const char *id, int clindex, int repidx,
		   void (*func)(void *arg))
{
	thr->pmp = pmp;
	thr->clindex = clindex;
	thr->repidx = repidx;
	TAILQ_INIT(&thr->xopq);
	atomic_clear_int(&thr->flags, HAMMER2_THREAD_STOP |
				      HAMMER2_THREAD_STOPPED |
				      HAMMER2_THREAD_FREEZE |
				      HAMMER2_THREAD_FROZEN);
	if (thr->scratch == NULL)
		thr->scratch = kmalloc(MAXPHYS, M_HAMMER2, M_WAITOK | M_ZERO);
	if (repidx >= 0) {
		lwkt_create(func, thr, &thr->td, NULL, 0, repidx % ncpus,
			    "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
	} else {
		lwkt_create(func, thr, &thr->td, NULL, 0, -1,
			    "%s-%s", id, pmp->pfs_names[clindex]);
	}
}

/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);
	hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);
	thr->pmp = NULL;
	if (thr->scratch) {
		kfree(thr->scratch, M_HAMMER2);
		thr->scratch = NULL;
	}
	KKASSERT(TAILQ_EMPTY(&thr->xopq));
}
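
/*
 * Illustrative sketch, not part of the original source: the lifecycle
 * driven by the two functions above.  hammer2_xop_helper_create()
 * below instantiates helpers this way for each (cluster index, XOP
 * group) pair; hammer2_xop_helper_cleanup() is the matching teardown.
 */
#if 0
	hammer2_thread_t *thr = &pmp->xop_groups[0].thrs[0];

	hammer2_thr_create(thr, pmp, "h2xop", 0, 0,
			   hammer2_primary_xops_thread);
	/* ... thread services its xopq until stopped ... */
	hammer2_thr_delete(thr);
#endif
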
/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER);
}

void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
	hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
}

void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
	hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
}

void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_UNFREEZE);
	hammer2_thr_wait_neg(thr, HAMMER2_THREAD_FROZEN);
}

int
hammer2_thr_break(hammer2_thread_t *thr)
{
	if (thr->flags & (HAMMER2_THREAD_STOP |
			  HAMMER2_THREAD_REMASTER |
			  HAMMER2_THREAD_FREEZE)) {
		return 1;
	}
	return 0;
}

/****************************************************************************
 *			    HAMMER2 XOPS API				    *
 ****************************************************************************/

void
hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
{
	/* no extra fields in structure at the moment */
}

/*
 * Allocate a XOP request.
 *
 * Once allocated a XOP request can be started, collected, and retired,
 * and can be retired early if desired.
 *
 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
 */
void *
hammer2_xop_alloc(hammer2_inode_t *ip, int flags)
{
	hammer2_xop_t *xop;

	xop = objcache_get(cache_xops, M_WAITOK);
	KKASSERT(xop->head.cluster.array[0].chain == NULL);

	xop->head.ip1 = ip;
	xop->head.func = NULL;
	xop->head.flags = flags;
	xop->head.state = 0;
	xop->head.error = 0;
	xop->head.collect_key = 0;
	xop->head.check_counter = 0;
	if (flags & HAMMER2_XOP_MODIFYING)
		xop->head.mtid = hammer2_trans_sub(ip->pmp);
	else
		xop->head.mtid = 0;

	xop->head.cluster.nchains = ip->cluster.nchains;
	xop->head.cluster.pmp = ip->pmp;
	xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;

	/*
	 * run_mask - Active thread (or frontend) associated with XOP
	 */
	xop->head.run_mask = HAMMER2_XOPMASK_VOP;

	hammer2_inode_ref(ip);

	return xop;
}

void
hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
	xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name1_len = name_len;
	bcopy(name, xop->name1, name_len);
}

void
hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
	xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name2_len = name_len;
	bcopy(name, xop->name2, name_len);
}

size_t
hammer2_xop_setname_inum(hammer2_xop_head_t *xop, hammer2_key_t inum)
{
	const size_t name_len = 18;

	xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name1_len = name_len;
	ksnprintf(xop->name1, name_len + 1, "0x%016jx", (intmax_t)inum);

	return name_len;
}

void
hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
{
	xop->ip2 = ip2;
	hammer2_inode_ref(ip2);
}

void
hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
{
	xop->ip3 = ip3;
	hammer2_inode_ref(ip3);
}
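
/*
 * Illustrative sketch, not part of the original source: the frontend
 * life cycle promised above hammer2_xop_alloc().  The nresolve XOP is
 * used as an example; dip, name, and name_len are assumed frontend
 * locals.
 */
#if 0
	hammer2_xop_nresolve_t *xop;
	int error;

	xop = hammer2_xop_alloc(dip, 0);
	hammer2_xop_setname(&xop->head, name, name_len);
	hammer2_xop_start(&xop->head, hammer2_xop_nresolve);
	error = hammer2_xop_collect(&xop->head, 0);
	/* ... consume xop->head.cluster on success ... */
	hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
#endif
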
void
hammer2_xop_reinit(hammer2_xop_head_t *xop)
{
	xop->state = 0;
	xop->error = 0;
	xop->collect_key = 0;
	xop->run_mask = HAMMER2_XOPMASK_VOP;
}

/*
 * A mounted PFS needs Xops threads to support frontend operations.
 */
void
hammer2_xop_helper_create(hammer2_pfs_t *pmp)
{
	int i;
	int j;

	lockmgr(&pmp->lock, LK_EXCLUSIVE);
	pmp->has_xop_threads = 1;

	for (i = 0; i < pmp->iroot->cluster.nchains; ++i) {
		for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
			if (pmp->xop_groups[j].thrs[i].td)
				continue;
			hammer2_thr_create(&pmp->xop_groups[j].thrs[i], pmp,
					   "h2xop", i, j,
					   hammer2_primary_xops_thread);
		}
	}
	lockmgr(&pmp->lock, LK_RELEASE);
}

void
hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
{
	int i;
	int j;

	for (i = 0; i < pmp->pfs_nmasters; ++i) {
		for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
			if (pmp->xop_groups[j].thrs[i].td)
				hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
		}
	}
	pmp->has_xop_threads = 0;
}
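
/*
 * Illustrative arithmetic, not part of the original source: the helper
 * pool is an N x M matrix of threads, N = cluster nodes, M =
 * HAMMER2_XOPGROUPS (value defined in hammer2.h).  If HAMMER2_XOPGROUPS
 * were 32, a three-node cluster would run 3 * 32 = 96 helper threads,
 * each with its own private xopq.
 */
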
/*
 * Start a XOP request, queueing it to all nodes in the cluster to
 * execute the cluster op.
 *
 * XXX optimize single-target case.
 */
void
hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_func_t func,
			 int notidx)
{
	hammer2_inode_t *ip1;
	hammer2_pfs_t *pmp;
	hammer2_thread_t *thr;
	int i;
	int ng;
	int nchains;

	ip1 = xop->ip1;
	pmp = ip1->pmp;
	if (pmp->has_xop_threads == 0)
		hammer2_xop_helper_create(pmp);

	/*
	 * The intent of the XOP sequencer is to ensure that ops on the same
	 * inode execute in the same order.  This is necessary when issuing
	 * modifying operations to multiple targets because some targets
	 * might get behind and the frontend is allowed to complete the
	 * moment a quorum of targets succeed.
	 *
	 * Strategy operations must be segregated from non-strategy
	 * operations to avoid a deadlock.  For example, if a vfsync and a
	 * bread/bwrite were queued to the same worker thread, the locked
	 * buffer in the strategy operation can deadlock the vfsync's buffer
	 * list scan.
	 *
	 * TODO - RENAME fails here because it is potentially modifying
	 *	  three different inodes.
	 */
	if (xop->flags & HAMMER2_XOP_STRATEGY) {
		hammer2_xop_strategy_t *xopst;

		xopst = &((hammer2_xop_t *)xop)->xop_strategy;
		ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)) ^
			   hammer2_icrc32(&xopst->lbase, sizeof(xopst->lbase)));
		ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
		ng += HAMMER2_XOPGROUPS / 2;
	} else {
		ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)));
		ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
	}
	xop->func = func;

	/*
	 * The instant xop is queued another thread can pick it off.  In the
	 * case of asynchronous ops, another thread might even finish and
	 * deallocate it.
	 */
	hammer2_spin_ex(&pmp->xop_spin);
	nchains = ip1->cluster.nchains;
	for (i = 0; i < nchains; ++i) {
		/*
		 * XXX ip1->cluster.array* not stable here.  This temporary
		 *     hack fixes basic issues in target XOPs which need to
		 *     obtain a starting chain from the inode but does not
		 *     address possible races against inode updates which
		 *     might NULL-out a chain.
		 */
		if (i != notidx && ip1->cluster.array[i].chain) {
			thr = &pmp->xop_groups[ng].thrs[i];
			atomic_set_int(&xop->run_mask, 1U << i);
			atomic_set_int(&xop->chk_mask, 1U << i);
			xop->collect[i].thr = thr;
			TAILQ_INSERT_TAIL(&thr->xopq, xop, collect[i].entry);
		}
	}
	hammer2_spin_unex(&pmp->xop_spin);
	/* xop can become invalid at this point */

	/*
	 * Each thread has its own xopq
	 */
	for (i = 0; i < nchains; ++i) {
		if (i != notidx) {
			thr = &pmp->xop_groups[ng].thrs[i];
			hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
		}
	}
}

void
hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_func_t func)
{
	hammer2_xop_start_except(xop, func, -1);
}
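
/*
 * Illustrative arithmetic, not part of the original source: how the
 * sequencer above splits the group space.  Assuming HAMMER2_XOPGROUPS
 * were 32 (so HAMMER2_XOPGROUPS_MASK == 31 and MASK >> 1 == 15):
 *
 *	non-strategy:	ng = crc(ip1) & 15			-> groups 0..15
 *	strategy:	ng = ((crc(ip1) ^ crc(lbase)) & 15) + 16 -> groups 16..31
 *
 * Ops on the same inode thus always land on the same worker thread for
 * a given node, preserving their order, and strategy I/O never shares
 * a queue with non-strategy ops.
 */
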
/*
 * Retire a XOP.  Used by both the VOP frontend and by the XOP backend.
 */
void
hammer2_xop_retire(hammer2_xop_head_t *xop, uint32_t mask)
{
	hammer2_chain_t *chain;
	uint32_t nmask;
	int i;

	/*
	 * Remove the frontend collector or remove a backend feeder.
	 * When removing the frontend we must wakeup any backend feeders
	 * who are waiting for FIFO space.
	 *
	 * XXX optimize wakeup.
	 */
	KKASSERT(xop->run_mask & mask);
	nmask = atomic_fetchadd_int(&xop->run_mask, -mask);
	if ((nmask & ~HAMMER2_XOPMASK_FIFOW) != mask) {
		if (mask == HAMMER2_XOPMASK_VOP) {
			if (nmask & HAMMER2_XOPMASK_FIFOW)
				wakeup(xop);
		}
		return;
	}
	/* else nobody else left, we can ignore FIFOW */

	/*
	 * All collectors are gone, we can cleanup and dispose of the XOP.
	 * Note that this can wind up being a frontend OR a backend.
	 * Pending chains are locked shared and not owned by any thread.
	 */
#if 0
	/*
	 * Cache the terminating cluster.
	 */
	hammer2_inode_t *ip;
	if ((ip = xop->ip1) != NULL) {
		hammer2_cluster_t *tmpclu;

		tmpclu = hammer2_cluster_copy(&xop->cluster);
		hammer2_spin_ex(&ip->cluster_spin);
		tmpclu = atomic_swap_ptr((volatile void **)&ip->cluster_cache,
					 tmpclu);
		hammer2_spin_unex(&ip->cluster_spin);
		if (tmpclu)
			hammer2_cluster_drop(tmpclu);
	}
#endif

	/*
	 * Cleanup the collection cluster.
	 */
	for (i = 0; i < xop->cluster.nchains; ++i) {
		xop->cluster.array[i].flags = 0;
		chain = xop->cluster.array[i].chain;
		if (chain) {
			xop->cluster.array[i].chain = NULL;
			hammer2_chain_drop_unhold(chain);
		}
	}

	/*
	 * Cleanup the fifos, use chk_mask to optimize the loop.
	 * Since we are the only entity left on this xop we don't have
	 * to worry about fifo flow control, and one lfence() will do the
	 * job.
	 */
	cpu_lfence();
	mask = xop->chk_mask;
	for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
		hammer2_xop_fifo_t *fifo = &xop->collect[i];
		while (fifo->ri != fifo->wi) {
			chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
			if (chain)
				hammer2_chain_drop_unhold(chain);
			++fifo->ri;
		}
		mask &= ~(1U << i);
	}

	/*
	 * The inode is only held at this point, simply drop it.
	 */
	if (xop->ip1) {
		hammer2_inode_drop(xop->ip1);
		xop->ip1 = NULL;
	}
	if (xop->ip2) {
		hammer2_inode_drop(xop->ip2);
		xop->ip2 = NULL;
	}
	if (xop->ip3) {
		hammer2_inode_drop(xop->ip3);
		xop->ip3 = NULL;
	}
	if (xop->name1) {
		kfree(xop->name1, M_HAMMER2);
		xop->name1 = NULL;
		xop->name1_len = 0;
	}
	if (xop->name2) {
		kfree(xop->name2, M_HAMMER2);
		xop->name2 = NULL;
		xop->name2_len = 0;
	}

	objcache_put(cache_xops, xop);
}

/*
 * (Backend) Returns non-zero if the frontend is still attached.
 */
int
hammer2_xop_active(hammer2_xop_head_t *xop)
{
	if (xop->run_mask & HAMMER2_XOPMASK_VOP)
		return 1;
	else
		return 0;
}

/*
 * (Backend) Feed chain data through the cluster validator and back to
 * the frontend.  Chains are fed from multiple nodes concurrently
 * and pipelined via per-node FIFOs in the XOP.
 *
 * The chain must be locked (either shared or exclusive).  The caller may
 * unlock and drop the chain on return.  This function will add an extra
 * ref and hold the chain's data for the pass-back.
 *
 * No xop lock is needed because we are only manipulating fields under
 * our direct control.
 *
 * Returns 0 on success and a hammer2 error code if sync is permanently
 * lost.  The caller retains a ref on the chain but by convention
 * the lock is typically inherited by the xop (caller loses lock).
 *
 * Returns non-zero on error.  In this situation the caller retains a
 * ref on the chain but loses the lock (we unlock here).
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
		 int clindex, int error)
{
	hammer2_xop_fifo_t *fifo;
	uint32_t mask;

	/*
	 * Early termination (typically of xop_readdir)
	 */
	if (hammer2_xop_active(xop) == 0) {
		error = EINTR;
		goto done;
	}

	/*
	 * Multi-threaded entry into the XOP collector.  We own the
	 * fifo->wi for our clindex.
	 */
	fifo = &xop->collect[clindex];

	if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO)
		lwkt_yield();
	while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
		atomic_set_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
		mask = xop->run_mask;
		if ((mask & HAMMER2_XOPMASK_VOP) == 0) {
			error = EINTR;
			goto done;
		}
		tsleep_interlock(xop, 0);
		if (atomic_cmpset_int(&xop->run_mask, mask,
				      mask | HAMMER2_XOPMASK_FIFOW)) {
			if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
				tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
			}
		}
		/* retry */
	}
	atomic_clear_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
	if (chain)
		hammer2_chain_ref_hold(chain);
	if (error == 0 && chain)
		error = chain->error;
	fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
	fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
	cpu_sfence();
	++fifo->wi;
	if (atomic_fetchadd_int(&xop->check_counter, HAMMER2_XOP_CHKINC) &
	    HAMMER2_XOP_CHKWAIT) {
		atomic_clear_int(&xop->check_counter, HAMMER2_XOP_CHKWAIT);
		wakeup(&xop->check_counter);
	}
	error = 0;

	/*
	 * Cleanup.  If an error occurred we eat the lock.  If no error
	 * occurred the fifo inherits the lock and gains an additional ref.
	 *
	 * The caller's ref remains in both cases.
	 */
done:
	return error;
}
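
/*
 * Illustrative sketch, not part of the original source: the shape of a
 * typical backend feeder built on hammer2_xop_feed().  Chain iteration
 * is elided; the key points are the per-chain feed and the final NULL
 * feed signalling EOF.  The backend's run_mask bit is retired by
 * hammer2_primary_xops_thread() when the function returns.
 */
#if 0
	error = 0;
	while (chain) {
		error = hammer2_xop_feed(&xop->head, chain, thr->clindex, 0);
		if (error)
			break;
		/* ... advance chain to the next element ... */
	}
	hammer2_xop_feed(&xop->head, NULL, thr->clindex, error);
#endif
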
/*
 * (Frontend) Collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= collect_key.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT	- Used to 'poll' a completed collection,
 *				  usually called synchronously from the
 *				  node XOPs for the strategy code to
 *				  fake the frontend collection and complete
 *				  the BIO as soon as possible.
 *
 * HAMMER2_XOP_SYNCHRONIZER	- Request synchronization with a particular
 *				  cluster index, prevents looping when that
 *				  index is out of sync so caller can act on
 *				  the out of sync element.  ESRCH and EDEADLK
 *				  can be returned if this flag is specified.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Returns ENOENT on normal termination.
 * Otherwise returns an error.
 */
int
hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
{
	hammer2_xop_fifo_t *fifo;
	hammer2_chain_t *chain;
	hammer2_key_t lokey;
	int error;
	int keynull;
	int adv;		/* advance the element */
	int i;
	uint32_t check_counter;

loop:
	/*
	 * First loop tries to advance pieces of the cluster which
	 * are out of sync.
	 */
	lokey = HAMMER2_KEY_MAX;
	keynull = HAMMER2_CHECK_NULL;
	check_counter = xop->check_counter;
	cpu_lfence();

	for (i = 0; i < xop->cluster.nchains; ++i) {
		chain = xop->cluster.array[i].chain;
		if (chain == NULL) {
			adv = 1;
		} else if (chain->bref.key < xop->collect_key) {
			adv = 1;
		} else {
			keynull &= ~HAMMER2_CHECK_NULL;
			if (lokey > chain->bref.key)
				lokey = chain->bref.key;
			adv = 0;
		}
		if (adv == 0)
			continue;

		/*
		 * Advance element if possible, advanced element may be NULL.
		 */
		if (chain)
			hammer2_chain_drop_unhold(chain);

		fifo = &xop->collect[i];
		if (fifo->ri != fifo->wi) {
			cpu_lfence();
			chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
			error = fifo->errors[fifo->ri & HAMMER2_XOPFIFO_MASK];
			++fifo->ri;
			xop->cluster.array[i].chain = chain;
			xop->cluster.array[i].error = error;
			if (chain == NULL) {
				/* XXX */
				xop->cluster.array[i].flags |=
							HAMMER2_CITEM_NULL;
			}
			if (fifo->wi - fifo->ri <= HAMMER2_XOPFIFO / 2) {
				if (fifo->flags & HAMMER2_XOP_FIFO_STALL) {
					atomic_clear_int(&fifo->flags,
						    HAMMER2_XOP_FIFO_STALL);
					wakeup(xop);
					lwkt_yield();
				}
			}
			--i;		/* loop on same index */
		} else {
			/*
			 * Retain CITEM_NULL flag.  If set just repeat EOF.
			 * If not, the NULL,0 combination indicates an
			 * operation in-progress.
			 */
			xop->cluster.array[i].chain = NULL;
			/* retain any CITEM_NULL setting */
		}
	}

	/*
	 * Determine whether the lowest collected key meets clustering
	 * requirements.  Returns:
	 *
	 * 0	       - key valid, cluster can be returned.
	 *
	 * ENOENT      - normal end of scan, return ENOENT.
	 *
	 * ESRCH       - sufficient elements collected, quorum agreement
	 *		 that lokey is not a valid element and should be
	 *		 skipped.
	 *
	 * EDEADLK     - sufficient elements collected, no quorum agreement
	 *		 (and no agreement possible).  In this situation a
	 *		 repair is needed, for now we loop.
	 *
	 * EINPROGRESS - insufficient elements collected to resolve, wait
	 *		 for event and loop.
	 */
	if ((flags & HAMMER2_XOP_COLLECT_WAITALL) &&
	    xop->run_mask != HAMMER2_XOPMASK_VOP) {
		error = EINPROGRESS;
	} else {
		error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
	}
	if (error == EINPROGRESS) {
		if ((flags & HAMMER2_XOP_COLLECT_NOWAIT) == 0)
			tsleep_interlock(&xop->check_counter, 0);
		if (atomic_cmpset_int(&xop->check_counter,
				      check_counter,
				      check_counter | HAMMER2_XOP_CHKWAIT)) {
			if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
				goto done;
			tsleep(&xop->check_counter, PINTERLOCKED,
			       "h2coll", hz*60);
		}
		goto loop;
	}
	if (error == ESRCH) {
		if (lokey != HAMMER2_KEY_MAX) {
			xop->collect_key = lokey + 1;
			goto loop;
		}
		error = ENOENT;
	}
	if (error == EDEADLK) {
		kprintf("hammer2: no quorum possible lokey %016jx\n",
			lokey);
		if (lokey != HAMMER2_KEY_MAX) {
			xop->collect_key = lokey + 1;
			goto loop;
		}
		error = ENOENT;
	}
	if (lokey == HAMMER2_KEY_MAX)
		xop->collect_key = lokey;
	else
		xop->collect_key = lokey + 1;
done:
	return error;
}
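
/*
 * Illustrative sketch, not part of the original source: the frontend
 * consumption loop implied by the return values above.  ENOENT is the
 * normal EOF indication for iterating ops such as readdir.
 */
#if 0
	for (;;) {
		error = hammer2_xop_collect(&xop->head, 0);
		if (error == ENOENT) {
			error = 0;	/* normal termination */
			break;
		}
		if (error)
			break;
		/* ... consume the focus of xop->head.cluster ... */
	}
	hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
#endif
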
/*
 * N x M processing threads are available to handle XOPs, N per cluster
 * index x M cluster nodes.
 *
 * Locate and return the next runnable xop, or NULL if no xops are
 * present or none of the xops are currently runnable (for various reasons).
 * The xop is left on the queue and serves to block other dependent xops
 * from being run.
 *
 * Dependent xops will not be returned.
 *
 * Sets HAMMER2_XOP_FIFO_RUN on the returned xop or returns NULL.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
#define XOP_HASH_SIZE	16
#define XOP_HASH_MASK	(XOP_HASH_SIZE - 1)

static __inline
int
xop_testhash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
	uint32_t mask;
	int hv;

	hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
	mask = 1U << (hv & 31);
	hv >>= 5;

	return ((int)(hash[hv & XOP_HASH_MASK] & mask));
}

static __inline
void
xop_sethash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
	uint32_t mask;
	int hv;

	hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
	mask = 1U << (hv & 31);
	hv >>= 5;

	hash[hv & XOP_HASH_MASK] |= mask;
}

static
hammer2_xop_head_t *
hammer2_xop_next(hammer2_thread_t *thr)
{
	hammer2_pfs_t *pmp = thr->pmp;
	int clindex = thr->clindex;
	uint32_t hash[XOP_HASH_SIZE] = { 0 };
	hammer2_xop_head_t *xop;

	hammer2_spin_ex(&pmp->xop_spin);
	TAILQ_FOREACH(xop, &thr->xopq, collect[clindex].entry) {
		/*
		 * Check dependency
		 */
		if (xop_testhash(thr, xop->ip1, hash) ||
		    (xop->ip2 && xop_testhash(thr, xop->ip2, hash)) ||
		    (xop->ip3 && xop_testhash(thr, xop->ip3, hash))) {
			continue;
		}
		xop_sethash(thr, xop->ip1, hash);
		if (xop->ip2)
			xop_sethash(thr, xop->ip2, hash);
		if (xop->ip3)
			xop_sethash(thr, xop->ip3, hash);

		/*
		 * Check already running
		 */
		if (xop->collect[clindex].flags & HAMMER2_XOP_FIFO_RUN)
			continue;

		/*
		 * Found a good one, return it.
		 */
		atomic_set_int(&xop->collect[clindex].flags,
			       HAMMER2_XOP_FIFO_RUN);
		break;
	}
	hammer2_spin_unex(&pmp->xop_spin);

	return xop;
}
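
/*
 * Illustrative note, not part of the original source: the hash above
 * is a small 512-bit Bloom-style filter (16 words x 32 bits), with
 * each (ip, thr) pair mapping to one bit.  Every xop that passes the
 * dependency test marks the bits for its inodes; any later xop in the
 * queue touching a colliding inode then tests positive and is passed
 * over, so ops on the same inode are never reordered within a queue.
 * A false positive from a hash collision merely delays an independent
 * xop, which is harmless.
 */
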
/*
 * Remove the completed XOP from the queue, clear HAMMER2_XOP_FIFO_RUN.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
static
void
hammer2_xop_dequeue(hammer2_thread_t *thr, hammer2_xop_head_t *xop)
{
	hammer2_pfs_t *pmp = thr->pmp;
	int clindex = thr->clindex;

	hammer2_spin_ex(&pmp->xop_spin);
	TAILQ_REMOVE(&thr->xopq, xop, collect[clindex].entry);
	atomic_clear_int(&xop->collect[clindex].flags,
			 HAMMER2_XOP_FIFO_RUN);
	hammer2_spin_unex(&pmp->xop_spin);
	if (TAILQ_FIRST(&thr->xopq))
		hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
}
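
/*
 * Illustrative sketch, not part of the original source: the freeze
 * handshake as seen against the state machine in
 * hammer2_primary_xops_thread() below.  The client sets FREEZE and
 * sleeps on CLIENTWAIT; the thread converts FREEZE to FROZEN and wakes
 * the client; UNFREEZE reverses the process.
 */
#if 0
	hammer2_thr_freeze(thr);	/* set FREEZE, wait for FROZEN */
	/* ... thread is now idled ... */
	hammer2_thr_unfreeze(thr);	/* set UNFREEZE, wait for !FROZEN */
#endif
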
/*
 * Primary management thread for xops support.  Each node has several such
 * threads which replicate front-end operations on cluster nodes.
 *
 * XOPS threads replicate node operations, allowing the XOP function to
 * focus on a single node in the cluster after validating the operation
 * with the cluster.  This is primarily what prevents dead or stalled
 * nodes from stalling the front-end.
 */
void
hammer2_primary_xops_thread(void *arg)
{
	hammer2_thread_t *thr = arg;
	hammer2_pfs_t *pmp;
	hammer2_xop_head_t *xop;
	uint32_t mask;
	uint32_t flags;
	uint32_t nflags;
	hammer2_xop_func_t last_func = NULL;

	pmp = thr->pmp;
	/*xgrp = &pmp->xop_groups[thr->repidx]; not needed */
	mask = 1U << thr->clindex;

	for (;;) {
		flags = thr->flags;

		/*
		 * Handle stop request
		 */
		if (flags & HAMMER2_THREAD_STOP)
			break;

		/*
		 * Handle freeze request
		 */
		if (flags & HAMMER2_THREAD_FREEZE) {
			nflags = (flags & ~(HAMMER2_THREAD_FREEZE |
					    HAMMER2_THREAD_CLIENTWAIT)) |
				 HAMMER2_THREAD_FROZEN;
			if (!atomic_cmpset_int(&thr->flags, flags, nflags))
				continue;
			if (flags & HAMMER2_THREAD_CLIENTWAIT)
				wakeup(&thr->flags);
			flags = nflags;
			/* fall through */
		}

		if (flags & HAMMER2_THREAD_UNFREEZE) {
			nflags = flags & ~(HAMMER2_THREAD_UNFREEZE |
					   HAMMER2_THREAD_FROZEN |
					   HAMMER2_THREAD_CLIENTWAIT);
			if (!atomic_cmpset_int(&thr->flags, flags, nflags))
				continue;
			if (flags & HAMMER2_THREAD_CLIENTWAIT)
				wakeup(&thr->flags);
			flags = nflags;
			/* fall through */
		}

		/*
		 * Force idle if frozen until unfrozen or stopped.
		 */
		if (flags & HAMMER2_THREAD_FROZEN) {
			nflags = flags | HAMMER2_THREAD_WAITING;
			tsleep_interlock(&thr->flags, 0);
			if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
				tsleep(&thr->flags, PINTERLOCKED, "frozen", 0);
				atomic_clear_int(&thr->flags,
						 HAMMER2_THREAD_WAITING);
			}
			continue;
		}

		/*
		 * Reset state on REMASTER request
		 */
		if (flags & HAMMER2_THREAD_REMASTER) {
			nflags = flags & ~HAMMER2_THREAD_REMASTER;
			if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
				/* reset state here */
			}
			continue;
		}

		/*
		 * Process requests.  Each request can be multi-queued.
		 *
		 * If we get behind and the frontend VOP is no longer active,
		 * we retire the request without processing it.  The callback
		 * may also abort processing if the frontend VOP becomes
		 * inactive.
		 */
		if (flags & HAMMER2_THREAD_XOPQ) {
			nflags = flags & ~HAMMER2_THREAD_XOPQ;
			if (!atomic_cmpset_int(&thr->flags, flags, nflags))
				continue;
			flags = nflags;
			/* fall through */
		}
		while ((xop = hammer2_xop_next(thr)) != NULL) {
			if (hammer2_xop_active(xop)) {
				last_func = xop->func;
				xop->func(thr, (hammer2_xop_t *)xop);
				hammer2_xop_dequeue(thr, xop);
				hammer2_xop_retire(xop, mask);
			} else {
				last_func = xop->func;
				hammer2_xop_feed(xop, NULL, thr->clindex,
						 ECONNABORTED);
				hammer2_xop_dequeue(thr, xop);
				hammer2_xop_retire(xop, mask);
			}
		}

		/*
		 * Wait for event, interlock using THREAD_WAITING and
		 * THREAD_SIGNAL.
		 *
		 * For robustness poll on a 30-second interval, but nominally
		 * expect to be woken up.
		 */
		nflags = flags | HAMMER2_THREAD_WAITING;

		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2idle", hz*30);
			atomic_clear_int(&thr->flags, HAMMER2_THREAD_WAITING);
		}
	}

#if 0
	/*
	 * Cleanup / termination
	 */
	while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
		kprintf("hammer2_thread: aborting xop %p\n", xop->func);
		TAILQ_REMOVE(&thr->xopq, xop,
			     collect[thr->clindex].entry);
		hammer2_xop_retire(xop, mask);
	}
#endif
	thr->td = NULL;
	hammer2_thr_return(thr, HAMMER2_THREAD_STOPPED);
	/* thr structure can go invalid after this point */
}