/*
 * Copyright (c) 2015 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module implements the hammer2 helper thread API, including
 * the frontend/backend XOP API.
 */
#include "hammer2.h"

/*
 * Signal that the thread has work.
 */
void
hammer2_thr_signal(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if (oflags & HAMMER2_THREAD_WAITING) {
			if (atomic_cmpset_int(&thr->flags, oflags,
				  (oflags | flags) & ~HAMMER2_THREAD_WAITING)) {
				wakeup(&thr->flags);
				break;
			}
		} else {
			if (atomic_cmpset_int(&thr->flags, oflags,
					      oflags | flags)) {
				break;
			}
		}
	}
}

/*
 * Return status to waiting client(s)
 */
void
hammer2_thr_return(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		nflags = (oflags | flags) & ~HAMMER2_THREAD_CLIENTWAIT;

		if (oflags & HAMMER2_THREAD_CLIENTWAIT) {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
				wakeup(thr);
				break;
			}
		} else {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags))
				break;
		}
	}
}

/*
 * Wait until the bits in flags are set.
 */
void
hammer2_thr_wait(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if ((oflags & flags) == flags)
			break;
		nflags = oflags | HAMMER2_THREAD_CLIENTWAIT;
		tsleep_interlock(thr, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			tsleep(thr, PINTERLOCKED, "h2twait", hz*60);
		}
	}
}
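#if 0
/*
 * Illustrative sketch, not compiled: the signal/wait handshake as seen
 * from a frontend.  hammer2_thr_delete() below uses this exact pattern;
 * the helper thread acknowledges the STOP request by posting STOPPED
 * via hammer2_thr_return().
 */
static void
example_stop_thread(hammer2_thread_t *thr)
{
	hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);
	hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);
}
#endif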
/*
 * Wait until the bits in flags are clear.
 */
void
hammer2_thr_wait_neg(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if ((oflags & flags) == 0)
			break;
		nflags = oflags | HAMMER2_THREAD_CLIENTWAIT;
		tsleep_interlock(thr, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			tsleep(thr, PINTERLOCKED, "h2twait", hz*60);
		}
	}
}

/*
 * Initialize the supplied thread structure, starting the specified
 * thread.
 */
void
hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
		   const char *id, int clindex, int repidx,
		   void (*func)(void *arg))
{
	thr->pmp = pmp;
	thr->clindex = clindex;
	thr->repidx = repidx;
	if (repidx >= 0) {
		thr->xopq = &pmp->xopq[clindex][repidx];
		lwkt_create(func, thr, &thr->td, NULL, 0, repidx % ncpus,
			    "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
	} else {
		thr->xopq = &pmp->xopq[clindex][HAMMER2_XOPGROUPS-repidx];
		lwkt_create(func, thr, &thr->td, NULL, 0, -1,
			    "%s-%s", id, pmp->pfs_names[clindex]);
	}
}

/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);
	hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);
	thr->pmp = NULL;
	thr->xopq = NULL;
}

/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER);
}

void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
	hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
}

void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
	hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
}

void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_UNFREEZE);
	hammer2_thr_wait_neg(thr, HAMMER2_THREAD_FROZEN);
}

int
hammer2_thr_break(hammer2_thread_t *thr)
{
	if (thr->flags & (HAMMER2_THREAD_STOP |
			  HAMMER2_THREAD_REMASTER |
			  HAMMER2_THREAD_FREEZE)) {
		return 1;
	}
	return 0;
}

/****************************************************************************
 *			     HAMMER2 XOPS API				    *
 ****************************************************************************/

void
hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
{
	/* no extra fields in structure at the moment */
}
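/*
 * Illustrative sketch, not compiled: long-running helper loops are
 * expected to poll hammer2_thr_break() so that STOP, REMASTER, and
 * FREEZE requests are noticed promptly.  process_one_item() is a
 * hypothetical stand-in for a unit of work.
 *
 *	while (have_work) {
 *		if (hammer2_thr_break(thr))
 *			break;
 *		have_work = process_one_item(thr);
 *	}
 */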
/*
 * Allocate a XOP request.
 *
 * Once allocated a XOP request can be started, collected, and retired,
 * and can be retired early if desired.
 *
 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
 */
void *
hammer2_xop_alloc(hammer2_inode_t *ip, int flags)
{
	hammer2_xop_t *xop;

	xop = objcache_get(cache_xops, M_WAITOK);
	KKASSERT(xop->head.cluster.array[0].chain == NULL);

	xop->head.ip1 = ip;
	xop->head.func = NULL;
	xop->head.flags = flags;
	xop->head.state = 0;
	xop->head.error = 0;
	xop->head.collect_key = 0;
	if (flags & HAMMER2_XOP_MODIFYING)
		xop->head.mtid = hammer2_trans_sub(ip->pmp);
	else
		xop->head.mtid = 0;

	xop->head.cluster.nchains = ip->cluster.nchains;
	xop->head.cluster.pmp = ip->pmp;
	xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;

	/*
	 * run_mask - Active thread (or frontend) associated with XOP
	 */
	xop->head.run_mask = HAMMER2_XOPMASK_VOP;

	hammer2_inode_ref(ip);

	return xop;
}

void
hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
	xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name1_len = name_len;
	bcopy(name, xop->name1, name_len);
}

void
hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
	xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name2_len = name_len;
	bcopy(name, xop->name2, name_len);
}

size_t
hammer2_xop_setname_inum(hammer2_xop_head_t *xop, hammer2_key_t inum)
{
	const size_t name_len = 18;

	xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name1_len = name_len;
	ksnprintf(xop->name1, name_len + 1, "0x%016jx", (intmax_t)inum);

	return name_len;
}

void
hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
{
	xop->ip2 = ip2;
	hammer2_inode_ref(ip2);
}

void
hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
{
	xop->ip3 = ip3;
	hammer2_inode_ref(ip3);
}

void
hammer2_xop_reinit(hammer2_xop_head_t *xop)
{
	xop->state = 0;
	xop->error = 0;
	xop->collect_key = 0;
	xop->run_mask = HAMMER2_XOPMASK_VOP;
}

/*
 * A mounted PFS needs Xops threads to support frontend operations.
 */
void
hammer2_xop_helper_create(hammer2_pfs_t *pmp)
{
	int i;
	int j;

	lockmgr(&pmp->lock, LK_EXCLUSIVE);
	pmp->has_xop_threads = 1;

	for (i = 0; i < pmp->iroot->cluster.nchains; ++i) {
		for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
			if (pmp->xop_groups[j].thrs[i].td)
				continue;
			hammer2_thr_create(&pmp->xop_groups[j].thrs[i], pmp,
					   "h2xop", i, j,
					   hammer2_primary_xops_thread);
		}
	}
	lockmgr(&pmp->lock, LK_RELEASE);
}

void
hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
{
	int i;
	int j;

	for (i = 0; i < pmp->pfs_nmasters; ++i) {
		for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
			if (pmp->xop_groups[j].thrs[i].td)
				hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
		}
	}
}
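/*
 * Illustrative sketch, not compiled: the typical frontend XOP life
 * cycle using the functions above.  hammer2_xop_nresolve and its xop
 * type stand in for any per-operation backend function.
 *
 *	hammer2_xop_nresolve_t *xop;
 *	int error;
 *
 *	xop = hammer2_xop_alloc(dip, 0);
 *	hammer2_xop_setname(&xop->head, name, name_len);
 *	hammer2_xop_start(&xop->head, hammer2_xop_nresolve);
 *	error = hammer2_xop_collect(&xop->head, 0);
 *	... use xop->head.cluster ...
 *	hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
 */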
/*
 * Start a XOP request, queueing it to all nodes in the cluster to
 * execute the cluster op.
 *
 * XXX optimize single-target case.
 */
void
hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_func_t func,
			 int notidx)
{
	hammer2_inode_t *ip1;
#if 0
	hammer2_xop_group_t *xgrp;
	hammer2_thread_t *thr;
#endif
	hammer2_pfs_t *pmp;
	int i;
	int ng;
	int nchains;

	ip1 = xop->ip1;
	pmp = ip1->pmp;
	if (pmp->has_xop_threads == 0)
		hammer2_xop_helper_create(pmp);

	if (xop->flags & HAMMER2_XOP_ITERATOR) {
		ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)) ^
			   pmp->xop_iterator++);
	} else {
		ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)) ^
			   hammer2_icrc32(&func, sizeof(func)));
	}
	ng = ng & HAMMER2_XOPGROUPS_MASK;
#if 0
	g = pmp->xop_iterator++;
	g = g & HAMMER2_XOPGROUPS_MASK;
	xgrp = &pmp->xop_groups[g];
	xop->xgrp = xgrp;
#endif
	xop->func = func;

	/*
	 * The XOP sequencer is based on ip1, ip2, and ip3.  Because ops can
	 * finish early and unlock the related inodes, some targets may get
	 * behind.  The sequencer ensures that ops on the same inode execute
	 * in the same order.
	 *
	 * The instant xop is queued another thread can pick it off.  In the
	 * case of asynchronous ops, another thread might even finish and
	 * deallocate it.
	 */
	hammer2_spin_ex(&pmp->xop_spin);
	nchains = ip1->cluster.nchains;
	for (i = 0; i < nchains; ++i) {
		/*
		 * XXX ip1->cluster.array* not stable here.  This temporary
		 *     hack fixes basic issues in target XOPs which need to
		 *     obtain a starting chain from the inode but does not
		 *     address possible races against inode updates which
		 *     might NULL-out a chain.
		 */
		if (i != notidx && ip1->cluster.array[i].chain) {
			atomic_set_int(&xop->run_mask, 1U << i);
			atomic_set_int(&xop->chk_mask, 1U << i);
			TAILQ_INSERT_TAIL(&pmp->xopq[i][ng], xop,
					  collect[i].entry);
		}
	}
	hammer2_spin_unex(&pmp->xop_spin);
	/* xop can become invalid at this point */

	/*
	 * Try to wakeup just one xop thread for each cluster node.
	 */
	for (i = 0; i < nchains; ++i) {
		if (i != notidx) {
			hammer2_thr_signal(&pmp->xop_groups[ng].thrs[i],
					   HAMMER2_THREAD_XOPQ);
		}
	}
}

void
hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_func_t func)
{
	hammer2_xop_start_except(xop, func, -1);
}
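/*
 * Illustrative sketch, not compiled: interpretation of the masks set
 * above.  Bit i of run_mask/chk_mask covers cluster index i, while the
 * frontend holds the separate HAMMER2_XOPMASK_VOP bit, so the XOP is
 * only destroyed once every queued backend thread and the frontend
 * have all retired their bits via hammer2_xop_retire().
 *
 *	run_mask = HAMMER2_XOPMASK_VOP | (1U << i) | ...
 *	chk_mask = (1U << i) | ...	(nodes whose fifos must be drained)
 */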
/*
 * Retire a XOP.  Used by both the VOP frontend and by the XOP backend.
 */
void
hammer2_xop_retire(hammer2_xop_head_t *xop, uint32_t mask)
{
	hammer2_chain_t *chain;
	uint32_t nmask;
	int i;

	/*
	 * Remove the frontend collector or remove a backend feeder.
	 * When removing the frontend we must wakeup any backend feeders
	 * who are waiting for FIFO space.
	 *
	 * XXX optimize wakeup.
	 */
	KKASSERT(xop->run_mask & mask);
	nmask = atomic_fetchadd_int(&xop->run_mask, -mask);
	if ((nmask & ~HAMMER2_XOPMASK_FIFOW) != mask) {
		if (mask == HAMMER2_XOPMASK_VOP) {
			if (nmask & HAMMER2_XOPMASK_FIFOW)
				wakeup(xop);
		}
		return;
	}
	/* else nobody else left, we can ignore FIFOW */

	/*
	 * All collectors are gone, we can cleanup and dispose of the XOP.
	 * Note that this can wind up being a frontend OR a backend.
	 * Pending chains are locked shared and not owned by any thread.
	 */
#if 0
	/*
	 * Cache the terminating cluster.
	 */
	hammer2_inode_t *ip;
	if ((ip = xop->ip1) != NULL) {
		hammer2_cluster_t *tmpclu;

		tmpclu = hammer2_cluster_copy(&xop->cluster);
		hammer2_spin_ex(&ip->cluster_spin);
		tmpclu = atomic_swap_ptr((volatile void **)&ip->cluster_cache,
					 tmpclu);
		hammer2_spin_unex(&ip->cluster_spin);
		if (tmpclu)
			hammer2_cluster_drop(tmpclu);
	}
#endif

	/*
	 * Cleanup the collection cluster.
	 */
	for (i = 0; i < xop->cluster.nchains; ++i) {
		xop->cluster.array[i].flags = 0;
		chain = xop->cluster.array[i].chain;
		if (chain) {
			xop->cluster.array[i].chain = NULL;
			hammer2_chain_drop_unhold(chain);
		}
	}

	/*
	 * Cleanup the fifos, use check_counter to optimize the loop.
	 * Since we are the only entity left on this xop we don't have
	 * to worry about fifo flow control, and one lfence() will do the
	 * job.
	 */
	cpu_lfence();
	mask = xop->chk_mask;
	for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
		hammer2_xop_fifo_t *fifo = &xop->collect[i];
		while (fifo->ri != fifo->wi) {
			chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
			if (chain)
				hammer2_chain_drop_unhold(chain);
			++fifo->ri;
		}
		mask &= ~(1U << i);
	}

	/*
	 * The inode is only held at this point, simply drop it.
	 */
	if (xop->ip1) {
		hammer2_inode_drop(xop->ip1);
		xop->ip1 = NULL;
	}
	if (xop->ip2) {
		hammer2_inode_drop(xop->ip2);
		xop->ip2 = NULL;
	}
	if (xop->ip3) {
		hammer2_inode_drop(xop->ip3);
		xop->ip3 = NULL;
	}
	if (xop->name1) {
		kfree(xop->name1, M_HAMMER2);
		xop->name1 = NULL;
		xop->name1_len = 0;
	}
	if (xop->name2) {
		kfree(xop->name2, M_HAMMER2);
		xop->name2 = NULL;
		xop->name2_len = 0;
	}

	objcache_put(cache_xops, xop);
}

/*
 * (Backend) Returns non-zero if the frontend is still attached.
 */
int
hammer2_xop_active(hammer2_xop_head_t *xop)
{
	if (xop->run_mask & HAMMER2_XOPMASK_VOP)
		return 1;
	else
		return 0;
}
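#if 0
/*
 * Illustrative sketch, not compiled: skeleton of a backend XOP
 * function.  Each cluster node runs its own instance with its own
 * clindex.  Chains are pushed to the frontend with hammer2_xop_feed()
 * and the final NULL feed reports the terminal error code (ENOENT for
 * a normal end of scan).  scan_next_chain() is a hypothetical stand-in
 * for the per-operation chain iteration.
 */
static void
example_backend(hammer2_xop_t *arg, int clindex)
{
	hammer2_xop_head_t *xop = &arg->head;
	hammer2_chain_t *chain;
	int error = 0;

	while ((chain = scan_next_chain(xop, clindex)) != NULL) {
		error = hammer2_xop_feed(xop, chain, clindex, chain->error);
		hammer2_chain_unlock(chain);
		hammer2_chain_drop(chain);
		if (error)
			break;
	}
	if (error == 0)
		error = ENOENT;
	hammer2_xop_feed(xop, NULL, clindex, error);
}
#endif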
/*
 * (Backend) Feed chain data through the cluster validator and back to
 * the frontend.  Chains are fed from multiple nodes concurrently
 * and pipelined via per-node FIFOs in the XOP.
 *
 * The chain must be locked (either shared or exclusive).  The caller may
 * unlock and drop the chain on return.  This function will add an extra
 * ref and hold the chain's data for the pass-back.
 *
 * No xop lock is needed because we are only manipulating fields under
 * our direct control.
 *
 * Returns 0 on success and a hammer error code if sync is permanently
 * lost.  The caller retains a ref on the chain but by convention
 * the lock is typically inherited by the xop (caller loses lock).
 *
 * Returns non-zero on error.  In this situation the caller retains a
 * ref on the chain but loses the lock (we unlock here).
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
		 int clindex, int error)
{
	hammer2_xop_fifo_t *fifo;
	uint32_t mask;

	/*
	 * Early termination (typically of xop_readdir)
	 */
	if (hammer2_xop_active(xop) == 0) {
		error = EINTR;
		goto done;
	}

	/*
	 * Multi-threaded entry into the XOP collector.  We own the
	 * fifo->wi for our clindex.
	 */
	fifo = &xop->collect[clindex];

	if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO)
		lwkt_yield();
	while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
		atomic_set_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
		mask = xop->run_mask;
		if ((mask & HAMMER2_XOPMASK_VOP) == 0) {
			error = EINTR;
			goto done;
		}
		tsleep_interlock(xop, 0);
		if (atomic_cmpset_int(&xop->run_mask, mask,
				      mask | HAMMER2_XOPMASK_FIFOW)) {
			if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
				tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
			}
		}
		/* retry */
	}
	atomic_clear_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
	if (chain)
		hammer2_chain_ref_hold(chain);
	if (error == 0 && chain)
		error = chain->error;
	fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
	fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
	cpu_sfence();
	++fifo->wi;
	atomic_add_int(&xop->check_counter, 1);
	wakeup(&xop->check_counter);	/* XXX optimize */
	error = 0;

	/*
	 * Cleanup.  If an error occurred we eat the lock.  If no error
	 * occurred the fifo inherits the lock and gains an additional ref.
	 *
	 * The caller's ref remains in both cases.
	 */
done:
	return error;
}
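/*
 * Illustrative sketch, not compiled: the FIFO index convention used
 * above.  ri and wi increment without bound and only the masked low
 * bits index the array, so (wi - ri) is the number of elements in
 * flight and the fifo is full when ri == wi - HAMMER2_XOPFIFO.
 *
 *	n_inflight = fifo->wi - fifo->ri;
 *	next_chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
 */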
/*
 * (Frontend) collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= collect_key.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT	- Used to 'poll' a completed collection,
 *				  usually called synchronously from the
 *				  node XOPs for the strategy code to
 *				  fake the frontend collection and complete
 *				  the BIO as soon as possible.
 *
 * HAMMER2_XOP_SYNCHRONIZER	- Request synchronization with a particular
 *				  cluster index, prevents looping when that
 *				  index is out of sync so caller can act on
 *				  the out of sync element.  ESRCH and EDEADLK
 *				  can be returned if this flag is specified.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Return ENOENT on normal termination.
 * Otherwise return an error.
 */
int
hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
{
	hammer2_xop_fifo_t *fifo;
	hammer2_chain_t *chain;
	hammer2_key_t lokey;
	int error;
	int keynull;
	int adv;		/* advance the element */
	int i;
	uint32_t check_counter;

loop:
	/*
	 * First loop tries to advance pieces of the cluster which
	 * are out of sync.
	 */
	lokey = HAMMER2_KEY_MAX;
	keynull = HAMMER2_CHECK_NULL;
	check_counter = xop->check_counter;
	cpu_lfence();

	for (i = 0; i < xop->cluster.nchains; ++i) {
		chain = xop->cluster.array[i].chain;
		if (chain == NULL) {
			adv = 1;
		} else if (chain->bref.key < xop->collect_key) {
			adv = 1;
		} else {
			keynull &= ~HAMMER2_CHECK_NULL;
			if (lokey > chain->bref.key)
				lokey = chain->bref.key;
			adv = 0;
		}
		if (adv == 0)
			continue;

		/*
		 * Advance element if possible, advanced element may be NULL.
		 */
		if (chain)
			hammer2_chain_drop_unhold(chain);

		fifo = &xop->collect[i];
		if (fifo->ri != fifo->wi) {
			cpu_lfence();
			chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
			error = fifo->errors[fifo->ri & HAMMER2_XOPFIFO_MASK];
			++fifo->ri;
			xop->cluster.array[i].chain = chain;
			xop->cluster.array[i].error = error;
			if (chain == NULL) {
				/* XXX */
				xop->cluster.array[i].flags |=
							HAMMER2_CITEM_NULL;
			}
			if (fifo->wi - fifo->ri <= HAMMER2_XOPFIFO / 2) {
				if (fifo->flags & HAMMER2_XOP_FIFO_STALL) {
					atomic_clear_int(&fifo->flags,
						    HAMMER2_XOP_FIFO_STALL);
					wakeup(xop);
					lwkt_yield();
				}
			}
			--i;		/* loop on same index */
		} else {
			/*
			 * Retain CITEM_NULL flag.  If set just repeat EOF.
			 * If not, the NULL,0 combination indicates an
			 * operation in-progress.
			 */
			xop->cluster.array[i].chain = NULL;
			/* retain any CITEM_NULL setting */
		}
	}

	/*
	 * Determine whether the lowest collected key meets clustering
	 * requirements.  Returns:
	 *
	 * 0	     - key valid, cluster can be returned.
	 *
	 * ENOENT    - normal end of scan, return ENOENT.
	 *
	 * ESRCH     - sufficient elements collected, quorum agreement
	 *	       that lokey is not a valid element and should be
	 *	       skipped.
	 *
	 * EDEADLK   - sufficient elements collected, no quorum agreement
	 *	       (and no agreement possible).  In this situation a
	 *	       repair is needed, for now we loop.
	 *
	 * EINPROGRESS - insufficient elements collected to resolve, wait
	 *		 for event and loop.
	 */
	if ((flags & HAMMER2_XOP_COLLECT_WAITALL) &&
	    xop->run_mask != HAMMER2_XOPMASK_VOP) {
		error = EINPROGRESS;
	} else {
		error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
	}
	if (error == EINPROGRESS) {
		if (xop->check_counter == check_counter) {
			if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
				goto done;
			tsleep_interlock(&xop->check_counter, 0);
			cpu_lfence();
			if (xop->check_counter == check_counter) {
				tsleep(&xop->check_counter, PINTERLOCKED,
					"h2coll", hz*60);
			}
		}
		goto loop;
	}
	if (error == ESRCH) {
		if (lokey != HAMMER2_KEY_MAX) {
			xop->collect_key = lokey + 1;
			goto loop;
		}
		error = ENOENT;
	}
	if (error == EDEADLK) {
		kprintf("hammer2: no quorum possible lokey %016jx\n",
			(intmax_t)lokey);
		if (lokey != HAMMER2_KEY_MAX) {
			xop->collect_key = lokey + 1;
			goto loop;
		}
		error = ENOENT;
	}
	if (lokey == HAMMER2_KEY_MAX)
		xop->collect_key = lokey;
	else
		xop->collect_key = lokey + 1;
done:
	return error;
}
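/*
 * Illustrative sketch, not compiled: a frontend iteration loop.
 * collect_key advances automatically on each successful collect, so
 * repeated calls walk the keyspace until ENOENT indicates a normal
 * end of scan.
 *
 *	while ((error = hammer2_xop_collect(&xop->head, 0)) == 0) {
 *		... consume xop->head.cluster for this key ...
 *	}
 *	if (error == ENOENT)
 *		error = 0;
 */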
/*
 * N x M processing threads are available to handle XOPs, N per cluster
 * index x M cluster nodes.  All the threads for any given cluster index
 * share and pull from the same xopq.
 *
 * Locate and return the next runnable xop, or NULL if no xops are
 * present or none of the xops are currently runnable (for various reasons).
 * The xop is left on the queue and serves to block other dependent xops
 * from being run.
 *
 * Dependent xops will not be returned.
 *
 * Sets HAMMER2_XOP_FIFO_RUN on the returned xop or returns NULL.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
#define XOP_HASH_SIZE	16
#define XOP_HASH_MASK	(XOP_HASH_SIZE - 1)

static __inline
int
xop_testhash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
	uint32_t mask;
	int hv;

	hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
	mask = 1U << (hv & 31);
	hv >>= 5;

	return ((int)(hash[hv & XOP_HASH_MASK] & mask));
}

static __inline
void
xop_sethash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
	uint32_t mask;
	int hv;

	hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
	mask = 1U << (hv & 31);
	hv >>= 5;

	hash[hv & XOP_HASH_MASK] |= mask;
}

static
hammer2_xop_head_t *
hammer2_xop_next(hammer2_thread_t *thr)
{
	hammer2_pfs_t *pmp = thr->pmp;
	int clindex = thr->clindex;
	uint32_t hash[XOP_HASH_SIZE] = { 0 };
	hammer2_xop_head_t *xop;

	hammer2_spin_ex(&pmp->xop_spin);
	TAILQ_FOREACH(xop, thr->xopq, collect[clindex].entry) {
		/*
		 * Check dependency
		 */
		if (xop_testhash(thr, xop->ip1, hash) ||
		    (xop->ip2 && xop_testhash(thr, xop->ip2, hash)) ||
		    (xop->ip3 && xop_testhash(thr, xop->ip3, hash))) {
			continue;
		}
		xop_sethash(thr, xop->ip1, hash);
		if (xop->ip2)
			xop_sethash(thr, xop->ip2, hash);
		if (xop->ip3)
			xop_sethash(thr, xop->ip3, hash);

		/*
		 * Check already running
		 */
		if (xop->collect[clindex].flags & HAMMER2_XOP_FIFO_RUN)
			continue;

		/*
		 * Found a good one, return it.
		 */
		atomic_set_int(&xop->collect[clindex].flags,
			       HAMMER2_XOP_FIFO_RUN);
		break;
	}
	hammer2_spin_unex(&pmp->xop_spin);

	return xop;
}

/*
 * Remove the completed XOP from the queue, clear HAMMER2_XOP_FIFO_RUN.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
static
void
hammer2_xop_dequeue(hammer2_thread_t *thr, hammer2_xop_head_t *xop)
{
	hammer2_pfs_t *pmp = thr->pmp;
	int clindex = thr->clindex;

	hammer2_spin_ex(&pmp->xop_spin);
	TAILQ_REMOVE(thr->xopq, xop, collect[clindex].entry);
	atomic_clear_int(&xop->collect[clindex].flags,
			 HAMMER2_XOP_FIFO_RUN);
	hammer2_spin_unex(&pmp->xop_spin);
}
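/*
 * Illustrative sketch, not compiled: the dependency filter used by
 * hammer2_xop_next() is a small Bloom-style bitmap (XOP_HASH_SIZE * 32
 * = 512 bits).  A false positive merely delays an independent xop for
 * one scan; a false negative cannot occur because every inode touched
 * by an earlier xop is recorded before later xops are tested.
 *
 *	uint32_t hash[XOP_HASH_SIZE] = { 0 };
 *
 *	xop_sethash(thr, ip, hash);		record ip
 *	if (xop_testhash(thr, ip, hash))	later xop touching ip
 *		...must wait for this scan...
 */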
/*
 * Primary management thread for xops support.  Each node has several such
 * threads which replicate front-end operations on cluster nodes.
 *
 * XOPS thread node operations, allowing the function to focus on a single
 * node in the cluster after validating the operation with the cluster.
 * This is primarily what prevents dead or stalled nodes from stalling
 * the front-end.
 */
void
hammer2_primary_xops_thread(void *arg)
{
	hammer2_thread_t *thr = arg;
	hammer2_pfs_t *pmp;
	hammer2_xop_head_t *xop;
	uint32_t mask;
	uint32_t flags;
	uint32_t nflags;
	hammer2_xop_func_t last_func = NULL;

	pmp = thr->pmp;
	/*xgrp = &pmp->xop_groups[thr->repidx]; not needed */
	mask = 1U << thr->clindex;

	for (;;) {
		flags = thr->flags;

		/*
		 * Handle stop request
		 */
		if (flags & HAMMER2_THREAD_STOP)
			break;

		/*
		 * Handle freeze request
		 */
		if (flags & HAMMER2_THREAD_FREEZE) {
			nflags = (flags & ~(HAMMER2_THREAD_FREEZE |
					    HAMMER2_THREAD_CLIENTWAIT)) |
				 HAMMER2_THREAD_FROZEN;
			if (!atomic_cmpset_int(&thr->flags, flags, nflags))
				continue;
			if (flags & HAMMER2_THREAD_CLIENTWAIT)
				wakeup(&thr->flags);
			flags = nflags;
			/* fall through */
		}

		if (flags & HAMMER2_THREAD_UNFREEZE) {
			nflags = flags & ~(HAMMER2_THREAD_UNFREEZE |
					   HAMMER2_THREAD_FROZEN |
					   HAMMER2_THREAD_CLIENTWAIT);
			if (!atomic_cmpset_int(&thr->flags, flags, nflags))
				continue;
			if (flags & HAMMER2_THREAD_CLIENTWAIT)
				wakeup(&thr->flags);
			flags = nflags;
			/* fall through */
		}

		/*
		 * Force idle if frozen until unfrozen or stopped.
		 */
		if (flags & HAMMER2_THREAD_FROZEN) {
			nflags = flags | HAMMER2_THREAD_WAITING;
			tsleep_interlock(&thr->flags, 0);
			if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
				tsleep(&thr->flags, PINTERLOCKED, "frozen", 0);
				atomic_clear_int(&thr->flags,
						 HAMMER2_THREAD_WAITING);
			}
			continue;
		}

		/*
		 * Reset state on REMASTER request
		 */
		if (flags & HAMMER2_THREAD_REMASTER) {
			nflags = flags & ~HAMMER2_THREAD_REMASTER;
			if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
				/* reset state here */
			}
			continue;
		}

		/*
		 * Process requests.  Each request can be multi-queued.
		 *
		 * If we get behind and the frontend VOP is no longer active,
		 * we retire the request without processing it.  The callback
		 * may also abort processing if the frontend VOP becomes
		 * inactive.
		 */
		if (flags & HAMMER2_THREAD_XOPQ) {
			nflags = flags & ~HAMMER2_THREAD_XOPQ;
			if (!atomic_cmpset_int(&thr->flags, flags, nflags))
				continue;
			flags = nflags;
			/* fall through */
		}
		while ((xop = hammer2_xop_next(thr)) != NULL) {
			if (hammer2_xop_active(xop)) {
				last_func = xop->func;
				xop->func((hammer2_xop_t *)xop, thr->clindex);
				hammer2_xop_dequeue(thr, xop);
				hammer2_xop_retire(xop, mask);
			} else {
				last_func = xop->func;
				hammer2_xop_feed(xop, NULL, thr->clindex,
						 ECONNABORTED);
				hammer2_xop_dequeue(thr, xop);
				hammer2_xop_retire(xop, mask);
			}
		}

		/*
		 * Wait for event, interlock using THREAD_WAITING and
		 * THREAD_SIGNAL.
		 *
		 * For robustness poll on a 30-second interval, but nominally
		 * expect to be woken up.
		 */
		nflags = flags | HAMMER2_THREAD_WAITING;

		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2idle", hz*30);
			atomic_clear_int(&thr->flags, HAMMER2_THREAD_WAITING);
		}
	}

#if 0
	/*
	 * Cleanup / termination
	 */
	while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
		kprintf("hammer2_thread: aborting xop %p\n", xop->func);
		TAILQ_REMOVE(&thr->xopq, xop,
			     collect[thr->clindex].entry);
		hammer2_xop_retire(xop, mask);
	}
#endif
	thr->td = NULL;
	hammer2_thr_return(thr, HAMMER2_THREAD_STOPPED);
	/* thr structure can go invalid after this point */
	wakeup(thr);
}