/*
 * Copyright (c) 2015-2018 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module implements the hammer2 helper thread API, including
 * the frontend/backend XOP API.
 */
#include "hammer2.h"

/*
 * Set flags and wakeup any waiters.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_signal(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		nflags = (oflags | flags) & ~HAMMER2_THREAD_WAITING;

		if (oflags & HAMMER2_THREAD_WAITING) {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
				wakeup(&thr->flags);
				break;
			}
		} else {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags))
				break;
		}
	}
}

/*
 * Set and clear flags and wakeup any waiters.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_signal2(hammer2_thread_t *thr, uint32_t posflags, uint32_t negflags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		nflags = (oflags | posflags) &
			~(negflags | HAMMER2_THREAD_WAITING);
		if (oflags & HAMMER2_THREAD_WAITING) {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
				wakeup(&thr->flags);
				break;
			}
		} else {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags))
				break;
		}
	}
}

/*
 * Wait until all the bits in flags are set.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_wait(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if ((oflags & flags) == flags)
			break;
		nflags = oflags | HAMMER2_THREAD_WAITING;
		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
		}
	}
}

/*
 * Wait until any of the bits in flags are set, with timeout.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
int
hammer2_thr_wait_any(hammer2_thread_t *thr, uint32_t flags, int timo)
{
	uint32_t oflags;
	uint32_t nflags;
	int error;

	error = 0;
	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if (oflags & flags)
			break;
		nflags = oflags | HAMMER2_THREAD_WAITING;
		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			error = tsleep(&thr->flags, PINTERLOCKED,
				       "h2twait", timo);
		}
		if (error == ETIMEDOUT) {
			error = HAMMER2_ERROR_ETIMEDOUT;
			break;
		}
	}
	return error;
}
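#if 0
/*
 * Illustrative pairing of the signal/wait primitives above (a sketch,
 * not part of this file's API surface).  A controller sets a flag with
 * hammer2_thr_signal() and a waiter blocks until the bit is visible.
 * The HAMMER2_THREAD_WAITING interlock guarantees the wakeup() cannot
 * be lost between the waiter's flag test and its tsleep().
 */
/* controller side */
hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER);

/* waiter side, gives up after one second if the bit never arrives */
hammer2_thr_wait_any(thr, HAMMER2_THREAD_REMASTER, hz);
#endif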
/*
 * Wait until the bits in flags are clear.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_wait_neg(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if ((oflags & flags) == 0)
			break;
		nflags = oflags | HAMMER2_THREAD_WAITING;
		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
		}
	}
}

/*
 * Initialize the supplied thread structure, starting the specified
 * thread.
 *
 * NOTE: thr structure can be retained across mounts and unmounts for this
 *	 pmp, so make sure the flags are in a sane state.
 */
void
hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
		   hammer2_dev_t *hmp,
		   const char *id, int clindex, int repidx,
		   void (*func)(void *arg))
{
	thr->pmp = pmp;		/* xop helpers */
	thr->hmp = hmp;		/* bulkfree */
	thr->clindex = clindex;
	thr->repidx = repidx;
	TAILQ_INIT(&thr->xopq);
	atomic_clear_int(&thr->flags, HAMMER2_THREAD_STOP |
				      HAMMER2_THREAD_STOPPED |
				      HAMMER2_THREAD_FREEZE |
				      HAMMER2_THREAD_FROZEN);
	if (thr->scratch == NULL)
		thr->scratch = kmalloc(MAXPHYS, M_HAMMER2, M_WAITOK | M_ZERO);
	if (repidx >= 0) {
		lwkt_create(func, thr, &thr->td, NULL, 0, repidx % ncpus,
			    "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
	} else if (pmp) {
		lwkt_create(func, thr, &thr->td, NULL, 0, -1,
			    "%s-%s", id, pmp->pfs_names[clindex]);
	} else {
		lwkt_create(func, thr, &thr->td, NULL, 0, -1, "%s", id);
	}
}
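#if 0
/*
 * Minimal shape of a thread function suitable for hammer2_thr_create()
 * (an illustrative sketch; my_worker() is hypothetical).  A worker must
 * honor the STOP/STOPPED handshake that hammer2_thr_delete() relies on:
 * clear td and signal STOPPED on the way out, after which (thr) must
 * not be dereferenced.
 */
static void
my_worker(void *arg)
{
	hammer2_thread_t *thr = arg;

	while ((thr->flags & HAMMER2_THREAD_STOP) == 0) {
		/* ... perform work ... */
		hammer2_thr_wait_any(thr, HAMMER2_THREAD_STOP, hz);
	}
	thr->td = NULL;
	hammer2_thr_signal(thr, HAMMER2_THREAD_STOPPED);
	/* thr structure can go invalid after this point */
}
#endif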
/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);
	hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);
	thr->pmp = NULL;
	if (thr->scratch) {
		kfree(thr->scratch, M_HAMMER2);
		thr->scratch = NULL;
	}
	KKASSERT(TAILQ_EMPTY(&thr->xopq));
}

/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER);
}

void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
	hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
}

void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
	hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
}

void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_UNFREEZE);
	hammer2_thr_wait_neg(thr, HAMMER2_THREAD_FROZEN);
}

int
hammer2_thr_break(hammer2_thread_t *thr)
{
	if (thr->flags & (HAMMER2_THREAD_STOP |
			  HAMMER2_THREAD_REMASTER |
			  HAMMER2_THREAD_FREEZE)) {
		return 1;
	}
	return 0;
}
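#if 0
/*
 * Long-running workers are expected to poll hammer2_thr_break() so that
 * stop/freeze/remaster requests take effect promptly (a sketch; the
 * scan_more_work() loop is hypothetical).
 */
while (scan_more_work(thr)) {
	if (hammer2_thr_break(thr))
		break;		/* control request pending, unwind */
	/* ... process one unit of work ... */
}
#endif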
/****************************************************************************
 *			    HAMMER2 XOPS API				    *
 ****************************************************************************/

void
hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
{
	/* no extra fields in structure at the moment */
}

/*
 * Allocate a XOP request.
 *
 * Once allocated a XOP request can be started, collected, and retired,
 * and can be retired early if desired.
 *
 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
 */
void *
hammer2_xop_alloc(hammer2_inode_t *ip, int flags)
{
	hammer2_xop_t *xop;

	xop = objcache_get(cache_xops, M_WAITOK);
	KKASSERT(xop->head.cluster.array[0].chain == NULL);

	xop->head.ip1 = ip;
	xop->head.func = NULL;
	xop->head.flags = flags;
	xop->head.state = 0;
	xop->head.error = 0;
	xop->head.collect_key = 0;
	xop->head.focus_dio = NULL;

	if (flags & HAMMER2_XOP_MODIFYING)
		xop->head.mtid = hammer2_trans_sub(ip->pmp);
	else
		xop->head.mtid = 0;

	xop->head.cluster.nchains = ip->cluster.nchains;
	xop->head.cluster.pmp = ip->pmp;
	xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;

	/*
	 * run_mask - Active thread (or frontend) associated with XOP
	 */
	xop->head.run_mask = HAMMER2_XOPMASK_VOP;

	hammer2_inode_ref(ip);

	return xop;
}

void
hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
	xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name1_len = name_len;
	bcopy(name, xop->name1, name_len);
}

void
hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
	xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name2_len = name_len;
	bcopy(name, xop->name2, name_len);
}

size_t
hammer2_xop_setname_inum(hammer2_xop_head_t *xop, hammer2_key_t inum)
{
	const size_t name_len = 18;

	xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name1_len = name_len;
	ksnprintf(xop->name1, name_len + 1, "0x%016jx", (intmax_t)inum);

	return name_len;
}

void
hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
{
	xop->ip2 = ip2;
	hammer2_inode_ref(ip2);
}

void
hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
{
	xop->ip3 = ip3;
	hammer2_inode_ref(ip3);
}

void
hammer2_xop_reinit(hammer2_xop_head_t *xop)
{
	xop->state = 0;
	xop->error = 0;
	xop->collect_key = 0;
	xop->run_mask = HAMMER2_XOPMASK_VOP;
}
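#if 0
/*
 * Typical frontend life cycle (illustrative sketch, error handling
 * elided; the nresolve xop type and its backend live elsewhere in the
 * tree, so treat the specific names as assumptions):
 */
hammer2_xop_nresolve_t *xop;
int error;

xop = hammer2_xop_alloc(dip, 0);
hammer2_xop_setname(&xop->head, name, name_len);
hammer2_xop_start(&xop->head, hammer2_xop_nresolve);

error = hammer2_xop_collect(&xop->head, 0);
/* ... inspect xop->head.cluster focus via hammer2_xop_gdata/pdata ... */
hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
#endif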
/*
 * A mounted PFS needs Xops threads to support frontend operations.
 */
void
hammer2_xop_helper_create(hammer2_pfs_t *pmp)
{
	int i;
	int j;

	lockmgr(&pmp->lock, LK_EXCLUSIVE);
	pmp->has_xop_threads = 1;

	for (i = 0; i < pmp->iroot->cluster.nchains; ++i) {
		for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
			if (pmp->xop_groups[j].thrs[i].td)
				continue;
			hammer2_thr_create(&pmp->xop_groups[j].thrs[i],
					   pmp, NULL,
					   "h2xop", i, j,
					   hammer2_primary_xops_thread);
		}
	}
	lockmgr(&pmp->lock, LK_RELEASE);
}

void
hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
{
	int i;
	int j;

	for (i = 0; i < pmp->pfs_nmasters; ++i) {
		for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
			if (pmp->xop_groups[j].thrs[i].td)
				hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
		}
	}
	pmp->has_xop_threads = 0;
}

/*
 * Start a XOP request, queueing it to all nodes in the cluster to
 * execute the cluster op.
 *
 * XXX optimize single-target case.
 */
void
hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_func_t func,
			 int notidx)
{
	hammer2_inode_t *ip1;
	hammer2_pfs_t *pmp;
	hammer2_thread_t *thr;
	int i;
	int ng;
	int nchains;

	ip1 = xop->ip1;
	pmp = ip1->pmp;
	if (pmp->has_xop_threads == 0)
		hammer2_xop_helper_create(pmp);

	/*
	 * The intent of the XOP sequencer is to ensure that ops on the same
	 * inode execute in the same order.  This is necessary when issuing
	 * modifying operations to multiple targets because some targets
	 * might get behind and the frontend is allowed to complete the
	 * moment a quorum of targets succeed.
	 *
	 * Strategy operations must be segregated from non-strategy
	 * operations to avoid a deadlock.  For example, if a vfsync and a
	 * bread/bwrite were queued to the same worker thread, the locked
	 * buffer in the strategy operation can deadlock the vfsync's
	 * buffer list scan.
	 *
	 * TODO - RENAME fails here because it is potentially modifying
	 *	  three different inodes.
	 */
	if (xop->flags & HAMMER2_XOP_STRATEGY) {
		hammer2_xop_strategy_t *xopst;

		xopst = &((hammer2_xop_t *)xop)->xop_strategy;
		ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)) ^
			   hammer2_icrc32(&xopst->lbase, sizeof(xopst->lbase)));
		ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
		ng += HAMMER2_XOPGROUPS / 2;
	} else {
		ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)));
		ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
	}
	xop->func = func;

	/*
	 * The instant the xop is queued another thread can pick it off.
	 * In the case of asynchronous ops, another thread might even
	 * finish and deallocate it.
	 */
	hammer2_spin_ex(&pmp->xop_spin);
	nchains = ip1->cluster.nchains;
	for (i = 0; i < nchains; ++i) {
		/*
		 * XXX ip1->cluster.array* not stable here.  This temporary
		 *     hack fixes basic issues in target XOPs which need to
		 *     obtain a starting chain from the inode but does not
		 *     address possible races against inode updates which
		 *     might NULL-out a chain.
		 */
		if (i != notidx && ip1->cluster.array[i].chain) {
			thr = &pmp->xop_groups[ng].thrs[i];
			atomic_set_64(&xop->run_mask, 1LLU << i);
			atomic_set_64(&xop->chk_mask, 1LLU << i);
			xop->collect[i].thr = thr;
			TAILQ_INSERT_TAIL(&thr->xopq, xop, collect[i].entry);
		}
	}
	hammer2_spin_unex(&pmp->xop_spin);
	/* xop can become invalid at this point */

	/*
	 * Each thread has its own xopq
	 */
	for (i = 0; i < nchains; ++i) {
		if (i != notidx) {
			thr = &pmp->xop_groups[ng].thrs[i];
			hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
		}
	}
}

void
hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_func_t func)
{
	hammer2_xop_start_except(xop, func, -1);
}
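/*
 * Worked example of the group selection in hammer2_xop_start_except(),
 * assuming (hypothetically) HAMMER2_XOPGROUPS == 32, i.e.
 * HAMMER2_XOPGROUPS_MASK == 31:
 *
 *	non-strategy: ng = crc & (31 >> 1)	 -> 0..15  (lower half)
 *	strategy:     ng = (crc & 15) + 32/2	 -> 16..31 (upper half)
 *
 * Hashing on &xop->ip1 keeps ops on the same inode in the same group
 * (and thus the same per-node thread), preserving ordering.  Strategy
 * ops additionally mix in lbase and land in the disjoint upper half,
 * which is what segregates them from non-strategy ops.
 */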
/*
 * Retire a XOP.  Used by both the VOP frontend and by the XOP backend.
 */
void
hammer2_xop_retire(hammer2_xop_head_t *xop, uint64_t mask)
{
	hammer2_chain_t *chain;
	uint64_t nmask;
	int i;

	/*
	 * Remove the frontend collector or remove a backend feeder.
	 *
	 * When removing the frontend we must wakeup any backend feeders
	 * who are waiting for FIFO space.
	 *
	 * When removing the last backend feeder we must wakeup any waiting
	 * frontend.
	 */
	KKASSERT(xop->run_mask & mask);
	nmask = atomic_fetchadd_64(&xop->run_mask,
				   -mask + HAMMER2_XOPMASK_FEED);

	/*
	 * More than one entity left
	 */
	if ((nmask & HAMMER2_XOPMASK_ALLDONE) != mask) {
		/*
		 * Frontend terminating, wakeup any backends waiting on
		 * fifo full.
		 *
		 * NOTE!!! The xop can get ripped out from under us at
		 *	   this point, so do not reference it again.
		 *	   The wakeup(xop) doesn't touch the xop and
		 *	   is ok.
		 */
		if (mask == HAMMER2_XOPMASK_VOP) {
			if (nmask & HAMMER2_XOPMASK_FIFOW)
				wakeup(xop);
		}

		/*
		 * Wakeup frontend if the last backend is terminating.
		 */
		nmask -= mask;
		if ((nmask & HAMMER2_XOPMASK_ALLDONE) == HAMMER2_XOPMASK_VOP) {
			if (nmask & HAMMER2_XOPMASK_WAIT)
				wakeup(xop);
		}

		return;
	}
	/* else nobody else left, we can ignore FIFOW */

	/*
	 * All collectors are gone, we can cleanup and dispose of the XOP.
	 * Note that this can wind up being a frontend OR a backend.
	 * Pending chains are locked shared and not owned by any thread.
	 *
	 * Cleanup the collection cluster.
	 */
	for (i = 0; i < xop->cluster.nchains; ++i) {
		xop->cluster.array[i].flags = 0;
		chain = xop->cluster.array[i].chain;
		if (chain) {
			xop->cluster.array[i].chain = NULL;
			hammer2_chain_drop_unhold(chain);
		}
	}

	/*
	 * Cleanup the fifos.  Since we are the only entity left on this
	 * xop we don't have to worry about fifo flow control, and one
	 * lfence() will do the job.
	 */
	cpu_lfence();
	mask = xop->chk_mask;
	for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
		hammer2_xop_fifo_t *fifo = &xop->collect[i];
		while (fifo->ri != fifo->wi) {
			chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
			if (chain)
				hammer2_chain_drop_unhold(chain);
			++fifo->ri;
		}
		mask &= ~(1LLU << i);
	}

	/*
	 * The inode is only held at this point, simply drop it.
	 */
	if (xop->ip1) {
		hammer2_inode_drop(xop->ip1);
		xop->ip1 = NULL;
	}
	if (xop->ip2) {
		hammer2_inode_drop(xop->ip2);
		xop->ip2 = NULL;
	}
	if (xop->ip3) {
		hammer2_inode_drop(xop->ip3);
		xop->ip3 = NULL;
	}
	if (xop->name1) {
		kfree(xop->name1, M_HAMMER2);
		xop->name1 = NULL;
		xop->name1_len = 0;
	}
	if (xop->name2) {
		kfree(xop->name2, M_HAMMER2);
		xop->name2 = NULL;
		xop->name2_len = 0;
	}

	objcache_put(cache_xops, xop);
}

/*
 * (Backend) Returns non-zero if the frontend is still attached.
 */
int
hammer2_xop_active(hammer2_xop_head_t *xop)
{
	if (xop->run_mask & HAMMER2_XOPMASK_VOP)
		return 1;
	else
		return 0;
}
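/*
 * Worked trace of the retire accounting in hammer2_xop_retire()
 * (illustrative two-node cluster; mask values are symbolic).  After
 * queueing, run_mask = VOP|B0|B1.
 *
 *	backend 0 retires:  nmask = VOP|B0|B1, != B0  -> just returns
 *	backend 1 retires:  nmask = VOP|B1,    != B1  -> wakes frontend
 *						         (if it is waiting)
 *	frontend retires:   nmask = VOP,       == VOP -> final cleanup
 *
 * The last entity out, whichever it is, performs the cleanup and
 * returns the xop to the objcache.
 */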
/*
 * (Backend) Feed chain data through the cluster validator and back to
 * the frontend.  Chains are fed from multiple nodes concurrently
 * and pipelined via per-node FIFOs in the XOP.
 *
 * The chain must be locked (either shared or exclusive).  The caller may
 * unlock and drop the chain on return.  This function will add an extra
 * ref and hold the chain's data for the pass-back.
 *
 * No xop lock is needed because we are only manipulating fields under
 * our direct control.
 *
 * Returns 0 on success and a hammer2 error code if sync is permanently
 * lost.  The caller retains a ref on the chain but by convention
 * the lock is typically inherited by the xop (caller loses lock).
 *
 * Returns non-zero on error.  In this situation the caller retains a
 * ref on the chain but loses the lock (we unlock here).
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
		 int clindex, int error)
{
	hammer2_xop_fifo_t *fifo;
	uint64_t mask;

	/*
	 * Early termination (typically of xop_readdir)
	 */
	if (hammer2_xop_active(xop) == 0) {
		error = HAMMER2_ERROR_ABORTED;
		goto done;
	}

	/*
	 * Multi-threaded entry into the XOP collector.  We own the
	 * fifo->wi for our clindex.
	 */
	fifo = &xop->collect[clindex];

	if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO)
		lwkt_yield();
	while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
		atomic_set_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
		mask = xop->run_mask;
		if ((mask & HAMMER2_XOPMASK_VOP) == 0) {
			error = HAMMER2_ERROR_ABORTED;
			goto done;
		}
		tsleep_interlock(xop, 0);
		if (atomic_cmpset_64(&xop->run_mask, mask,
				     mask | HAMMER2_XOPMASK_FIFOW)) {
			if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
				tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
			}
		}
		/* retry */
	}
	atomic_clear_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
	if (chain)
		hammer2_chain_ref_hold(chain);
	if (error == 0 && chain)
		error = chain->error;
	fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
	fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
	cpu_sfence();
	++fifo->wi;

	mask = atomic_fetchadd_64(&xop->run_mask, HAMMER2_XOPMASK_FEED);
	if (mask & HAMMER2_XOPMASK_WAIT) {
		atomic_clear_64(&xop->run_mask, HAMMER2_XOPMASK_WAIT);
		wakeup(xop);
	}
	error = 0;

	/*
	 * Cleanup.  If an error occurred we eat the lock.  If no error
	 * occurred the fifo inherits the lock and gains an additional ref.
	 *
	 * The caller's ref remains in both cases.
	 */
done:
	return error;
}
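#if 0
/*
 * Shape of a typical backend function dispatched by the XOP threads
 * (an illustrative sketch; real backends such as the readdir and
 * nresolve implementations live in hammer2_xops.c).  A backend scans
 * chains for its own cluster index, feeds each one to the frontend,
 * and always terminates with a final error feed so the collector can
 * resolve this node.
 */
static void
example_backend(hammer2_thread_t *thr, hammer2_xop_t *xop)
{
	hammer2_chain_t *chain;
	int error = HAMMER2_ERROR_ENOENT;

	chain = NULL;	/* ... locate first chain for thr->clindex ... */
	while (chain) {
		error = hammer2_xop_feed(&xop->head, chain, thr->clindex, 0);
		if (error)
			break;
		/* ... advance to the next chain ... */
	}
	/* terminal feed; NULL chain with the final error code */
	hammer2_xop_feed(&xop->head, NULL, thr->clindex, error);
}
#endif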
/*
 * (Frontend) collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= collect_key.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT	- Used to 'poll' a completed collection,
 *				  usually called synchronously from the
 *				  node XOPs for the strategy code to
 *				  fake the frontend collection and complete
 *				  the BIO as soon as possible.
 *
 * HAMMER2_XOP_SYNCHRONIZER	- Request synchronization with a particular
 *				  cluster index, prevents looping when that
 *				  index is out of sync so caller can act on
 *				  the out of sync element.  ESRCH and EDEADLK
 *				  can be returned if this flag is specified.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Returns ENOENT on normal termination.
 * Otherwise returns an error.
 *
 * WARNING! If the xop returns a cluster with a non-NULL focus, note that
 *	    none of the chains in the cluster (or the focus) are either
 *	    locked or I/O synchronized with the cpu.  hammer2_xop_gdata()
 *	    and hammer2_xop_pdata() must be used to safely access the
 *	    focus chain's content.
 *
 *	    The frontend can make certain assumptions based on higher-level
 *	    locking done by the frontend, but data integrity absolutely
 *	    requires using the gdata/pdata API.
 */
int
hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
{
	hammer2_xop_fifo_t *fifo;
	hammer2_chain_t *chain;
	hammer2_key_t lokey;
	uint64_t mask;
	int error;
	int keynull;
	int adv;		/* advance the element */
	int i;

loop:
	/*
	 * First loop tries to advance pieces of the cluster which
	 * are out of sync.
	 */
	lokey = HAMMER2_KEY_MAX;
	keynull = HAMMER2_CHECK_NULL;
	mask = xop->run_mask;
	cpu_lfence();

	for (i = 0; i < xop->cluster.nchains; ++i) {
		chain = xop->cluster.array[i].chain;
		if (chain == NULL) {
			adv = 1;
		} else if (chain->bref.key < xop->collect_key) {
			adv = 1;
		} else {
			keynull &= ~HAMMER2_CHECK_NULL;
			if (lokey > chain->bref.key)
				lokey = chain->bref.key;
			adv = 0;
		}
		if (adv == 0)
			continue;

		/*
		 * Advance element if possible, advanced element may be NULL.
		 */
		if (chain)
			hammer2_chain_drop_unhold(chain);

		fifo = &xop->collect[i];
		if (fifo->ri != fifo->wi) {
			cpu_lfence();
			chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
			error = fifo->errors[fifo->ri & HAMMER2_XOPFIFO_MASK];
			++fifo->ri;
			xop->cluster.array[i].chain = chain;
			xop->cluster.array[i].error = error;
			if (chain == NULL) {
				/* XXX */
				xop->cluster.array[i].flags |=
							HAMMER2_CITEM_NULL;
			}
			if (fifo->wi - fifo->ri <= HAMMER2_XOPFIFO / 2) {
				if (fifo->flags & HAMMER2_XOP_FIFO_STALL) {
					atomic_clear_int(&fifo->flags,
						    HAMMER2_XOP_FIFO_STALL);
					wakeup(xop);
					lwkt_yield();
				}
			}
			--i;		/* loop on same index */
		} else {
			/*
			 * Retain CITEM_NULL flag.  If set just repeat EOF.
			 * If not, the NULL,0 combination indicates an
			 * operation in-progress.
			 */
			xop->cluster.array[i].chain = NULL;
			/* retain any CITEM_NULL setting */
		}
	}

	/*
	 * Determine whether the lowest collected key meets clustering
	 * requirements.  Returns:
	 *
	 * 0	       - key valid, cluster can be returned.
	 *
	 * ENOENT      - normal end of scan, return ENOENT.
	 *
	 * ESRCH       - sufficient elements collected, quorum agreement
	 *		 that lokey is not a valid element and should be
	 *		 skipped.
	 *
	 * EDEADLK     - sufficient elements collected, no quorum agreement
	 *		 (and no agreement possible).  In this situation a
	 *		 repair is needed, for now we loop.
	 *
	 * EINPROGRESS - insufficient elements collected to resolve, wait
	 *		 for event and loop.
	 */
	if ((flags & HAMMER2_XOP_COLLECT_WAITALL) &&
	    (mask & HAMMER2_XOPMASK_ALLDONE) != HAMMER2_XOPMASK_VOP) {
		error = HAMMER2_ERROR_EINPROGRESS;
	} else {
		error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
	}
	if (error == HAMMER2_ERROR_EINPROGRESS) {
		if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
			goto done;
		tsleep_interlock(xop, 0);
		if (atomic_cmpset_64(&xop->run_mask,
				     mask, mask | HAMMER2_XOPMASK_WAIT)) {
			tsleep(xop, PINTERLOCKED, "h2coll", hz*60);
		}
		goto loop;
	}
	if (error == HAMMER2_ERROR_ESRCH) {
		if (lokey != HAMMER2_KEY_MAX) {
			xop->collect_key = lokey + 1;
			goto loop;
		}
		error = HAMMER2_ERROR_ENOENT;
	}
	if (error == HAMMER2_ERROR_EDEADLK) {
		kprintf("hammer2: no quorum possible lokey %016jx\n",
			lokey);
		if (lokey != HAMMER2_KEY_MAX) {
			xop->collect_key = lokey + 1;
			goto loop;
		}
		error = HAMMER2_ERROR_ENOENT;
	}
	if (lokey == HAMMER2_KEY_MAX)
		xop->collect_key = lokey;
	else
		xop->collect_key = lokey + 1;
done:
	return error;
}
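#if 0
/*
 * Illustrative frontend collection loop (a sketch; a directory scan
 * would look roughly like this).  Collection continues until the
 * terminal HAMMER2_ERROR_ENOENT, after which the xop is retired.
 */
for (;;) {
	error = hammer2_xop_collect(&xop->head, 0);
	if (error)
		break;	/* HAMMER2_ERROR_ENOENT on normal termination */
	/* consume the focus via hammer2_xop_gdata()/hammer2_xop_pdata() */
}
hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
#endif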
/*
 * N x M processing threads are available to handle XOPs, N per cluster
 * index x M cluster nodes.
 *
 * Locate and return the next runnable xop, or NULL if no xops are
 * present or none of the xops are currently runnable (for various reasons).
 * The xop is left on the queue and serves to block other dependent xops
 * from being run.
 *
 * Dependent xops will not be returned.
 *
 * Sets HAMMER2_XOP_FIFO_RUN on the returned xop or returns NULL.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
#define XOP_HASH_SIZE	16
#define XOP_HASH_MASK	(XOP_HASH_SIZE - 1)

static __inline
int
xop_testhash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
	uint32_t mask;
	int hv;

	hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
	mask = 1U << (hv & 31);
	hv >>= 5;

	return ((int)(hash[hv & XOP_HASH_MASK] & mask));
}

static __inline
void
xop_sethash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
	uint32_t mask;
	int hv;

	hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
	mask = 1U << (hv & 31);
	hv >>= 5;

	hash[hv & XOP_HASH_MASK] |= mask;
}

static
hammer2_xop_head_t *
hammer2_xop_next(hammer2_thread_t *thr)
{
	hammer2_pfs_t *pmp = thr->pmp;
	int clindex = thr->clindex;
	uint32_t hash[XOP_HASH_SIZE] = { 0 };
	hammer2_xop_head_t *xop;

	hammer2_spin_ex(&pmp->xop_spin);
	TAILQ_FOREACH(xop, &thr->xopq, collect[clindex].entry) {
		/*
		 * Check dependency
		 */
		if (xop_testhash(thr, xop->ip1, hash) ||
		    (xop->ip2 && xop_testhash(thr, xop->ip2, hash)) ||
		    (xop->ip3 && xop_testhash(thr, xop->ip3, hash))) {
			continue;
		}
		xop_sethash(thr, xop->ip1, hash);
		if (xop->ip2)
			xop_sethash(thr, xop->ip2, hash);
		if (xop->ip3)
			xop_sethash(thr, xop->ip3, hash);

		/*
		 * Check already running
		 */
		if (xop->collect[clindex].flags & HAMMER2_XOP_FIFO_RUN)
			continue;

		/*
		 * Found a good one, return it.
		 */
		atomic_set_int(&xop->collect[clindex].flags,
			       HAMMER2_XOP_FIFO_RUN);
		break;
	}
	hammer2_spin_unex(&pmp->xop_spin);

	return xop;
}
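/*
 * Illustrative dependency check (symbolic): if two xops touching the
 * same ip1 sit on the queue, the TAILQ scan marks ip1 via xop_sethash()
 * when it visits the first xop; the second xop then fails
 * xop_testhash() and is skipped until the first has been dequeued.
 * Hash collisions merely create false dependencies (extra
 * serialization), never missed ones.
 */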
/*
 * Remove the completed XOP from the queue, clear HAMMER2_XOP_FIFO_RUN.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
static
void
hammer2_xop_dequeue(hammer2_thread_t *thr, hammer2_xop_head_t *xop)
{
	hammer2_pfs_t *pmp = thr->pmp;
	int clindex = thr->clindex;

	hammer2_spin_ex(&pmp->xop_spin);
	TAILQ_REMOVE(&thr->xopq, xop, collect[clindex].entry);
	atomic_clear_int(&xop->collect[clindex].flags,
			 HAMMER2_XOP_FIFO_RUN);
	hammer2_spin_unex(&pmp->xop_spin);
	if (TAILQ_FIRST(&thr->xopq))
		hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
}

/*
 * Primary management thread for xops support.  Each node has several such
 * threads which replicate front-end operations on cluster nodes.
 *
 * XOPS threads run node-local operations, allowing each function to focus
 * on a single node in the cluster after the operation has been validated
 * against the cluster.  This is primarily what prevents dead or stalled
 * nodes from stalling the front-end.
 */
void
hammer2_primary_xops_thread(void *arg)
{
	hammer2_thread_t *thr = arg;
	hammer2_pfs_t *pmp;
	hammer2_xop_head_t *xop;
	uint64_t mask;
	uint32_t flags;
	uint32_t nflags;
	hammer2_xop_func_t last_func = NULL;

	pmp = thr->pmp;
	/*xgrp = &pmp->xop_groups[thr->repidx]; not needed */
	mask = 1LLU << thr->clindex;

	for (;;) {
		flags = thr->flags;

		/*
		 * Handle stop request
		 */
		if (flags & HAMMER2_THREAD_STOP)
			break;

		/*
		 * Handle freeze request
		 */
		if (flags & HAMMER2_THREAD_FREEZE) {
			hammer2_thr_signal2(thr, HAMMER2_THREAD_FROZEN,
						 HAMMER2_THREAD_FREEZE);
			continue;
		}

		if (flags & HAMMER2_THREAD_UNFREEZE) {
			hammer2_thr_signal2(thr, 0,
						 HAMMER2_THREAD_FROZEN |
						 HAMMER2_THREAD_UNFREEZE);
			continue;
		}

		/*
		 * Force idle if frozen until unfrozen or stopped.
		 */
		if (flags & HAMMER2_THREAD_FROZEN) {
			hammer2_thr_wait_any(thr,
					     HAMMER2_THREAD_UNFREEZE |
					     HAMMER2_THREAD_STOP,
					     0);
			continue;
		}

		/*
		 * Reset state on REMASTER request
		 */
		if (flags & HAMMER2_THREAD_REMASTER) {
			hammer2_thr_signal2(thr, 0, HAMMER2_THREAD_REMASTER);
			/* reset state here */
			continue;
		}

		/*
		 * Process requests.  Each request can be multi-queued.
		 *
		 * If we get behind and the frontend VOP is no longer active,
		 * we retire the request without processing it.  The callback
		 * may also abort processing if the frontend VOP becomes
		 * inactive.
		 */
		if (flags & HAMMER2_THREAD_XOPQ) {
			nflags = flags & ~HAMMER2_THREAD_XOPQ;
			if (!atomic_cmpset_int(&thr->flags, flags, nflags))
				continue;
			flags = nflags;
			/* fall through */
		}
		while ((xop = hammer2_xop_next(thr)) != NULL) {
			if (hammer2_xop_active(xop)) {
				last_func = xop->func;
				xop->func(thr, (hammer2_xop_t *)xop);
				hammer2_xop_dequeue(thr, xop);
				hammer2_xop_retire(xop, mask);
			} else {
				last_func = xop->func;
				hammer2_xop_feed(xop, NULL, thr->clindex,
						 ECONNABORTED);
				hammer2_xop_dequeue(thr, xop);
				hammer2_xop_retire(xop, mask);
			}
		}

		/*
		 * Wait for event, interlock using THREAD_WAITING and
		 * hammer2_thr_signal().
		 *
		 * For robustness poll on a 30-second interval, but nominally
		 * expect to be woken up.
		 */
		nflags = flags | HAMMER2_THREAD_WAITING;

		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2idle", hz*30);
		}
	}

#if 0
	/*
	 * Cleanup / termination
	 */
	while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
		kprintf("hammer2_thread: aborting xop %p\n", xop->func);
		TAILQ_REMOVE(&thr->xopq, xop,
			     collect[thr->clindex].entry);
		hammer2_xop_retire(xop, mask);
	}
#endif
	thr->td = NULL;
	hammer2_thr_signal(thr, HAMMER2_THREAD_STOPPED);
	/* thr structure can go invalid after this point */
}