/*
 * Copyright (c) 2015 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module implements the hammer2 helper thread API, including
 * the frontend/backend XOP API.
 */
#include "hammer2.h"

/*
 * Set flags and wakeup any waiters.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
void
hammer2_thr_signal(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                nflags = (oflags | flags) & ~HAMMER2_THREAD_WAITING;

                if (oflags & HAMMER2_THREAD_WAITING) {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                                wakeup(&thr->flags);
                                break;
                        }
                } else {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags))
                                break;
                }
        }
}

/*
 * Set and clear flags and wakeup any waiters.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
void
hammer2_thr_signal2(hammer2_thread_t *thr, uint32_t posflags, uint32_t negflags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                nflags = (oflags | posflags) &
                        ~(negflags | HAMMER2_THREAD_WAITING);
                if (oflags & HAMMER2_THREAD_WAITING) {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                                wakeup(&thr->flags);
                                break;
                        }
                } else {
                        if (atomic_cmpset_int(&thr->flags, oflags, nflags))
                                break;
                }
        }
}
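/*
 * Illustrative sketch (not additional API): the signal and wait
 * primitives above and below are normally used in pairs on the same
 * flag word.  Thread termination, for example, is driven as in
 * hammer2_thr_delete() later in this file:
 *
 *	hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);	 (requester)
 *	hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);	 (requester)
 *
 * with the target thread acknowledging on its way out:
 *
 *	hammer2_thr_signal(thr, HAMMER2_THREAD_STOPPED); (target thread)
 */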
/*
 * Wait until all the bits in flags are set.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
void
hammer2_thr_wait(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if ((oflags & flags) == flags)
                        break;
                nflags = oflags | HAMMER2_THREAD_WAITING;
                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
                }
        }
}

/*
 * Wait until any of the bits in flags are set, with timeout.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
int
hammer2_thr_wait_any(hammer2_thread_t *thr, uint32_t flags, int timo)
{
        uint32_t oflags;
        uint32_t nflags;
        int error;

        error = 0;
        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if (oflags & flags)
                        break;
                nflags = oflags | HAMMER2_THREAD_WAITING;
                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        error = tsleep(&thr->flags, PINTERLOCKED,
                                       "h2twait", timo);
                }
                if (error == ETIMEDOUT)
                        break;
        }
        return error;
}

/*
 * Wait until the bits in flags are clear.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *          succeeds.
 */
void
hammer2_thr_wait_neg(hammer2_thread_t *thr, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = thr->flags;
                cpu_ccfence();
                if ((oflags & flags) == 0)
                        break;
                nflags = oflags | HAMMER2_THREAD_WAITING;
                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
                        tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
                }
        }
}

/*
 * Initialize the supplied thread structure, starting the specified
 * thread.
 *
 * NOTE: thr structure can be retained across mounts and unmounts for this
 *       pmp, so make sure the flags are in a sane state.
 */
void
hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
                   hammer2_dev_t *hmp,
                   const char *id, int clindex, int repidx,
                   void (*func)(void *arg))
{
        thr->pmp = pmp;         /* xop helpers */
        thr->hmp = hmp;         /* bulkfree */
        thr->clindex = clindex;
        thr->repidx = repidx;
        TAILQ_INIT(&thr->xopq);
        atomic_clear_int(&thr->flags, HAMMER2_THREAD_STOP |
                                      HAMMER2_THREAD_STOPPED |
                                      HAMMER2_THREAD_FREEZE |
                                      HAMMER2_THREAD_FROZEN);
        if (thr->scratch == NULL)
                thr->scratch = kmalloc(MAXPHYS, M_HAMMER2, M_WAITOK | M_ZERO);
        if (repidx >= 0) {
                lwkt_create(func, thr, &thr->td, NULL, 0, repidx % ncpus,
                            "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
        } else if (pmp) {
                lwkt_create(func, thr, &thr->td, NULL, 0, -1,
                            "%s-%s", id, pmp->pfs_names[clindex]);
        } else {
                lwkt_create(func, thr, &thr->td, NULL, 0, -1, "%s", id);
        }
}
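/*
 * For example (mirroring hammer2_xop_helper_create() below), the XOP
 * worker for cluster index i in group j is started as:
 *
 *	hammer2_thr_create(&pmp->xop_groups[j].thrs[i], pmp, NULL,
 *			   "h2xop", i, j, hammer2_primary_xops_thread);
 *
 * while a device-level caller (e.g. bulkfree, which is what thr->hmp
 * exists for) may pass a NULL pmp to get a plain "%s" thread name.
 */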
/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);
        hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);
        thr->pmp = NULL;
        if (thr->scratch) {
                kfree(thr->scratch, M_HAMMER2);
                thr->scratch = NULL;
        }
        KKASSERT(TAILQ_EMPTY(&thr->xopq));
}

/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER);
}

void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
        hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
}

void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
        hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
}

void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
        if (thr->td == NULL)
                return;
        hammer2_thr_signal(thr, HAMMER2_THREAD_UNFREEZE);
        hammer2_thr_wait_neg(thr, HAMMER2_THREAD_FROZEN);
}

int
hammer2_thr_break(hammer2_thread_t *thr)
{
        if (thr->flags & (HAMMER2_THREAD_STOP |
                          HAMMER2_THREAD_REMASTER |
                          HAMMER2_THREAD_FREEZE)) {
                return 1;
        }
        return 0;
}

/****************************************************************************
 *                          HAMMER2 XOPS API                                *
 ****************************************************************************/

void
hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
{
        /* no extra fields in structure at the moment */
}
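/*
 * A minimal sketch of the frontend life cycle (illustrative only;
 * hammer2_xop_some_desc and hammer2_xop_some_func are hypothetical
 * stand-ins for a concrete XOP type and its backend function):
 *
 *	hammer2_xop_some_desc *xop;
 *
 *	xop = hammer2_xop_alloc(ip, 0);
 *	hammer2_xop_start(&xop->head, hammer2_xop_some_func);
 *	error = hammer2_xop_collect(&xop->head, 0);
 *	... consume xop->head.cluster ...
 *	hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
 */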
/*
 * Allocate an XOP request.
 *
 * Once allocated, an XOP request can be started, collected, and retired,
 * and can be retired early if desired.
 *
 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
 */
void *
hammer2_xop_alloc(hammer2_inode_t *ip, int flags)
{
        hammer2_xop_t *xop;

        xop = objcache_get(cache_xops, M_WAITOK);
        KKASSERT(xop->head.cluster.array[0].chain == NULL);

        xop->head.ip1 = ip;
        xop->head.func = NULL;
        xop->head.flags = flags;
        xop->head.state = 0;
        xop->head.error = 0;
        xop->head.collect_key = 0;
        xop->head.check_counter = 0;
        if (flags & HAMMER2_XOP_MODIFYING)
                xop->head.mtid = hammer2_trans_sub(ip->pmp);
        else
                xop->head.mtid = 0;

        xop->head.cluster.nchains = ip->cluster.nchains;
        xop->head.cluster.pmp = ip->pmp;
        xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;

        /*
         * run_mask - Active thread (or frontend) associated with XOP
         */
        xop->head.run_mask = HAMMER2_XOPMASK_VOP;

        hammer2_inode_ref(ip);

        return xop;
}

void
hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
        xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name1_len = name_len;
        bcopy(name, xop->name1, name_len);
}

void
hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
        xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name2_len = name_len;
        bcopy(name, xop->name2, name_len);
}

size_t
hammer2_xop_setname_inum(hammer2_xop_head_t *xop, hammer2_key_t inum)
{
        const size_t name_len = 18;

        xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
        xop->name1_len = name_len;
        ksnprintf(xop->name1, name_len + 1, "0x%016jx", (intmax_t)inum);

        return name_len;
}

void
hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
{
        xop->ip2 = ip2;
        hammer2_inode_ref(ip2);
}

void
hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
{
        xop->ip3 = ip3;
        hammer2_inode_ref(ip3);
}

void
hammer2_xop_reinit(hammer2_xop_head_t *xop)
{
        xop->state = 0;
        xop->error = 0;
        xop->collect_key = 0;
        xop->run_mask = HAMMER2_XOPMASK_VOP;
}

/*
 * A mounted PFS needs Xops threads to support frontend operations.
 */
void
hammer2_xop_helper_create(hammer2_pfs_t *pmp)
{
        int i;
        int j;

        lockmgr(&pmp->lock, LK_EXCLUSIVE);
        pmp->has_xop_threads = 1;

        for (i = 0; i < pmp->iroot->cluster.nchains; ++i) {
                for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
                        if (pmp->xop_groups[j].thrs[i].td)
                                continue;
                        hammer2_thr_create(&pmp->xop_groups[j].thrs[i],
                                           pmp, NULL,
                                           "h2xop", i, j,
                                           hammer2_primary_xops_thread);
                }
        }
        lockmgr(&pmp->lock, LK_RELEASE);
}

void
hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
{
        int i;
        int j;

        for (i = 0; i < pmp->pfs_nmasters; ++i) {
                for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
                        if (pmp->xop_groups[j].thrs[i].td)
                                hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
                }
        }
        pmp->has_xop_threads = 0;
}
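/*
 * Note (an observation on the loops above, not additional API): a
 * mounted PFS therefore runs up to nchains x HAMMER2_XOPGROUPS helper
 * threads, created lazily by the first XOP dispatch (see
 * hammer2_xop_start_except() below) and torn down again via
 * hammer2_xop_helper_cleanup().
 */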
/*
 * Start an XOP request, queueing it to all nodes in the cluster to
 * execute the cluster op.
 *
 * XXX optimize single-target case.
 */
void
hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_func_t func,
                         int notidx)
{
        hammer2_inode_t *ip1;
        hammer2_pfs_t *pmp;
        hammer2_thread_t *thr;
        int i;
        int ng;
        int nchains;

        ip1 = xop->ip1;
        pmp = ip1->pmp;
        if (pmp->has_xop_threads == 0)
                hammer2_xop_helper_create(pmp);

        /*
         * The intent of the XOP sequencer is to ensure that ops on the same
         * inode execute in the same order.  This is necessary when issuing
         * modifying operations to multiple targets because some targets might
         * get behind and the frontend is allowed to complete the moment a
         * quorum of targets succeed.
         *
         * Strategy operations must be segregated from non-strategy operations
         * to avoid a deadlock.  For example, if a vfsync and a bread/bwrite
         * were queued to the same worker thread, the locked buffer in the
         * strategy operation can deadlock the vfsync's buffer list scan.
         *
         * TODO - RENAME fails here because it is potentially modifying
         *        three different inodes.
         */
        if (xop->flags & HAMMER2_XOP_STRATEGY) {
                hammer2_xop_strategy_t *xopst;

                xopst = &((hammer2_xop_t *)xop)->xop_strategy;
                ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)) ^
                           hammer2_icrc32(&xopst->lbase, sizeof(xopst->lbase)));
                ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
                ng += HAMMER2_XOPGROUPS / 2;
        } else {
                ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)));
                ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
        }
        xop->func = func;

        /*
         * The instant xop is queued another thread can pick it off.  In the
         * case of asynchronous ops, another thread might even finish and
         * deallocate it.
         */
        hammer2_spin_ex(&pmp->xop_spin);
        nchains = ip1->cluster.nchains;
        for (i = 0; i < nchains; ++i) {
                /*
                 * XXX ip1->cluster.array* not stable here.  This temporary
                 *     hack fixes basic issues in target XOPs which need to
                 *     obtain a starting chain from the inode but does not
                 *     address possible races against inode updates which
                 *     might NULL-out a chain.
                 */
                if (i != notidx && ip1->cluster.array[i].chain) {
                        thr = &pmp->xop_groups[ng].thrs[i];
                        atomic_set_int(&xop->run_mask, 1U << i);
                        atomic_set_int(&xop->chk_mask, 1U << i);
                        xop->collect[i].thr = thr;
                        TAILQ_INSERT_TAIL(&thr->xopq, xop, collect[i].entry);
                }
        }
        hammer2_spin_unex(&pmp->xop_spin);
        /* xop can become invalid at this point */

        /*
         * Each thread has its own xopq
         */
        for (i = 0; i < nchains; ++i) {
                if (i != notidx) {
                        thr = &pmp->xop_groups[ng].thrs[i];
                        hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
                }
        }
}

void
hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_func_t func)
{
        hammer2_xop_start_except(xop, func, -1);
}
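/*
 * Note (summarizing the selection logic above): the group index ng is
 * hashed from the inode pointer so that ops on the same inode serialize
 * through the same worker thread on each cluster node.  Strategy (I/O)
 * ops additionally mix in lbase and land in the upper half of the
 * groups, keeping them off the threads that run everything else.
 */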
/*
 * Retire an XOP.  Used by both the VOP frontend and by the XOP backend.
 */
void
hammer2_xop_retire(hammer2_xop_head_t *xop, uint32_t mask)
{
        hammer2_chain_t *chain;
        uint32_t nmask;
        int i;

        /*
         * Remove the frontend collector or remove a backend feeder.
         * When removing the frontend we must wakeup any backend feeders
         * who are waiting for FIFO space.
         *
         * XXX optimize wakeup.
         */
        KKASSERT(xop->run_mask & mask);
        nmask = atomic_fetchadd_int(&xop->run_mask, -mask);
        if ((nmask & ~HAMMER2_XOPMASK_FIFOW) != mask) {
                if (mask == HAMMER2_XOPMASK_VOP) {
                        if (nmask & HAMMER2_XOPMASK_FIFOW)
                                wakeup(xop);
                }
                return;
        }
        /* else nobody else left, we can ignore FIFOW */

        /*
         * All collectors are gone, we can cleanup and dispose of the XOP.
         * Note that this can wind up being a frontend OR a backend.
         * Pending chains are locked shared and not owned by any thread.
         *
         * Cleanup the collection cluster.
         */
        for (i = 0; i < xop->cluster.nchains; ++i) {
                xop->cluster.array[i].flags = 0;
                chain = xop->cluster.array[i].chain;
                if (chain) {
                        xop->cluster.array[i].chain = NULL;
                        hammer2_chain_drop_unhold(chain);
                }
        }

        /*
         * Cleanup the fifos, use check_counter to optimize the loop.
         * Since we are the only entity left on this xop we don't have
         * to worry about fifo flow control, and one lfence() will do the
         * job.
         */
        cpu_lfence();
        mask = xop->chk_mask;
        for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
                hammer2_xop_fifo_t *fifo = &xop->collect[i];
                while (fifo->ri != fifo->wi) {
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        if (chain)
                                hammer2_chain_drop_unhold(chain);
                        ++fifo->ri;
                }
                mask &= ~(1U << i);
        }

        /*
         * The inode is only held at this point, simply drop it.
         */
        if (xop->ip1) {
                hammer2_inode_drop(xop->ip1);
                xop->ip1 = NULL;
        }
        if (xop->ip2) {
                hammer2_inode_drop(xop->ip2);
                xop->ip2 = NULL;
        }
        if (xop->ip3) {
                hammer2_inode_drop(xop->ip3);
                xop->ip3 = NULL;
        }
        if (xop->name1) {
                kfree(xop->name1, M_HAMMER2);
                xop->name1 = NULL;
                xop->name1_len = 0;
        }
        if (xop->name2) {
                kfree(xop->name2, M_HAMMER2);
                xop->name2 = NULL;
                xop->name2_len = 0;
        }

        objcache_put(cache_xops, xop);
}

/*
 * (Backend) Returns non-zero if the frontend is still attached.
 */
int
hammer2_xop_active(hammer2_xop_head_t *xop)
{
        if (xop->run_mask & HAMMER2_XOPMASK_VOP)
                return 1;
        else
                return 0;
}
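/*
 * Illustrative backend pattern (a sketch, not any specific backend):
 * a node XOP typically walks chains and feeds each one to the frontend,
 * then terminates its stream by feeding a NULL chain with an error code:
 *
 *	while (chain) {
 *		error = hammer2_xop_feed(&xop->head, chain, clindex, 0);
 *		if (error)
 *			break;
 *		chain = ... next chain for this node ...;
 *	}
 *	hammer2_xop_feed(&xop->head, NULL, clindex, ENOENT);
 */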
/*
 * (Backend) Feed chain data through the cluster validator and back to
 * the frontend.  Chains are fed from multiple nodes concurrently
 * and pipelined via per-node FIFOs in the XOP.
 *
 * The chain must be locked (either shared or exclusive).  The caller may
 * unlock and drop the chain on return.  This function will add an extra
 * ref and hold the chain's data for the pass-back.
 *
 * No xop lock is needed because we are only manipulating fields under
 * our direct control.
 *
 * Returns 0 on success and a hammer error code if sync is permanently
 * lost.  The caller retains a ref on the chain but by convention
 * the lock is typically inherited by the xop (caller loses lock).
 *
 * Returns non-zero on error.  In this situation the caller retains a
 * ref on the chain but loses the lock (we unlock here).
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
                 int clindex, int error)
{
        hammer2_xop_fifo_t *fifo;
        uint32_t mask;

        /*
         * Early termination (typically of xop_readdir)
         */
        if (hammer2_xop_active(xop) == 0) {
                error = EINTR;
                goto done;
        }

        /*
         * Multi-threaded entry into the XOP collector.  We own the
         * fifo->wi for our clindex.
         */
        fifo = &xop->collect[clindex];

        if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO)
                lwkt_yield();
        while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                atomic_set_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
                mask = xop->run_mask;
                if ((mask & HAMMER2_XOPMASK_VOP) == 0) {
                        error = EINTR;
                        goto done;
                }
                tsleep_interlock(xop, 0);
                if (atomic_cmpset_int(&xop->run_mask, mask,
                                      mask | HAMMER2_XOPMASK_FIFOW)) {
                        if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
                                tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
                        }
                }
                /* retry */
        }
        atomic_clear_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
        if (chain)
                hammer2_chain_ref_hold(chain);
        if (error == 0 && chain)
                error = chain->error;
        fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
        fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
        cpu_sfence();
        ++fifo->wi;
        if (atomic_fetchadd_int(&xop->check_counter, HAMMER2_XOP_CHKINC) &
            HAMMER2_XOP_CHKWAIT) {
                atomic_clear_int(&xop->check_counter, HAMMER2_XOP_CHKWAIT);
                wakeup(&xop->check_counter);
        }
        error = 0;

        /*
         * Cleanup.  If an error occurred we eat the lock.  If no error
         * occurred the fifo inherits the lock and gains an additional ref.
         *
         * The caller's ref remains in both cases.
         */
done:
        return error;
}
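/*
 * A hedged sketch of the matching frontend consumption loop (patterned
 * on scanning VOPs, not any specific one): the frontend collects until
 * the backends signal end-of-scan with ENOENT, then retires:
 *
 *	for (;;) {
 *		error = hammer2_xop_collect(&xop->head, 0);
 *		if (error)			(ENOENT = normal EOF)
 *			break;
 *		... consume the focus in xop->head.cluster ...
 *	}
 *	hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
 */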
/*
 * (Frontend) collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= collect_key.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT	- Used to 'poll' a completed collection,
 *				  usually called synchronously from the
 *				  node XOPs for the strategy code to
 *				  fake the frontend collection and complete
 *				  the BIO as soon as possible.
 *
 * HAMMER2_XOP_SYNCHRONIZER	- Request synchronization with a particular
 *				  cluster index, prevents looping when that
 *				  index is out of sync so caller can act on
 *				  the out of sync element.  ESRCH and EDEADLK
 *				  can be returned if this flag is specified.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Return ENOENT on normal termination.
 * Otherwise return an error.
 */
int
hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
{
        hammer2_xop_fifo_t *fifo;
        hammer2_chain_t *chain;
        hammer2_key_t lokey;
        int error;
        int keynull;
        int adv;                /* advance the element */
        int i;
        uint32_t check_counter;

loop:
        /*
         * First loop tries to advance pieces of the cluster which
         * are out of sync.
         */
        lokey = HAMMER2_KEY_MAX;
        keynull = HAMMER2_CHECK_NULL;
        check_counter = xop->check_counter;
        cpu_lfence();

        for (i = 0; i < xop->cluster.nchains; ++i) {
                chain = xop->cluster.array[i].chain;
                if (chain == NULL) {
                        adv = 1;
                } else if (chain->bref.key < xop->collect_key) {
                        adv = 1;
                } else {
                        keynull &= ~HAMMER2_CHECK_NULL;
                        if (lokey > chain->bref.key)
                                lokey = chain->bref.key;
                        adv = 0;
                }
                if (adv == 0)
                        continue;

                /*
                 * Advance element if possible, advanced element may be NULL.
                 */
                if (chain)
                        hammer2_chain_drop_unhold(chain);

                fifo = &xop->collect[i];
                if (fifo->ri != fifo->wi) {
                        cpu_lfence();
                        chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        error = fifo->errors[fifo->ri & HAMMER2_XOPFIFO_MASK];
                        ++fifo->ri;
                        xop->cluster.array[i].chain = chain;
                        xop->cluster.array[i].error = error;
                        if (chain == NULL) {
                                /* XXX */
                                xop->cluster.array[i].flags |=
                                                        HAMMER2_CITEM_NULL;
                        }
                        if (fifo->wi - fifo->ri <= HAMMER2_XOPFIFO / 2) {
                                if (fifo->flags & HAMMER2_XOP_FIFO_STALL) {
                                        atomic_clear_int(&fifo->flags,
                                                    HAMMER2_XOP_FIFO_STALL);
                                        wakeup(xop);
                                        lwkt_yield();
                                }
                        }
                        --i;            /* loop on same index */
                } else {
                        /*
                         * Retain CITEM_NULL flag.  If set just repeat EOF.
                         * If not, the NULL,0 combination indicates an
                         * operation in-progress.
                         */
                        xop->cluster.array[i].chain = NULL;
                        /* retain any CITEM_NULL setting */
                }
        }

        /*
         * Determine whether the lowest collected key meets clustering
         * requirements.  Returns:
         *
         * 0            - key valid, cluster can be returned.
         *
         * ENOENT       - normal end of scan, return ENOENT.
         *
         * ESRCH        - sufficient elements collected, quorum agreement
         *                that lokey is not a valid element and should be
         *                skipped.
         *
         * EDEADLK      - sufficient elements collected, no quorum agreement
         *                (and no agreement possible).  In this situation a
         *                repair is needed, for now we loop.
         *
         * EINPROGRESS  - insufficient elements collected to resolve, wait
         *                for event and loop.
         */
        if ((flags & HAMMER2_XOP_COLLECT_WAITALL) &&
            xop->run_mask != HAMMER2_XOPMASK_VOP) {
                error = EINPROGRESS;
        } else {
                error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
        }
        if (error == EINPROGRESS) {
                if ((flags & HAMMER2_XOP_COLLECT_NOWAIT) == 0)
                        tsleep_interlock(&xop->check_counter, 0);
                if (atomic_cmpset_int(&xop->check_counter,
                                      check_counter,
                                      check_counter | HAMMER2_XOP_CHKWAIT)) {
                        if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
                                goto done;
                        tsleep(&xop->check_counter, PINTERLOCKED,
                               "h2coll", hz*60);
                }
                goto loop;
        }
        if (error == ESRCH) {
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = ENOENT;
        }
        if (error == EDEADLK) {
                kprintf("hammer2: no quorum possible lokey %016jx\n",
                        lokey);
                if (lokey != HAMMER2_KEY_MAX) {
                        xop->collect_key = lokey + 1;
                        goto loop;
                }
                error = ENOENT;
        }
        if (lokey == HAMMER2_KEY_MAX)
                xop->collect_key = lokey;
        else
                xop->collect_key = lokey + 1;
done:
        return error;
}
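/*
 * Worked note on the dispatcher below (descriptive only): xop_testhash()
 * and xop_sethash() implement a small 512-bit (16 x 32) occupancy filter.
 * The bit index mixes the inode and thread pointers; once an inode's bit
 * has been set by an earlier xop on the queue, any later xop touching an
 * inode that maps to the same bit is treated as dependent and skipped
 * for this pass.  A false positive merely delays an xop, it never
 * reorders dependent operations.
 */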
/*
 * N x M processing threads are available to handle XOPs, N per cluster
 * index x M cluster nodes.
 *
 * Locate and return the next runnable xop, or NULL if no xops are
 * present or none of the xops are currently runnable (for various reasons).
 * The xop is left on the queue and serves to block other dependent xops
 * from being run.
 *
 * Dependent xops will not be returned.
 *
 * Sets HAMMER2_XOP_FIFO_RUN on the returned xop or returns NULL.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
#define XOP_HASH_SIZE   16
#define XOP_HASH_MASK   (XOP_HASH_SIZE - 1)

static __inline
int
xop_testhash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
        uint32_t mask;
        int hv;

        hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
        mask = 1U << (hv & 31);
        hv >>= 5;

        return ((int)(hash[hv & XOP_HASH_MASK] & mask));
}

static __inline
void
xop_sethash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
        uint32_t mask;
        int hv;

        hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
        mask = 1U << (hv & 31);
        hv >>= 5;

        hash[hv & XOP_HASH_MASK] |= mask;
}

static
hammer2_xop_head_t *
hammer2_xop_next(hammer2_thread_t *thr)
{
        hammer2_pfs_t *pmp = thr->pmp;
        int clindex = thr->clindex;
        uint32_t hash[XOP_HASH_SIZE] = { 0 };
        hammer2_xop_head_t *xop;

        hammer2_spin_ex(&pmp->xop_spin);
        TAILQ_FOREACH(xop, &thr->xopq, collect[clindex].entry) {
                /*
                 * Check dependency
                 */
                if (xop_testhash(thr, xop->ip1, hash) ||
                    (xop->ip2 && xop_testhash(thr, xop->ip2, hash)) ||
                    (xop->ip3 && xop_testhash(thr, xop->ip3, hash))) {
                        continue;
                }
                xop_sethash(thr, xop->ip1, hash);
                if (xop->ip2)
                        xop_sethash(thr, xop->ip2, hash);
                if (xop->ip3)
                        xop_sethash(thr, xop->ip3, hash);

                /*
                 * Check already running
                 */
                if (xop->collect[clindex].flags & HAMMER2_XOP_FIFO_RUN)
                        continue;

                /*
                 * Found a good one, return it.
                 */
                atomic_set_int(&xop->collect[clindex].flags,
                               HAMMER2_XOP_FIFO_RUN);
                break;
        }
        hammer2_spin_unex(&pmp->xop_spin);

        return xop;
}

/*
 * Remove the completed XOP from the queue, clear HAMMER2_XOP_FIFO_RUN.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
static
void
hammer2_xop_dequeue(hammer2_thread_t *thr, hammer2_xop_head_t *xop)
{
        hammer2_pfs_t *pmp = thr->pmp;
        int clindex = thr->clindex;

        hammer2_spin_ex(&pmp->xop_spin);
        TAILQ_REMOVE(&thr->xopq, xop, collect[clindex].entry);
        atomic_clear_int(&xop->collect[clindex].flags,
                         HAMMER2_XOP_FIFO_RUN);
        hammer2_spin_unex(&pmp->xop_spin);
        if (TAILQ_FIRST(&thr->xopq))
                hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
}
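/*
 * Note on the wakeup protocol (describing the loop below): an enqueuer
 * inserts into thr->xopq and then sets HAMMER2_THREAD_XOPQ.  The worker
 * snapshots thr->flags, clears XOPQ via cmpset before draining, and only
 * sleeps through a flags-interlocked cmpset, so an enqueue racing the
 * drain re-sets XOPQ, fails the sleep interlock, and forces another pass
 * instead of losing the wakeup.
 */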
/*
 * Primary management thread for xops support.  Each node has several such
 * threads which replicate front-end operations on cluster nodes.
 *
 * XOPS threads run the node-local side of operations, allowing the
 * handler function to focus on a single node in the cluster after the
 * operation has been validated against the cluster.  This is primarily
 * what prevents dead or stalled nodes from stalling the front-end.
 */
void
hammer2_primary_xops_thread(void *arg)
{
        hammer2_thread_t *thr = arg;
        hammer2_pfs_t *pmp;
        hammer2_xop_head_t *xop;
        uint32_t mask;
        uint32_t flags;
        uint32_t nflags;
        hammer2_xop_func_t last_func = NULL;

        pmp = thr->pmp;
        /*xgrp = &pmp->xop_groups[thr->repidx]; not needed */
        mask = 1U << thr->clindex;

        for (;;) {
                flags = thr->flags;

                /*
                 * Handle stop request
                 */
                if (flags & HAMMER2_THREAD_STOP)
                        break;

                /*
                 * Handle freeze request
                 */
                if (flags & HAMMER2_THREAD_FREEZE) {
                        hammer2_thr_signal2(thr, HAMMER2_THREAD_FROZEN,
                                                 HAMMER2_THREAD_FREEZE);
                        continue;
                }

                if (flags & HAMMER2_THREAD_UNFREEZE) {
                        hammer2_thr_signal2(thr, 0,
                                                 HAMMER2_THREAD_FROZEN |
                                                 HAMMER2_THREAD_UNFREEZE);
                        continue;
                }

                /*
                 * Force idle if frozen until unfrozen or stopped.
                 */
                if (flags & HAMMER2_THREAD_FROZEN) {
                        hammer2_thr_wait_any(thr,
                                             HAMMER2_THREAD_UNFREEZE |
                                             HAMMER2_THREAD_STOP,
                                             0);
                        continue;
                }

                /*
                 * Reset state on REMASTER request
                 */
                if (flags & HAMMER2_THREAD_REMASTER) {
                        hammer2_thr_signal2(thr, 0, HAMMER2_THREAD_REMASTER);
                        /* reset state here */
                        continue;
                }

                /*
                 * Process requests.  Each request can be multi-queued.
                 *
                 * If we get behind and the frontend VOP is no longer active,
                 * we retire the request without processing it.  The callback
                 * may also abort processing if the frontend VOP becomes
                 * inactive.
                 */
                if (flags & HAMMER2_THREAD_XOPQ) {
                        nflags = flags & ~HAMMER2_THREAD_XOPQ;
                        if (!atomic_cmpset_int(&thr->flags, flags, nflags))
                                continue;
                        flags = nflags;
                        /* fall through */
                }
                while ((xop = hammer2_xop_next(thr)) != NULL) {
                        if (hammer2_xop_active(xop)) {
                                last_func = xop->func;
                                xop->func(thr, (hammer2_xop_t *)xop);
                                hammer2_xop_dequeue(thr, xop);
                                hammer2_xop_retire(xop, mask);
                        } else {
                                last_func = xop->func;
                                hammer2_xop_feed(xop, NULL, thr->clindex,
                                                 ECONNABORTED);
                                hammer2_xop_dequeue(thr, xop);
                                hammer2_xop_retire(xop, mask);
                        }
                }

                /*
                 * Wait for event, interlock using THREAD_WAITING and
                 * THREAD_SIGNAL.
                 *
                 * For robustness poll on a 30-second interval, but nominally
                 * expect to be woken up.
                 */
                nflags = flags | HAMMER2_THREAD_WAITING;

                tsleep_interlock(&thr->flags, 0);
                if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
                        tsleep(&thr->flags, PINTERLOCKED, "h2idle", hz*30);
                }
        }

#if 0
        /*
         * Cleanup / termination
         */
        while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
                kprintf("hammer2_thread: aborting xop %p\n", xop->func);
                TAILQ_REMOVE(&thr->xopq, xop,
                             collect[thr->clindex].entry);
                hammer2_xop_retire(xop, mask);
        }
#endif
        thr->td = NULL;
        hammer2_thr_signal(thr, HAMMER2_THREAD_STOPPED);
        /* thr structure can go invalid after this point */
}