/*
 * Copyright (c) 2015 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module implements the hammer2 helper thread API, including
 * the frontend/backend XOP API.
 */
#include "hammer2.h"

/*
 * Signal that the thread has work.
 */
void
hammer2_thr_signal(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if (oflags & HAMMER2_THREAD_WAITING) {
                        if (atomic_cmpset_int(&thr->flags, oflags,
                                  (oflags | flags) & ~HAMMER2_THREAD_WAITING)) {
                                wakeup(&thr->flags);
                                break;
                        }
                } else {
                        if (atomic_cmpset_int(&thr->flags, oflags,
                                              oflags | flags)) {
                                break;
                        }
                }
        }
}

/*
 * Return status to waiting client(s)
 */
void
hammer2_thr_return(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                nflags = (oflags | flags) & ~HAMMER2_THREAD_CLIENTWAIT;

                if (oflags & HAMMER2_THREAD_CLIENTWAIT) {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                                wakeup(thr);
                                break;
                        }
                } else {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags))
                                break;
                }
        }
}

/*
 * Wait until the bits in flags are set.
 */
void
hammer2_thr_wait(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if ((oflags & flags) == flags)
                        break;
                nflags = oflags | HAMMER2_THREAD_CLIENTWAIT;
                tsleep_interlock(thr, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        tsleep(thr, PINTERLOCKED, "h2twait", hz*60);
                }
        }
}
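
/*
 * Example (sketch only, not compiled): the command/acknowledge handshake
 * implemented by the thread API.  A client posts command bits with
 * hammer2_thr_signal() and, for a synchronous request, blocks in
 * hammer2_thr_wait() (or hammer2_thr_wait_neg(), below) until the helper
 * thread posts or clears the matching status bits.  The freeze/unfreeze
 * helpers further below are concrete users of this pattern; the function
 * name here is hypothetical.
 */
#if 0
static void
example_thr_sync_command(hammer2_thread_t *thr)
{
        /* asynchronous: just post the command bit */
        hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER);

        /* synchronous: post the command bit and wait for the status bit */
        hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
        hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
}
#endif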

/*
 * Wait until the bits in flags are clear.
 */
void
hammer2_thr_wait_neg(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if ((oflags & flags) == 0)
                        break;
                nflags = oflags | HAMMER2_THREAD_CLIENTWAIT;
                tsleep_interlock(thr, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        tsleep(thr, PINTERLOCKED, "h2twait", hz*60);
                }
        }
}

/*
 * Initialize the supplied thread structure, starting the specified
 * thread.
 */
void
hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
                   const char *id, int clindex, int repidx,
                   void (*func)(void *arg))
{
        thr->pmp = pmp;
        thr->clindex = clindex;
        thr->repidx = repidx;
        TAILQ_INIT(&thr->xopq);
        if (repidx >= 0) {
                lwkt_create(func, thr, &thr->td, NULL, 0, repidx % ncpus,
                            "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
        } else {
                lwkt_create(func, thr, &thr->td, NULL, 0, -1,
                            "%s-%s", id, pmp->pfs_names[clindex]);
        }
}

/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);
        hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);
        thr->pmp = NULL;
        KKASSERT(TAILQ_EMPTY(&thr->xopq));
}

/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER);
}

void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
        hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
}

void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
        hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
}

void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_UNFREEZE);
        hammer2_thr_wait_neg(thr, HAMMER2_THREAD_FROZEN);
}

int
hammer2_thr_break(hammer2_thread_t *thr)
{
        if (thr->flags & (HAMMER2_THREAD_STOP |
                          HAMMER2_THREAD_REMASTER |
                          HAMMER2_THREAD_FREEZE)) {
                return 1;
        }
        return 0;
}

/****************************************************************************
 *                          HAMMER2 XOPS API                                *
 ****************************************************************************/

void
hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
{
        /* no extra fields in structure at the moment */
}
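
/*
 * Example (sketch only, not compiled): typical frontend use of the XOP
 * API implemented below.  A XOP is allocated against an inode, given its
 * parameters, queued to all cluster nodes, collected, and finally retired.
 * The xop type and backend function named here (hammer2_xop_nresolve*) are
 * stand-ins for operations defined elsewhere; treat all names in this
 * block as illustrative.
 */
#if 0
static int
example_frontend_op(hammer2_inode_t *dip, const char *name, size_t name_len)
{
        hammer2_xop_nresolve_t *xop;
        int error;

        /*
         * Allocate the XOP, attach parameters, and queue it to all
         * cluster nodes.
         */
        xop = hammer2_xop_alloc(dip, 0);
        hammer2_xop_setname(&xop->head, name, name_len);
        hammer2_xop_start(&xop->head, hammer2_xop_nresolve);

        /*
         * Collect the cluster-validated result, then retire the
         * frontend's interest (HAMMER2_XOPMASK_VOP).  Retirement is
         * legal at any time, even before collection completes.
         */
        error = hammer2_xop_collect(&xop->head, 0);
        if (error == 0) {
                /* consume xop->head.cluster here */
        }
        hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);

        return error;
}
#endif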

/*
 * Allocate a XOP request.
 *
 * Once allocated a XOP request can be started, collected, and retired,
 * and can be retired early if desired.
 *
 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
 */
void *
hammer2_xop_alloc(hammer2_inode_t *ip, int flags)
{
        hammer2_xop_t *xop;

        xop = objcache_get(cache_xops, M_WAITOK);
        KKASSERT(xop->head.cluster.array[0].chain == NULL);

        xop->head.ip1 = ip;
        xop->head.func = NULL;
        xop->head.flags = flags;
        xop->head.state = 0;
        xop->head.error = 0;
        xop->head.collect_key = 0;
        xop->head.check_counter = 0;
        if (flags & HAMMER2_XOP_MODIFYING)
                xop->head.mtid = hammer2_trans_sub(ip->pmp);
        else
                xop->head.mtid = 0;

        xop->head.cluster.nchains = ip->cluster.nchains;
        xop->head.cluster.pmp = ip->pmp;
        xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;

        /*
         * run_mask - Active thread (or frontend) associated with XOP
         */
        xop->head.run_mask = HAMMER2_XOPMASK_VOP;

        hammer2_inode_ref(ip);

        return xop;
}

void
hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
        xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name1_len = name_len;
        bcopy(name, xop->name1, name_len);
}

void
hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
        xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name2_len = name_len;
        bcopy(name, xop->name2, name_len);
}

size_t
hammer2_xop_setname_inum(hammer2_xop_head_t *xop, hammer2_key_t inum)
{
        const size_t name_len = 18;

        xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name1_len = name_len;
        ksnprintf(xop->name1, name_len + 1, "0x%016jx", (intmax_t)inum);

        return name_len;
}

void
hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
{
        xop->ip2 = ip2;
        hammer2_inode_ref(ip2);
}

void
hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
{
        xop->ip3 = ip3;
        hammer2_inode_ref(ip3);
}

void
hammer2_xop_reinit(hammer2_xop_head_t *xop)
{
        xop->state = 0;
        xop->error = 0;
        xop->collect_key = 0;
        xop->run_mask = HAMMER2_XOPMASK_VOP;
}

/*
 * A mounted PFS needs Xops threads to support frontend operations.
 */
void
hammer2_xop_helper_create(hammer2_pfs_t *pmp)
{
        int i;
        int j;

        lockmgr(&pmp->lock, LK_EXCLUSIVE);
        pmp->has_xop_threads = 1;

        for (i = 0; i < pmp->iroot->cluster.nchains; ++i) {
                for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
                        if (pmp->xop_groups[j].thrs[i].td)
                                continue;
                        hammer2_thr_create(&pmp->xop_groups[j].thrs[i], pmp,
                                           "h2xop", i, j,
                                           hammer2_primary_xops_thread);
                }
        }
        lockmgr(&pmp->lock, LK_RELEASE);
}

void
hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
{
        int i;
        int j;

        for (i = 0; i < pmp->pfs_nmasters; ++i) {
                for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
                        if (pmp->xop_groups[j].thrs[i].td)
                                hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
                }
        }
}

/*
 * Start a XOP request, queueing it to all nodes in the cluster to
 * execute the cluster op.
 *
 * XXX optimize single-target case.
 */
void
hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_func_t func,
                         int notidx)
{
        hammer2_inode_t *ip1;
        hammer2_pfs_t *pmp;
        hammer2_thread_t *thr;
        int i;
        int ng;
        int nchains;

        ip1 = xop->ip1;
        pmp = ip1->pmp;
        if (pmp->has_xop_threads == 0)
                hammer2_xop_helper_create(pmp);

        /*
         * The intent of the XOP sequencer is to ensure that ops on the
         * same inode execute in the same order.  This is necessary when
         * issuing modifying operations to multiple targets because some
         * targets might get behind and the frontend is allowed to
         * complete the moment a quorum of targets succeed.
         *
         * Strategy operations must be segregated from non-strategy
         * operations to avoid a deadlock.  For example, if a vfsync and
         * a bread/bwrite were queued to the same worker thread, the
         * locked buffer in the strategy operation can deadlock the
         * vfsync's buffer list scan.
         *
         * TODO - RENAME fails here because it is potentially modifying
         *        three different inodes.
         */
        if (xop->flags & HAMMER2_XOP_STRATEGY) {
                hammer2_xop_strategy_t *xopst;

                xopst = &((hammer2_xop_t *)xop)->xop_strategy;
                ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)) ^
                           hammer2_icrc32(&xopst->lbase, sizeof(xopst->lbase)));
                ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
                ng += HAMMER2_XOPGROUPS / 2;
        } else {
                ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)));
                ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
        }
        xop->func = func;

        /*
         * The instant xop is queued another thread can pick it off.  In the
         * case of asynchronous ops, another thread might even finish and
         * deallocate it.
         */
        hammer2_spin_ex(&pmp->xop_spin);
        nchains = ip1->cluster.nchains;
        for (i = 0; i < nchains; ++i) {
                /*
                 * XXX ip1->cluster.array* not stable here.  This temporary
                 *     hack fixes basic issues in target XOPs which need to
                 *     obtain a starting chain from the inode but does not
                 *     address possible races against inode updates which
                 *     might NULL-out a chain.
                 */
                if (i != notidx && ip1->cluster.array[i].chain) {
                        thr = &pmp->xop_groups[ng].thrs[i];
                        atomic_set_int(&xop->run_mask, 1U << i);
                        atomic_set_int(&xop->chk_mask, 1U << i);
                        TAILQ_INSERT_TAIL(&thr->xopq, xop, collect[i].entry);
                }
        }
        hammer2_spin_unex(&pmp->xop_spin);
        /* xop can become invalid at this point */

        /*
         * Each thread has its own xopq
         */
        for (i = 0; i < nchains; ++i) {
                if (i != notidx) {
                        thr = &pmp->xop_groups[ng].thrs[i];
                        hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
                }
        }
}

void
hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_func_t func)
{
        hammer2_xop_start_except(xop, func, -1);
}

/*
 * Retire a XOP.  Used by both the VOP frontend and by the XOP backend.
 */
void
hammer2_xop_retire(hammer2_xop_head_t *xop, uint32_t mask)
{
        hammer2_chain_t *chain;
        uint32_t nmask;
        int i;

        /*
         * Remove the frontend collector or remove a backend feeder.
         * When removing the frontend we must wakeup any backend feeders
         * who are waiting for FIFO space.
         *
         * XXX optimize wakeup.
         */
        KKASSERT(xop->run_mask & mask);
        nmask = atomic_fetchadd_int(&xop->run_mask, -mask);
        if ((nmask & ~HAMMER2_XOPMASK_FIFOW) != mask) {
                if (mask == HAMMER2_XOPMASK_VOP) {
                        if (nmask & HAMMER2_XOPMASK_FIFOW)
                                wakeup(xop);
                }
                return;
        }
        /* else nobody else left, we can ignore FIFOW */

        /*
         * All collectors are gone, we can cleanup and dispose of the XOP.
         * Note that this can wind up being a frontend OR a backend.
         * Pending chains are locked shared and not owned by any thread.
         */
#if 0
        /*
         * Cache the terminating cluster.
         */
        hammer2_inode_t *ip;
        if ((ip = xop->ip1) != NULL) {
                hammer2_cluster_t *tmpclu;

                tmpclu = hammer2_cluster_copy(&xop->cluster);
                hammer2_spin_ex(&ip->cluster_spin);
                tmpclu = atomic_swap_ptr((volatile void **)&ip->cluster_cache,
                                         tmpclu);
                hammer2_spin_unex(&ip->cluster_spin);
                if (tmpclu)
                        hammer2_cluster_drop(tmpclu);
        }
#endif

        /*
         * Cleanup the collection cluster.
         */
        for (i = 0; i < xop->cluster.nchains; ++i) {
                xop->cluster.array[i].flags = 0;
                chain = xop->cluster.array[i].chain;
                if (chain) {
                        xop->cluster.array[i].chain = NULL;
                        hammer2_chain_drop_unhold(chain);
                }
        }

        /*
         * Cleanup the fifos, use chk_mask to optimize the loop.
         * Since we are the only entity left on this xop we don't have
         * to worry about fifo flow control, and one lfence() will do the
         * job.
         */
        cpu_lfence();
        mask = xop->chk_mask;
        for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
                hammer2_xop_fifo_t *fifo = &xop->collect[i];
                while (fifo->ri != fifo->wi) {
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        if (chain)
                                hammer2_chain_drop_unhold(chain);
                        ++fifo->ri;
                }
                mask &= ~(1U << i);
        }

        /*
         * The inode is only held at this point, simply drop it.
         */
        if (xop->ip1) {
                hammer2_inode_drop(xop->ip1);
                xop->ip1 = NULL;
        }
        if (xop->ip2) {
                hammer2_inode_drop(xop->ip2);
                xop->ip2 = NULL;
        }
        if (xop->ip3) {
                hammer2_inode_drop(xop->ip3);
                xop->ip3 = NULL;
        }
        if (xop->name1) {
                kfree(xop->name1, M_HAMMER2);
                xop->name1 = NULL;
                xop->name1_len = 0;
        }
        if (xop->name2) {
                kfree(xop->name2, M_HAMMER2);
                xop->name2 = NULL;
                xop->name2_len = 0;
        }

        objcache_put(cache_xops, xop);
}

/*
 * (Backend) Returns non-zero if the frontend is still attached.
 */
int
hammer2_xop_active(hammer2_xop_head_t *xop)
{
        if (xop->run_mask & HAMMER2_XOPMASK_VOP)
                return 1;
        else
                return 0;
}

/*
 * (Backend) Feed chain data through the cluster validator and back to
 * the frontend.  Chains are fed from multiple nodes concurrently
 * and pipelined via per-node FIFOs in the XOP.
 *
 * The chain must be locked (either shared or exclusive).  The caller may
 * unlock and drop the chain on return.  This function will add an extra
 * ref and hold the chain's data for the pass-back.
 *
 * No xop lock is needed because we are only manipulating fields under
 * our direct control.
 *
 * Returns 0 on success and a hammer error code if sync is permanently
 * lost.  The caller retains a ref on the chain but by convention
 * the lock is typically inherited by the xop (caller loses lock).
 *
 * Returns non-zero on error.  In this situation the caller retains a
 * ref on the chain but loses the lock (we unlock here).
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
                 int clindex, int error)
{
        hammer2_xop_fifo_t *fifo;
        uint32_t mask;

        /*
         * Early termination (typically of xop_readdir)
         */
        if (hammer2_xop_active(xop) == 0) {
                error = EINTR;
                goto done;
        }

        /*
         * Multi-threaded entry into the XOP collector.  We own the
         * fifo->wi for our clindex.
         */
        fifo = &xop->collect[clindex];

        if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO)
                lwkt_yield();
        while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                atomic_set_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
                mask = xop->run_mask;
                if ((mask & HAMMER2_XOPMASK_VOP) == 0) {
                        error = EINTR;
                        goto done;
                }
                tsleep_interlock(xop, 0);
                if (atomic_cmpset_int(&xop->run_mask, mask,
                                      mask | HAMMER2_XOPMASK_FIFOW)) {
                        if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                                tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
                        }
                }
                /* retry */
        }
        atomic_clear_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
        if (chain)
                hammer2_chain_ref_hold(chain);
        if (error == 0 && chain)
                error = chain->error;
        fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
        fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
        cpu_sfence();
        ++fifo->wi;
        if (atomic_fetchadd_int(&xop->check_counter, HAMMER2_XOP_CHKINC) &
            HAMMER2_XOP_CHKWAIT) {
                atomic_clear_int(&xop->check_counter, HAMMER2_XOP_CHKWAIT);
                wakeup(&xop->check_counter);
        }
        error = 0;

        /*
         * Cleanup.  If an error occurred we eat the lock.  If no error
         * occurred the fifo inherits the lock and gains an additional ref.
         *
         * The caller's ref remains in both cases.
         */
done:
        return error;
}
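
/*
 * Example (sketch only, not compiled): the general shape of a backend
 * XOP function dispatched by hammer2_primary_xops_thread().  It runs
 * against a single cluster index and pipes results back to the frontend
 * with hammer2_xop_feed().  The chain lookup is elided and the function
 * name is hypothetical.
 */
#if 0
static void
example_backend_op(hammer2_xop_t *arg, int clindex)
{
        hammer2_xop_head_t *xop = &arg->head;
        hammer2_chain_t *chain;
        int error;

        /*
         * Locate the chain(s) for this cluster index, typically starting
         * from xop->ip1's chain for clindex (details elided).
         */
        chain = NULL;
        error = 0;

        /*
         * Feed each locked chain to the frontend collector.  Long scans
         * should poll hammer2_xop_active() and abort early if the
         * frontend has already detached.
         */
        if (chain)
                error = hammer2_xop_feed(xop, chain, clindex, 0);

        /*
         * Terminate the feed.  The frontend sees ENOENT as the normal
         * end-of-scan from hammer2_xop_collect().
         */
        if (error == 0)
                error = ENOENT;
        hammer2_xop_feed(xop, NULL, clindex, error);
}
#endif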

/*
 * (Frontend) collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= collect_key.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT   - Used to 'poll' a completed collection,
 *                                usually called synchronously from the
 *                                node XOPs for the strategy code to
 *                                fake the frontend collection and complete
 *                                the BIO as soon as possible.
 *
 * HAMMER2_XOP_SYNCHRONIZER     - Request synchronization with a particular
 *                                cluster index, prevents looping when that
 *                                index is out of sync so caller can act on
 *                                the out of sync element.  ESRCH and EDEADLK
 *                                can be returned if this flag is specified.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Return ENOENT on normal termination.
 * Otherwise return an error.
 */
int
hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
{
        hammer2_xop_fifo_t *fifo;
        hammer2_chain_t *chain;
        hammer2_key_t lokey;
        int error;
        int keynull;
        int adv;                /* advance the element */
        int i;
        uint32_t check_counter;

loop:
        /*
         * First loop tries to advance pieces of the cluster which
         * are out of sync.
         */
        lokey = HAMMER2_KEY_MAX;
        keynull = HAMMER2_CHECK_NULL;
        check_counter = xop->check_counter;
        cpu_lfence();

        for (i = 0; i < xop->cluster.nchains; ++i) {
                chain = xop->cluster.array[i].chain;
                if (chain == NULL) {
                        adv = 1;
                } else if (chain->bref.key < xop->collect_key) {
                        adv = 1;
                } else {
                        keynull &= ~HAMMER2_CHECK_NULL;
                        if (lokey > chain->bref.key)
                                lokey = chain->bref.key;
                        adv = 0;
                }
                if (adv == 0)
                        continue;

                /*
                 * Advance element if possible, advanced element may be NULL.
                 */
                if (chain)
                        hammer2_chain_drop_unhold(chain);

                fifo = &xop->collect[i];
                if (fifo->ri != fifo->wi) {
                        cpu_lfence();
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        error = fifo->errors[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        ++fifo->ri;
                        xop->cluster.array[i].chain = chain;
                        xop->cluster.array[i].error = error;
                        if (chain == NULL) {
                                /* XXX */
                                xop->cluster.array[i].flags |=
                                                        HAMMER2_CITEM_NULL;
                        }
                        if (fifo->wi - fifo->ri <= HAMMER2_XOPFIFO / 2) {
                                if (fifo->flags & HAMMER2_XOP_FIFO_STALL) {
                                        atomic_clear_int(&fifo->flags,
                                                    HAMMER2_XOP_FIFO_STALL);
                                        wakeup(xop);
                                        lwkt_yield();
                                }
                        }
                        --i;            /* loop on same index */
                } else {
                        /*
                         * Retain CITEM_NULL flag.  If set just repeat EOF.
                         * If not, the NULL,0 combination indicates an
                         * operation in-progress.
                         */
                        xop->cluster.array[i].chain = NULL;
                        /* retain any CITEM_NULL setting */
                }
        }

        /*
         * Determine whether the lowest collected key meets clustering
         * requirements.  Returns:
         *
         * 0            - key valid, cluster can be returned.
         *
         * ENOENT       - normal end of scan, return ENOENT.
         *
         * ESRCH        - sufficient elements collected, quorum agreement
         *                that lokey is not a valid element and should be
         *                skipped.
         *
         * EDEADLK      - sufficient elements collected, no quorum agreement
         *                (and no agreement possible).  In this situation a
         *                repair is needed, for now we loop.
         *
         * EINPROGRESS  - insufficient elements collected to resolve, wait
         *                for event and loop.
         */
        if ((flags & HAMMER2_XOP_COLLECT_WAITALL) &&
            xop->run_mask != HAMMER2_XOPMASK_VOP) {
                error = EINPROGRESS;
        } else {
                error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
        }
        if (error == EINPROGRESS) {
                if ((flags & HAMMER2_XOP_COLLECT_NOWAIT) == 0)
                        tsleep_interlock(&xop->check_counter, 0);
                if (atomic_cmpset_int(&xop->check_counter,
                                      check_counter,
                                      check_counter | HAMMER2_XOP_CHKWAIT)) {
                        if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
                                goto done;
                        tsleep(&xop->check_counter, PINTERLOCKED,
                               "h2coll", hz*60);
                }
                goto loop;
        }
        if (error == ESRCH) {
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = ENOENT;
        }
        if (error == EDEADLK) {
                kprintf("hammer2: no quorum possible lokey %016jx\n",
                        lokey);
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = ENOENT;
        }
        if (lokey == HAMMER2_KEY_MAX)
                xop->collect_key = lokey;
        else
                xop->collect_key = lokey + 1;
done:
        return error;
}
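
/*
 * Example (sketch only, not compiled): a frontend iterating a keyed scan,
 * readdir-style, via repeated collection.  Each successful collect leaves
 * the validated element in xop->cluster and advances xop->collect_key;
 * ENOENT is the normal termination.  Data access is elided and the
 * function name is illustrative.
 */
#if 0
static int
example_frontend_scan(hammer2_xop_head_t *xop)
{
        int error;

        for (;;) {
                error = hammer2_xop_collect(xop, 0);
                if (error) {
                        if (error == ENOENT)    /* normal end of scan */
                                error = 0;
                        break;
                }
                /*
                 * xop->cluster now describes the validated element at
                 * the current collect_key; consume it here.
                 */
        }
        hammer2_xop_retire(xop, HAMMER2_XOPMASK_VOP);
        return error;
}
#endif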

/*
 * N x M processing threads are available to handle XOPs, N per cluster
 * index x M cluster nodes.  All the threads for any given cluster index
 * share and pull from the same xopq.
 *
 * Locate and return the next runnable xop, or NULL if no xops are
 * present or none of the xops are currently runnable (for various reasons).
 * The xop is left on the queue and serves to block other dependent xops
 * from being run.
 *
 * Dependent xops will not be returned.
 *
 * Sets HAMMER2_XOP_FIFO_RUN on the returned xop or returns NULL.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
#define XOP_HASH_SIZE   16
#define XOP_HASH_MASK   (XOP_HASH_SIZE - 1)

static __inline
int
xop_testhash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
        uint32_t mask;
        int hv;

        hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
        mask = 1U << (hv & 31);
        hv >>= 5;

        return ((int)(hash[hv & XOP_HASH_MASK] & mask));
}

static __inline
void
xop_sethash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
        uint32_t mask;
        int hv;

        hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
        mask = 1U << (hv & 31);
        hv >>= 5;

        hash[hv & XOP_HASH_MASK] |= mask;
}

static
hammer2_xop_head_t *
hammer2_xop_next(hammer2_thread_t *thr)
{
        hammer2_pfs_t *pmp = thr->pmp;
        int clindex = thr->clindex;
        uint32_t hash[XOP_HASH_SIZE] = { 0 };
        hammer2_xop_head_t *xop;

        hammer2_spin_ex(&pmp->xop_spin);
        TAILQ_FOREACH(xop, &thr->xopq, collect[clindex].entry) {
                /*
                 * Check dependency
                 */
                if (xop_testhash(thr, xop->ip1, hash) ||
                    (xop->ip2 && xop_testhash(thr, xop->ip2, hash)) ||
                    (xop->ip3 && xop_testhash(thr, xop->ip3, hash))) {
                        continue;
                }
                xop_sethash(thr, xop->ip1, hash);
                if (xop->ip2)
                        xop_sethash(thr, xop->ip2, hash);
                if (xop->ip3)
                        xop_sethash(thr, xop->ip3, hash);

                /*
                 * Check already running
                 */
                if (xop->collect[clindex].flags & HAMMER2_XOP_FIFO_RUN)
                        continue;

                /*
                 * Found a good one, return it.
                 */
                atomic_set_int(&xop->collect[clindex].flags,
                               HAMMER2_XOP_FIFO_RUN);
                break;
        }
        hammer2_spin_unex(&pmp->xop_spin);

        return xop;
}

/*
 * Remove the completed XOP from the queue, clear HAMMER2_XOP_FIFO_RUN.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
static
void
hammer2_xop_dequeue(hammer2_thread_t *thr, hammer2_xop_head_t *xop)
{
        hammer2_pfs_t *pmp = thr->pmp;
        int clindex = thr->clindex;

        hammer2_spin_ex(&pmp->xop_spin);
        TAILQ_REMOVE(&thr->xopq, xop, collect[clindex].entry);
        atomic_clear_int(&xop->collect[clindex].flags,
                         HAMMER2_XOP_FIFO_RUN);
        hammer2_spin_unex(&pmp->xop_spin);
        if (TAILQ_FIRST(&thr->xopq))
                hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
}

/*
 * Primary management thread for xops support.  Each node has several such
 * threads which replicate front-end operations on cluster nodes.
 *
 * XOPS threads run node-level operations, allowing the function to focus
 * on a single node in the cluster after validating the operation with the
 * cluster.  This is primarily what prevents dead or stalled nodes from
 * stalling the front-end.
 */
void
hammer2_primary_xops_thread(void *arg)
{
        hammer2_thread_t *thr = arg;
        hammer2_pfs_t *pmp;
        hammer2_xop_head_t *xop;
        uint32_t mask;
        uint32_t flags;
        uint32_t nflags;
        hammer2_xop_func_t last_func = NULL;

        pmp = thr->pmp;
        /*xgrp = &pmp->xop_groups[thr->repidx]; not needed */
        mask = 1U << thr->clindex;

        for (;;) {
                flags = thr->flags;

                /*
                 * Handle stop request
                 */
                if (flags & HAMMER2_THREAD_STOP)
                        break;

                /*
                 * Handle freeze request
                 */
                if (flags & HAMMER2_THREAD_FREEZE) {
                        nflags = (flags & ~(HAMMER2_THREAD_FREEZE |
                                            HAMMER2_THREAD_CLIENTWAIT)) |
                                 HAMMER2_THREAD_FROZEN;
                        if (!atomic_cmpset_int(&thr->flags, flags, nflags))
                                continue;
                        if (flags & HAMMER2_THREAD_CLIENTWAIT)
                                wakeup(&thr->flags);
                        flags = nflags;
                        /* fall through */
                }

                if (flags & HAMMER2_THREAD_UNFREEZE) {
                        nflags = flags & ~(HAMMER2_THREAD_UNFREEZE |
                                           HAMMER2_THREAD_FROZEN |
                                           HAMMER2_THREAD_CLIENTWAIT);
                        if (!atomic_cmpset_int(&thr->flags, flags, nflags))
                                continue;
                        if (flags & HAMMER2_THREAD_CLIENTWAIT)
                                wakeup(&thr->flags);
                        flags = nflags;
                        /* fall through */
                }

                /*
                 * Force idle if frozen until unfrozen or stopped.
                 */
                if (flags & HAMMER2_THREAD_FROZEN) {
                        nflags = flags | HAMMER2_THREAD_WAITING;
                        tsleep_interlock(&thr->flags, 0);
                        if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
                                tsleep(&thr->flags, PINTERLOCKED, "frozen", 0);
                                atomic_clear_int(&thr->flags,
                                                 HAMMER2_THREAD_WAITING);
                        }
                        continue;
                }

                /*
                 * Reset state on REMASTER request
                 */
                if (flags & HAMMER2_THREAD_REMASTER) {
                        nflags = flags & ~HAMMER2_THREAD_REMASTER;
                        if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
                                /* reset state here */
                        }
                        continue;
                }

                /*
                 * Process requests.  Each request can be multi-queued.
                 *
                 * If we get behind and the frontend VOP is no longer active,
                 * we retire the request without processing it.  The callback
                 * may also abort processing if the frontend VOP becomes
                 * inactive.
                 */
                if (flags & HAMMER2_THREAD_XOPQ) {
                        nflags = flags & ~HAMMER2_THREAD_XOPQ;
                        if (!atomic_cmpset_int(&thr->flags, flags, nflags))
                                continue;
                        flags = nflags;
                        /* fall through */
                }
                while ((xop = hammer2_xop_next(thr)) != NULL) {
                        if (hammer2_xop_active(xop)) {
                                last_func = xop->func;
                                xop->func((hammer2_xop_t *)xop, thr->clindex);
                                hammer2_xop_dequeue(thr, xop);
                                hammer2_xop_retire(xop, mask);
                        } else {
                                last_func = xop->func;
                                hammer2_xop_feed(xop, NULL, thr->clindex,
                                                 ECONNABORTED);
                                hammer2_xop_dequeue(thr, xop);
                                hammer2_xop_retire(xop, mask);
                        }
                }

                /*
                 * Wait for event, interlock using THREAD_WAITING and
                 * THREAD_SIGNAL.
                 *
                 * For robustness poll on a 30-second interval, but nominally
                 * expect to be woken up.
                 */
                nflags = flags | HAMMER2_THREAD_WAITING;

                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
                        tsleep(&thr->flags, PINTERLOCKED, "h2idle", hz*30);
                        atomic_clear_int(&thr->flags, HAMMER2_THREAD_WAITING);
                }
        }

#if 0
        /*
         * Cleanup / termination
         */
        while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
                kprintf("hammer2_thread: aborting xop %p\n", xop->func);
                TAILQ_REMOVE(&thr->xopq, xop,
                             collect[thr->clindex].entry);
                hammer2_xop_retire(xop, mask);
        }
#endif
        thr->td = NULL;
        hammer2_thr_return(thr, HAMMER2_THREAD_STOPPED);
        /* thr structure can go invalid after this point */
        wakeup(thr);
}