1 /* 2 * SPDX-License-Identifier: BSD-3-Clause 3 * 4 * Copyright (c) 2022 Tomohiro Kusumi <tkusumi@netbsd.org> 5 * Copyright (c) 2011-2022 The DragonFly Project. All rights reserved. 6 * 7 * This code is derived from software contributed to The DragonFly Project 8 * by Matthew Dillon <dillon@dragonflybsd.org> 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 14 * 1. Redistributions of source code must retain the above copyright 15 * notice, this list of conditions and the following disclaimer. 16 * 2. Redistributions in binary form must reproduce the above copyright 17 * notice, this list of conditions and the following disclaimer in 18 * the documentation and/or other materials provided with the 19 * distribution. 20 * 3. Neither the name of The DragonFly Project nor the names of its 21 * contributors may be used to endorse or promote products derived 22 * from this software without specific, prior written permission. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 32 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 33 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 34 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 35 * SUCH DAMAGE. 36 */ 37 /* 38 * This module implements the hammer2 helper thread API, including 39 * the frontend/backend XOP API. 
40 */ 41 #include "hammer2.h" 42 43 #define H2XOPDESCRIPTOR(label) \ 44 hammer2_xop_desc_t hammer2_##label##_desc = { \ 45 .storage_func = hammer2_xop_##label, \ 46 .id = #label \ 47 } 48 49 H2XOPDESCRIPTOR(ipcluster); 50 H2XOPDESCRIPTOR(readdir); 51 H2XOPDESCRIPTOR(nresolve); 52 H2XOPDESCRIPTOR(unlink); 53 H2XOPDESCRIPTOR(nrename); 54 H2XOPDESCRIPTOR(scanlhc); 55 H2XOPDESCRIPTOR(scanall); 56 H2XOPDESCRIPTOR(lookup); 57 H2XOPDESCRIPTOR(delete); 58 H2XOPDESCRIPTOR(inode_mkdirent); 59 H2XOPDESCRIPTOR(inode_create); 60 H2XOPDESCRIPTOR(inode_create_det); 61 H2XOPDESCRIPTOR(inode_create_ins); 62 H2XOPDESCRIPTOR(inode_destroy); 63 H2XOPDESCRIPTOR(inode_chain_sync); 64 H2XOPDESCRIPTOR(inode_unlinkall); 65 H2XOPDESCRIPTOR(inode_connect); 66 H2XOPDESCRIPTOR(inode_flush); 67 H2XOPDESCRIPTOR(strategy_read); 68 H2XOPDESCRIPTOR(strategy_write); 69 70 //struct objcache *cache_xops; 71 static struct thread dummy_td; 72 struct thread *curthread = &dummy_td; 73 74 /* 75 * Set flags and wakeup any waiters. 76 * 77 * WARNING! During teardown (thr) can disappear the instant our cmpset 78 * succeeds. 79 */ 80 void 81 hammer2_thr_signal(hammer2_thread_t *thr, uint32_t flags) 82 { 83 uint32_t oflags; 84 uint32_t nflags; 85 86 for (;;) { 87 oflags = thr->flags; 88 cpu_ccfence(); 89 nflags = (oflags | flags) & ~HAMMER2_THREAD_WAITING; 90 91 if (oflags & HAMMER2_THREAD_WAITING) { 92 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) { 93 wakeup(&thr->flags); 94 break; 95 } 96 } else { 97 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) 98 break; 99 } 100 } 101 } 102 103 /* 104 * Set and clear flags and wakeup any waiters. 105 * 106 * WARNING! During teardown (thr) can disappear the instant our cmpset 107 * succeeds. 
108 */ 109 void 110 hammer2_thr_signal2(hammer2_thread_t *thr, uint32_t posflags, uint32_t negflags) 111 { 112 uint32_t oflags; 113 uint32_t nflags; 114 115 for (;;) { 116 oflags = thr->flags; 117 cpu_ccfence(); 118 nflags = (oflags | posflags) & 119 ~(negflags | HAMMER2_THREAD_WAITING); 120 if (oflags & HAMMER2_THREAD_WAITING) { 121 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) { 122 wakeup(&thr->flags); 123 break; 124 } 125 } else { 126 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) 127 break; 128 } 129 } 130 } 131 132 /* 133 * Wait until all the bits in flags are set. 134 * 135 * WARNING! During teardown (thr) can disappear the instant our cmpset 136 * succeeds. 137 */ 138 void 139 hammer2_thr_wait(hammer2_thread_t *thr, uint32_t flags) 140 { 141 uint32_t oflags; 142 uint32_t nflags; 143 144 for (;;) { 145 oflags = thr->flags; 146 cpu_ccfence(); 147 if ((oflags & flags) == flags) 148 break; 149 nflags = oflags | HAMMER2_THREAD_WAITING; 150 tsleep_interlock(&thr->flags, 0); 151 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) { 152 tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60); 153 } 154 } 155 } 156 157 /* 158 * Wait until any of the bits in flags are set, with timeout. 159 * 160 * WARNING! During teardown (thr) can disappear the instant our cmpset 161 * succeeds. 162 */ 163 int 164 hammer2_thr_wait_any(hammer2_thread_t *thr, uint32_t flags, int timo) 165 { 166 uint32_t oflags; 167 uint32_t nflags; 168 int error; 169 170 error = 0; 171 for (;;) { 172 oflags = thr->flags; 173 cpu_ccfence(); 174 if (oflags & flags) 175 break; 176 nflags = oflags | HAMMER2_THREAD_WAITING; 177 tsleep_interlock(&thr->flags, 0); 178 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) { 179 error = tsleep(&thr->flags, PINTERLOCKED, 180 "h2twait", timo); 181 } 182 if (error == ETIMEDOUT) { 183 error = HAMMER2_ERROR_ETIMEDOUT; 184 break; 185 } 186 } 187 return error; 188 } 189 190 /* 191 * Wait until the bits in flags are clear. 192 * 193 * WARNING! 
During teardown (thr) can disappear the instant our cmpset 194 * succeeds. 195 */ 196 void 197 hammer2_thr_wait_neg(hammer2_thread_t *thr, uint32_t flags) 198 { 199 uint32_t oflags; 200 uint32_t nflags; 201 202 for (;;) { 203 oflags = thr->flags; 204 cpu_ccfence(); 205 if ((oflags & flags) == 0) 206 break; 207 nflags = oflags | HAMMER2_THREAD_WAITING; 208 tsleep_interlock(&thr->flags, 0); 209 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) { 210 tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60); 211 } 212 } 213 } 214 215 /* 216 * Initialize the supplied thread structure, starting the specified 217 * thread. 218 * 219 * NOTE: thr structure can be retained across mounts and unmounts for this 220 * pmp, so make sure the flags are in a sane state. 221 */ 222 void 223 hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp, 224 hammer2_dev_t *hmp, 225 const char *id, int clindex, int repidx, 226 void (*func)(void *arg)) 227 { 228 thr->pmp = pmp; /* xop helpers */ 229 thr->hmp = hmp; /* bulkfree */ 230 thr->clindex = clindex; 231 thr->repidx = repidx; 232 TAILQ_INIT(&thr->xopq); 233 atomic_clear_int(&thr->flags, HAMMER2_THREAD_STOP | 234 HAMMER2_THREAD_STOPPED | 235 HAMMER2_THREAD_FREEZE | 236 HAMMER2_THREAD_FROZEN); 237 if (thr->scratch == NULL) 238 thr->scratch = kmalloc(MAXPHYS, M_HAMMER2, M_WAITOK | M_ZERO); 239 #if 0 240 if (repidx >= 0) { 241 lwkt_create(func, thr, &thr->td, NULL, 0, repidx % ncpus, 242 "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx); 243 } else if (pmp) { 244 lwkt_create(func, thr, &thr->td, NULL, 0, -1, 245 "%s-%s", id, pmp->pfs_names[clindex]); 246 } else { 247 lwkt_create(func, thr, &thr->td, NULL, 0, -1, "%s", id); 248 } 249 #else 250 thr->td = &dummy_td; 251 #endif 252 } 253 254 /* 255 * Terminate a thread. This function will silently return if the thread 256 * was never initialized or has already been deleted. 257 * 258 * This is accomplished by setting the STOP flag and waiting for the td 259 * structure to become NULL. 
260 */ 261 void 262 hammer2_thr_delete(hammer2_thread_t *thr) 263 { 264 if (thr->td == NULL) 265 return; 266 hammer2_thr_signal(thr, HAMMER2_THREAD_STOP); 267 /* Don't wait, there's no such thread in makefs */ 268 //hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED); 269 thr->pmp = NULL; 270 if (thr->scratch) { 271 kfree(thr->scratch, M_HAMMER2); 272 thr->scratch = NULL; 273 } 274 KKASSERT(TAILQ_EMPTY(&thr->xopq)); 275 } 276 277 /* 278 * Asynchronous remaster request. Ask the synchronization thread to 279 * start over soon (as if it were frozen and unfrozen, but without waiting). 280 * The thread always recalculates mastership relationships when restarting. 281 */ 282 void 283 hammer2_thr_remaster(hammer2_thread_t *thr) 284 { 285 if (thr->td == NULL) 286 return; 287 hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER); 288 } 289 290 void 291 hammer2_thr_freeze_async(hammer2_thread_t *thr) 292 { 293 hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE); 294 } 295 296 void 297 hammer2_thr_freeze(hammer2_thread_t *thr) 298 { 299 if (thr->td == NULL) 300 return; 301 hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE); 302 hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN); 303 } 304 305 void 306 hammer2_thr_unfreeze(hammer2_thread_t *thr) 307 { 308 if (thr->td == NULL) 309 return; 310 hammer2_thr_signal(thr, HAMMER2_THREAD_UNFREEZE); 311 hammer2_thr_wait_neg(thr, HAMMER2_THREAD_FROZEN); 312 } 313 314 int 315 hammer2_thr_break(hammer2_thread_t *thr) 316 { 317 if (thr->flags & (HAMMER2_THREAD_STOP | 318 HAMMER2_THREAD_REMASTER | 319 HAMMER2_THREAD_FREEZE)) { 320 return 1; 321 } 322 return 0; 323 } 324 325 /**************************************************************************** 326 * HAMMER2 XOPS API * 327 ****************************************************************************/ 328 329 /* 330 * Allocate a XOP request. 331 * 332 * Once allocated a XOP request can be started, collected, and retired, 333 * and can be retired early if desired. 
334 * 335 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get(). 336 */ 337 void * 338 hammer2_xop_alloc(hammer2_inode_t *ip, int flags) 339 { 340 hammer2_xop_t *xop; 341 342 xop = ecalloc(1, sizeof(*xop)); 343 KKASSERT(xop->head.cluster.array[0].chain == NULL); 344 345 xop->head.ip1 = ip; 346 xop->head.desc = NULL; 347 xop->head.flags = flags; 348 xop->head.state = 0; 349 xop->head.error = 0; 350 xop->head.collect_key = 0; 351 xop->head.focus_dio = NULL; 352 353 if (flags & HAMMER2_XOP_MODIFYING) 354 xop->head.mtid = hammer2_trans_sub(ip->pmp); 355 else 356 xop->head.mtid = 0; 357 358 xop->head.cluster.nchains = ip->cluster.nchains; 359 xop->head.cluster.pmp = ip->pmp; 360 xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED; 361 362 /* 363 * run_mask - Active thread (or frontend) associated with XOP 364 */ 365 xop->head.run_mask = HAMMER2_XOPMASK_VOP; 366 367 hammer2_inode_ref(ip); 368 369 return xop; 370 } 371 372 void 373 hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len) 374 { 375 xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO); 376 xop->name1_len = name_len; 377 bcopy(name, xop->name1, name_len); 378 } 379 380 void 381 hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len) 382 { 383 xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO); 384 xop->name2_len = name_len; 385 bcopy(name, xop->name2, name_len); 386 } 387 388 size_t 389 hammer2_xop_setname_inum(hammer2_xop_head_t *xop, hammer2_key_t inum) 390 { 391 const size_t name_len = 18; 392 393 xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO); 394 xop->name1_len = name_len; 395 ksnprintf(xop->name1, name_len + 1, "0x%016jx", (intmax_t)inum); 396 397 return name_len; 398 } 399 400 401 void 402 hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2) 403 { 404 xop->ip2 = ip2; 405 hammer2_inode_ref(ip2); 406 } 407 408 void 409 hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3) 410 
{ 411 xop->ip3 = ip3; 412 hammer2_inode_ref(ip3); 413 } 414 415 void 416 hammer2_xop_setip4(hammer2_xop_head_t *xop, hammer2_inode_t *ip4) 417 { 418 xop->ip4 = ip4; 419 hammer2_inode_ref(ip4); 420 } 421 422 void 423 hammer2_xop_reinit(hammer2_xop_head_t *xop) 424 { 425 xop->state = 0; 426 xop->error = 0; 427 xop->collect_key = 0; 428 xop->run_mask = HAMMER2_XOPMASK_VOP; 429 } 430 431 /* 432 * A mounted PFS needs Xops threads to support frontend operations. 433 */ 434 void 435 hammer2_xop_helper_create(hammer2_pfs_t *pmp) 436 { 437 int i; 438 int j; 439 440 lockmgr(&pmp->lock, LK_EXCLUSIVE); 441 pmp->has_xop_threads = 1; 442 443 pmp->xop_groups = kmalloc(hammer2_xop_nthreads * 444 sizeof(hammer2_xop_group_t), 445 M_HAMMER2, M_WAITOK | M_ZERO); 446 for (i = 0; i < pmp->iroot->cluster.nchains; ++i) { 447 for (j = 0; j < hammer2_xop_nthreads; ++j) { 448 if (pmp->xop_groups[j].thrs[i].td) 449 continue; 450 hammer2_thr_create(&pmp->xop_groups[j].thrs[i], 451 pmp, NULL, 452 "h2xop", i, j, 453 hammer2_primary_xops_thread); 454 } 455 } 456 lockmgr(&pmp->lock, LK_RELEASE); 457 } 458 459 void 460 hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp) 461 { 462 int i; 463 int j; 464 465 if (pmp->xop_groups == NULL) { 466 KKASSERT(pmp->has_xop_threads == 0); 467 return; 468 } 469 470 for (i = 0; i < pmp->pfs_nmasters; ++i) { 471 for (j = 0; j < hammer2_xop_nthreads; ++j) { 472 if (pmp->xop_groups[j].thrs[i].td) 473 hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]); 474 } 475 } 476 pmp->has_xop_threads = 0; 477 kfree(pmp->xop_groups, M_HAMMER2); 478 pmp->xop_groups = NULL; 479 } 480 481 /* 482 * Start a XOP request, queueing it to all nodes in the cluster to 483 * execute the cluster op. 484 * 485 * XXX optimize single-target case. 
486 */ 487 void 488 hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_desc_t *desc, 489 int notidx) 490 { 491 hammer2_inode_t *ip1; 492 hammer2_pfs_t *pmp; 493 hammer2_thread_t *thr; 494 int i; 495 int ng; 496 int nchains; 497 498 ip1 = xop->ip1; 499 pmp = ip1->pmp; 500 if (pmp->has_xop_threads == 0) 501 hammer2_xop_helper_create(pmp); 502 503 /* 504 * The sequencer assigns a worker thread to the XOP. 505 * 506 * (1) The worker threads are partitioned into two sets, one for 507 * NON-STRATEGY XOPs, and the other for STRATEGY XOPs. This 508 * guarantees that strategy calls will always be able to make 509 * progress and will not deadlock against non-strategy calls. 510 * 511 * (2) If clustered, non-strategy operations to the same inode must 512 * be serialized. This is to avoid confusion when issuing 513 * modifying operations because a XOP completes the instant a 514 * quorum is reached. 515 * 516 * TODO - RENAME fails here because it is potentially modifying 517 * three different inodes, but we triple-lock the inodes 518 * involved so it shouldn't create a sequencing schism. 519 */ 520 if (xop->flags & HAMMER2_XOP_STRATEGY) { 521 /* 522 * Use worker space 0 associated with the current cpu 523 * for strategy ops. 524 */ 525 /* 526 hammer2_xop_strategy_t *xopst; 527 u_int which; 528 529 xopst = &((hammer2_xop_t *)xop)->xop_strategy; 530 which = ((unsigned int)ip1->ihash + 531 ((unsigned int)xopst->lbase >> HAMMER2_PBUFRADIX)) % 532 hammer2_xop_sgroups; 533 ng = mycpu->gd_cpuid % hammer2_xop_mod + 534 hammer2_xop_mod * which; 535 */ 536 ng = 0; 537 } else if (hammer2_spread_workers == 0 && ip1->cluster.nchains == 1) { 538 /* 539 * For now try to keep the work on the same cpu to reduce 540 * IPI overhead. Several threads are assigned to each cpu, 541 * don't be very smart and select the one to use based on 542 * the inode hash. 
543 */ 544 /* 545 u_int which; 546 547 which = (unsigned int)ip1->ihash % hammer2_xop_xgroups; 548 ng = mycpu->gd_cpuid % hammer2_xop_mod + 549 (which * hammer2_xop_mod) + 550 hammer2_xop_xbase; 551 */ 552 ng = 0; 553 } else { 554 /* 555 * Hash based on inode only, must serialize inode to same 556 * thread regardless of current cpu. 557 */ 558 /* 559 ng = (unsigned int)ip1->ihash % 560 (hammer2_xop_mod * hammer2_xop_xgroups) + 561 hammer2_xop_xbase; 562 */ 563 ng = 0; 564 } 565 xop->desc = desc; 566 567 /* 568 * The instant xop is queued another thread can pick it off. In the 569 * case of asynchronous ops, another thread might even finish and 570 * deallocate it. 571 */ 572 hammer2_spin_ex(&pmp->xop_spin); 573 nchains = ip1->cluster.nchains; 574 for (i = 0; i < nchains; ++i) { 575 /* 576 * XXX ip1->cluster.array* not stable here. This temporary 577 * hack fixes basic issues in target XOPs which need to 578 * obtain a starting chain from the inode but does not 579 * address possible races against inode updates which 580 * might NULL-out a chain. 581 */ 582 if (i != notidx && ip1->cluster.array[i].chain) { 583 thr = &pmp->xop_groups[ng].thrs[i]; 584 atomic_set_64(&xop->run_mask, 1LLU << i); 585 atomic_set_64(&xop->chk_mask, 1LLU << i); 586 xop->collect[i].thr = thr; 587 TAILQ_INSERT_TAIL(&thr->xopq, xop, collect[i].entry); 588 } 589 } 590 hammer2_spin_unex(&pmp->xop_spin); 591 /* xop can become invalid at this point */ 592 593 /* 594 * Each thread has its own xopq 595 */ 596 for (i = 0; i < nchains; ++i) { 597 if (i != notidx) { 598 thr = &pmp->xop_groups[ng].thrs[i]; 599 hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ); 600 hammer2_primary_xops_thread(thr); 601 } 602 } 603 } 604 605 void 606 hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_desc_t *desc) 607 { 608 hammer2_xop_start_except(xop, desc, -1); 609 } 610 611 /* 612 * Retire a XOP. Used by both the VOP frontend and by the XOP backend. 
613 */ 614 void 615 hammer2_xop_retire(hammer2_xop_head_t *xop, uint64_t mask) 616 { 617 hammer2_chain_t *chain; 618 uint64_t nmask; 619 int i; 620 621 /* 622 * Remove the frontend collector or remove a backend feeder. 623 * 624 * When removing the frontend we must wakeup any backend feeders 625 * who are waiting for FIFO space. 626 * 627 * When removing the last backend feeder we must wakeup any waiting 628 * frontend. 629 */ 630 KKASSERT(xop->run_mask & mask); 631 nmask = atomic_fetchadd_64(&xop->run_mask, 632 -mask + HAMMER2_XOPMASK_FEED); 633 634 /* 635 * More than one entity left 636 */ 637 if ((nmask & HAMMER2_XOPMASK_ALLDONE) != mask) { 638 /* 639 * Frontend terminating, wakeup any backends waiting on 640 * fifo full. 641 * 642 * NOTE!!! The xop can get ripped out from under us at 643 * this point, so do not reference it again. 644 * The wakeup(xop) doesn't touch the xop and 645 * is ok. 646 */ 647 if (mask == HAMMER2_XOPMASK_VOP) { 648 if (nmask & HAMMER2_XOPMASK_FIFOW) 649 wakeup(xop); 650 } 651 652 /* 653 * Wakeup frontend if the last backend is terminating. 654 */ 655 nmask -= mask; 656 if ((nmask & HAMMER2_XOPMASK_ALLDONE) == HAMMER2_XOPMASK_VOP) { 657 if (nmask & HAMMER2_XOPMASK_WAIT) 658 wakeup(xop); 659 } 660 661 return; 662 } 663 /* else nobody else left, we can ignore FIFOW */ 664 665 /* 666 * All collectors are gone, we can cleanup and dispose of the XOP. 667 * Note that this can wind up being a frontend OR a backend. 668 * Pending chains are locked shared and not owned by any thread. 669 * 670 * Cleanup the collection cluster. 671 */ 672 for (i = 0; i < xop->cluster.nchains; ++i) { 673 xop->cluster.array[i].flags = 0; 674 chain = xop->cluster.array[i].chain; 675 if (chain) { 676 xop->cluster.array[i].chain = NULL; 677 hammer2_chain_drop_unhold(chain); 678 } 679 } 680 681 /* 682 * Cleanup the fifos. Since we are the only entity left on this 683 * xop we don't have to worry about fifo flow control, and one 684 * lfence() will do the job. 
685 */ 686 cpu_lfence(); 687 mask = xop->chk_mask; 688 for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) { 689 hammer2_xop_fifo_t *fifo = &xop->collect[i]; 690 while (fifo->ri != fifo->wi) { 691 chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK]; 692 if (chain) 693 hammer2_chain_drop_unhold(chain); 694 ++fifo->ri; 695 } 696 mask &= ~(1U << i); 697 } 698 699 /* 700 * The inode is only held at this point, simply drop it. 701 */ 702 if (xop->ip1) { 703 hammer2_inode_drop(xop->ip1); 704 xop->ip1 = NULL; 705 } 706 if (xop->ip2) { 707 hammer2_inode_drop(xop->ip2); 708 xop->ip2 = NULL; 709 } 710 if (xop->ip3) { 711 hammer2_inode_drop(xop->ip3); 712 xop->ip3 = NULL; 713 } 714 if (xop->ip4) { 715 hammer2_inode_drop(xop->ip4); 716 xop->ip4 = NULL; 717 } 718 if (xop->name1) { 719 kfree(xop->name1, M_HAMMER2); 720 xop->name1 = NULL; 721 xop->name1_len = 0; 722 } 723 if (xop->name2) { 724 kfree(xop->name2, M_HAMMER2); 725 xop->name2 = NULL; 726 xop->name2_len = 0; 727 } 728 729 free(xop); 730 } 731 732 /* 733 * (Backend) Returns non-zero if the frontend is still attached. 734 */ 735 int 736 hammer2_xop_active(hammer2_xop_head_t *xop) 737 { 738 if (xop->run_mask & HAMMER2_XOPMASK_VOP) 739 return 1; 740 else 741 return 0; 742 } 743 744 /* 745 * (Backend) Feed chain data through the cluster validator and back to 746 * the frontend. Chains are fed from multiple nodes concurrently 747 * and pipelined via per-node FIFOs in the XOP. 748 * 749 * The chain must be locked (either shared or exclusive). The caller may 750 * unlock and drop the chain on return. This function will add an extra 751 * ref and hold the chain's data for the pass-back. 752 * 753 * No xop lock is needed because we are only manipulating fields under 754 * our direct control. 755 * 756 * Returns 0 on success and a hammer2 error code if sync is permanently 757 * lost. The caller retains a ref on the chain but by convention 758 * the lock is typically inherited by the xop (caller loses lock). 
*
 * Returns non-zero (HAMMER2_ERROR_ABORTED) if the frontend has already
 * detached; in that case nothing is queued and no ref is added.  On
 * success 0 is returned and the (error) argument, or chain->error when
 * (error) is 0, is queued in the fifo for the collector instead.
 *
 * NOTE(review): this function never unlocks or otherwise touches the
 *		 chain lock itself; any lock hand-off is purely a caller
 *		 convention.
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
		 int clindex, int error)
{
	hammer2_xop_fifo_t *fifo;
	uint64_t mask;

	/*
	 * Early termination (typically of xop_readdir)
	 */
	if (hammer2_xop_active(xop) == 0) {
		error = HAMMER2_ERROR_ABORTED;
		goto done;
	}

	/*
	 * Multi-threaded entry into the XOP collector.  We own the
	 * fifo->wi for our clindex.
	 */
	fifo = &xop->collect[clindex];

	/*
	 * makefs HAMMER2 has no dedicated XOP threads,
	 * so nothing we can do once fifo reaches HAMMER2_XOPFIFO.
	 * Fortunately, readdir VOP is the only VOP causes this,
	 * and makefs HAMMER2 doesn't implement it.
	 *
	 * Because of this panic, the flow-control yield/sleep loop that
	 * follows is unreachable in makefs; it is retained from the
	 * threaded implementation.
	 */
	if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO)
		panic("hammer2: \"%s\" reached fifo limit %d",
		    xop->desc->id, HAMMER2_XOPFIFO);

	if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO)
		lwkt_yield();
	while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
		atomic_set_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
		mask = xop->run_mask;
		if ((mask & HAMMER2_XOPMASK_VOP) == 0) {
			error = HAMMER2_ERROR_ABORTED;
			goto done;
		}
		tsleep_interlock(xop, 0);
		if (atomic_cmpset_64(&xop->run_mask, mask,
				     mask | HAMMER2_XOPMASK_FIFOW)) {
			if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
				tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
			}
		}
		/* retry */
	}
	atomic_clear_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
	if (chain)
		hammer2_chain_ref_hold(chain);
	if (error == 0 && chain)
		error = chain->error;
	fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
	fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
	/* publish the slot contents before advancing wi */
	cpu_sfence();
	++fifo->wi;

	mask = atomic_fetchadd_64(&xop->run_mask, HAMMER2_XOPMASK_FEED);
	if (mask & HAMMER2_XOPMASK_WAIT) {
		atomic_clear_64(&xop->run_mask, HAMMER2_XOPMASK_WAIT);
		wakeup(xop);
	}
	error = 0;

	/*
	 * Cleanup.  On success the fifo now owns the extra ref+hold taken
	 * by hammer2_chain_ref_hold() above; on the abort path nothing was
	 * queued.
	 *
	 * The caller's ref remains in both cases.
	 */
done:
	return error;
}

/*
 * (Frontend) collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= collect_key.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT	- Used to 'poll' a completed collection,
 *				  usually called synchronously from the
 *				  node XOPs for the strategy code to
 *				  fake the frontend collection and complete
 *				  the BIO as soon as possible.
 *
 * HAMMER2_XOP_COLLECT_WAITALL	- Report EINPROGRESS (and wait, unless
 *				  NOWAIT is also set) until only the
 *				  frontend remains in run_mask.
 *
 * HAMMER2_XOP_SYNCHRONIZER	- Request synchronization with a particular
 *				  cluster index, prevents looping when that
 *				  index is out of sync so caller can act on
 *				  the out of sync element.  ESRCH and EDEADLK
 *				  can be returned if this flag is specified.
 *
 *	NOTE(review): the visible collector never tests SYNCHRONIZER and
 *		      always converts ESRCH/EDEADLK into a retry or ENOENT.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Return ENOENT on normal termination.
 * Otherwise return an error.
 *
 * WARNING! If the xop returns a cluster with a non-NULL focus, note that
 *	    none of the chains in the cluster (or the focus) are either
 *	    locked or I/O synchronized with the cpu.  hammer2_xop_gdata()
 *	    and hammer2_xop_pdata() must be used to safely access the focus
 *	    chain's content.
 *
 *	    The frontend can make certain assumptions based on higher-level
 *	    locking done by the frontend, but data integrity absolutely
 *	    requires using the gdata/pdata API.
873 */ 874 int 875 hammer2_xop_collect(hammer2_xop_head_t *xop, int flags) 876 { 877 hammer2_xop_fifo_t *fifo; 878 hammer2_chain_t *chain; 879 hammer2_key_t lokey; 880 uint64_t mask; 881 int error; 882 int keynull; 883 int adv; /* advance the element */ 884 int i; 885 886 loop: 887 /* 888 * First loop tries to advance pieces of the cluster which 889 * are out of sync. 890 */ 891 lokey = HAMMER2_KEY_MAX; 892 keynull = HAMMER2_CHECK_NULL; 893 mask = xop->run_mask; 894 cpu_lfence(); 895 896 for (i = 0; i < xop->cluster.nchains; ++i) { 897 chain = xop->cluster.array[i].chain; 898 if (chain == NULL) { 899 adv = 1; 900 } else if (chain->bref.key < xop->collect_key) { 901 adv = 1; 902 } else { 903 keynull &= ~HAMMER2_CHECK_NULL; 904 if (lokey > chain->bref.key) 905 lokey = chain->bref.key; 906 adv = 0; 907 } 908 if (adv == 0) 909 continue; 910 911 /* 912 * Advance element if possible, advanced element may be NULL. 913 */ 914 if (chain) 915 hammer2_chain_drop_unhold(chain); 916 917 fifo = &xop->collect[i]; 918 if (fifo->ri != fifo->wi) { 919 cpu_lfence(); 920 chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK]; 921 error = fifo->errors[fifo->ri & HAMMER2_XOPFIFO_MASK]; 922 ++fifo->ri; 923 xop->cluster.array[i].chain = chain; 924 xop->cluster.array[i].error = error; 925 if (chain == NULL) { 926 /* XXX */ 927 xop->cluster.array[i].flags |= 928 HAMMER2_CITEM_NULL; 929 } 930 if (fifo->wi - fifo->ri <= HAMMER2_XOPFIFO / 2) { 931 if (fifo->flags & HAMMER2_XOP_FIFO_STALL) { 932 atomic_clear_int(&fifo->flags, 933 HAMMER2_XOP_FIFO_STALL); 934 wakeup(xop); 935 lwkt_yield(); 936 } 937 } 938 --i; /* loop on same index */ 939 } else { 940 /* 941 * Retain CITEM_NULL flag. If set just repeat EOF. 942 * If not, the NULL,0 combination indicates an 943 * operation in-progress. 944 */ 945 xop->cluster.array[i].chain = NULL; 946 /* retain any CITEM_NULL setting */ 947 } 948 } 949 950 /* 951 * Determine whether the lowest collected key meets clustering 952 * requirements. 
Returns HAMMER2_ERROR_*: 953 * 954 * 0 - key valid, cluster can be returned. 955 * 956 * ENOENT - normal end of scan, return ENOENT. 957 * 958 * ESRCH - sufficient elements collected, quorum agreement 959 * that lokey is not a valid element and should be 960 * skipped. 961 * 962 * EDEADLK - sufficient elements collected, no quorum agreement 963 * (and no agreement possible). In this situation a 964 * repair is needed, for now we loop. 965 * 966 * EINPROGRESS - insufficient elements collected to resolve, wait 967 * for event and loop. 968 * 969 * EIO/ECHECK - IO error or CRC check error from hammer2_cluster_check() 970 */ 971 if ((flags & HAMMER2_XOP_COLLECT_WAITALL) && 972 (mask & HAMMER2_XOPMASK_ALLDONE) != HAMMER2_XOPMASK_VOP) { 973 error = HAMMER2_ERROR_EINPROGRESS; 974 } else { 975 error = hammer2_cluster_check(&xop->cluster, lokey, keynull); 976 } 977 if (error == HAMMER2_ERROR_EINPROGRESS) { 978 if (flags & HAMMER2_XOP_COLLECT_NOWAIT) 979 goto done; 980 tsleep_interlock(xop, 0); 981 if (atomic_cmpset_64(&xop->run_mask, 982 mask, mask | HAMMER2_XOPMASK_WAIT)) { 983 tsleep(xop, PINTERLOCKED, "h2coll", hz*60); 984 } 985 goto loop; 986 } 987 if (error == HAMMER2_ERROR_ESRCH) { 988 if (lokey != HAMMER2_KEY_MAX) { 989 xop->collect_key = lokey + 1; 990 goto loop; 991 } 992 error = HAMMER2_ERROR_ENOENT; 993 } 994 if (error == HAMMER2_ERROR_EDEADLK) { 995 kprintf("hammer2: no quorum possible lokey %016jx\n", 996 lokey); 997 if (lokey != HAMMER2_KEY_MAX) { 998 xop->collect_key = lokey + 1; 999 goto loop; 1000 } 1001 error = HAMMER2_ERROR_ENOENT; 1002 } 1003 if (lokey == HAMMER2_KEY_MAX) 1004 xop->collect_key = lokey; 1005 else 1006 xop->collect_key = lokey + 1; 1007 done: 1008 return error; 1009 } 1010 1011 /* 1012 * N x M processing threads are available to handle XOPs, N per cluster 1013 * index x M cluster nodes. 
1014 * 1015 * Locate and return the next runnable xop, or NULL if no xops are 1016 * present or none of the xops are currently runnable (for various reasons). 1017 * The xop is left on the queue and serves to block other dependent xops 1018 * from being run. 1019 * 1020 * Dependent xops will not be returned. 1021 * 1022 * Sets HAMMER2_XOP_FIFO_RUN on the returned xop or returns NULL. 1023 * 1024 * NOTE! Xops run concurrently for each cluster index. 1025 */ 1026 #define XOP_HASH_SIZE 16 1027 #define XOP_HASH_MASK (XOP_HASH_SIZE - 1) 1028 1029 static __inline 1030 int 1031 xop_testhash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash) 1032 { 1033 uint32_t mask; 1034 int hv; 1035 1036 hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t); 1037 mask = 1U << (hv & 31); 1038 hv >>= 5; 1039 1040 return ((int)(hash[hv & XOP_HASH_MASK] & mask)); 1041 } 1042 1043 static __inline 1044 void 1045 xop_sethash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash) 1046 { 1047 uint32_t mask; 1048 int hv; 1049 1050 hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t); 1051 mask = 1U << (hv & 31); 1052 hv >>= 5; 1053 1054 hash[hv & XOP_HASH_MASK] |= mask; 1055 } 1056 1057 static 1058 hammer2_xop_head_t * 1059 hammer2_xop_next(hammer2_thread_t *thr) 1060 { 1061 hammer2_pfs_t *pmp = thr->pmp; 1062 int clindex = thr->clindex; 1063 uint32_t hash[XOP_HASH_SIZE] = { 0 }; 1064 hammer2_xop_head_t *xop; 1065 1066 hammer2_spin_ex(&pmp->xop_spin); 1067 TAILQ_FOREACH(xop, &thr->xopq, collect[clindex].entry) { 1068 /* 1069 * Check dependency 1070 */ 1071 if (xop_testhash(thr, xop->ip1, hash) || 1072 (xop->ip2 && xop_testhash(thr, xop->ip2, hash)) || 1073 (xop->ip3 && xop_testhash(thr, xop->ip3, hash)) || 1074 (xop->ip4 && xop_testhash(thr, xop->ip4, hash))) 1075 { 1076 continue; 1077 } 1078 xop_sethash(thr, xop->ip1, hash); 1079 if (xop->ip2) 1080 xop_sethash(thr, xop->ip2, hash); 1081 if (xop->ip3) 1082 xop_sethash(thr, xop->ip3, hash); 1083 if 
(xop->ip4) 1084 xop_sethash(thr, xop->ip4, hash); 1085 1086 /* 1087 * Check already running 1088 */ 1089 if (xop->collect[clindex].flags & HAMMER2_XOP_FIFO_RUN) 1090 continue; 1091 1092 /* 1093 * Found a good one, return it. 1094 */ 1095 atomic_set_int(&xop->collect[clindex].flags, 1096 HAMMER2_XOP_FIFO_RUN); 1097 break; 1098 } 1099 hammer2_spin_unex(&pmp->xop_spin); 1100 1101 return xop; 1102 } 1103 1104 /* 1105 * Remove the completed XOP from the queue, clear HAMMER2_XOP_FIFO_RUN. 1106 * 1107 * NOTE! Xops run concurrently for each cluster index. 1108 */ 1109 static 1110 void 1111 hammer2_xop_dequeue(hammer2_thread_t *thr, hammer2_xop_head_t *xop) 1112 { 1113 hammer2_pfs_t *pmp = thr->pmp; 1114 int clindex = thr->clindex; 1115 1116 hammer2_spin_ex(&pmp->xop_spin); 1117 TAILQ_REMOVE(&thr->xopq, xop, collect[clindex].entry); 1118 atomic_clear_int(&xop->collect[clindex].flags, 1119 HAMMER2_XOP_FIFO_RUN); 1120 hammer2_spin_unex(&pmp->xop_spin); 1121 if (TAILQ_FIRST(&thr->xopq)) 1122 hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ); 1123 } 1124 1125 /* 1126 * Primary management thread for xops support. Each node has several such 1127 * threads which replicate front-end operations on cluster nodes. 1128 * 1129 * XOPS thread node operations, allowing the function to focus on a single 1130 * node in the cluster after validating the operation with the cluster. 1131 * This is primarily what prevents dead or stalled nodes from stalling 1132 * the front-end. 
*
 * In makefs there are no real XOP threads: hammer2_xop_start_except()
 * calls this function directly, once per cluster index, after queueing a
 * XOP.  It drains the queue and returns to that caller.
 *
 * NOTE(review): every return path below falls through to the epilogue,
 *		 which sets thr->td = NULL and signals STOPPED, even when no
 *		 STOP was requested.  After the first XOP, td-guarded helpers
 *		 such as hammer2_thr_delete() therefore treat this thread as
 *		 gone.
 */
void
hammer2_primary_xops_thread(void *arg)
{
	hammer2_thread_t *thr = arg;
	hammer2_xop_head_t *xop;
	uint64_t mask;
	uint32_t flags;
	uint32_t nflags;

	/* run_mask bit owned by this cluster index, used for retire */
	mask = 1LLU << thr->clindex;

	for (;;) {
		flags = thr->flags;

		/*
		 * Handle stop request
		 */
		if (flags & HAMMER2_THREAD_STOP)
			break;

		/*
		 * Handle freeze request
		 */
		if (flags & HAMMER2_THREAD_FREEZE) {
			hammer2_thr_signal2(thr, HAMMER2_THREAD_FROZEN,
						 HAMMER2_THREAD_FREEZE);
			continue;
		}

		if (flags & HAMMER2_THREAD_UNFREEZE) {
			hammer2_thr_signal2(thr, 0,
						 HAMMER2_THREAD_FROZEN |
						 HAMMER2_THREAD_UNFREEZE);
			continue;
		}

		/*
		 * Force idle if frozen until unfrozen or stopped.
		 *
		 * NOTE(review): with no other thread to post UNFREEZE/STOP,
		 *		 reaching this in makefs would spin or block;
		 *		 presumably FREEZE is never requested on XOP
		 *		 helpers here -- confirm.
		 */
		if (flags & HAMMER2_THREAD_FROZEN) {
			hammer2_thr_wait_any(thr,
					     HAMMER2_THREAD_UNFREEZE |
					     HAMMER2_THREAD_STOP,
					     0);
			continue;
		}

		/*
		 * Reset state on REMASTER request
		 */
		if (flags & HAMMER2_THREAD_REMASTER) {
			hammer2_thr_signal2(thr, 0, HAMMER2_THREAD_REMASTER);
			/* reset state here */
			continue;
		}

		/*
		 * Process requests.  Each request can be multi-queued.
		 *
		 * If we get behind and the frontend VOP is no longer active,
		 * we retire the request without processing it.  The callback
		 * may also abort processing if the frontend VOP becomes
		 * inactive.
		 */
		if (flags & HAMMER2_THREAD_XOPQ) {
			nflags = flags & ~HAMMER2_THREAD_XOPQ;
			if (!atomic_cmpset_int(&thr->flags, flags, nflags))
				continue;
			flags = nflags;
			/* fall through */
		}
		while ((xop = hammer2_xop_next(thr)) != NULL) {
			if (hammer2_xop_active(xop)) {
				xop->desc->storage_func((hammer2_xop_t *)xop,
							thr->scratch,
							thr->clindex);
				hammer2_xop_dequeue(thr, xop);
				hammer2_xop_retire(xop, mask);
			} else {
				/*
				 * NOTE(review): ECONNABORTED is a raw errno,
				 *		 whereas hammer2_xop_feed()'s own
				 *		 abort path uses
				 *		 HAMMER2_ERROR_ABORTED; the feed
				 *		 result is ignored here anyway.
				 */
				hammer2_xop_feed(xop, NULL, thr->clindex,
						 ECONNABORTED);
				hammer2_xop_dequeue(thr, xop);
				hammer2_xop_retire(xop, mask);
			}
		}

		/* Don't wait, this is a XOP caller thread in makefs */
		break;

		/*
		 * Wait for event, interlock using THREAD_WAITING and
		 * THREAD_SIGNAL.
		 *
		 * For robustness poll on a 30-second interval, but nominally
		 * expect to be woken up.
		 *
		 * (Unreachable in makefs because of the break above; kept
		 *  from the threaded implementation.)
		 */
		nflags = flags | HAMMER2_THREAD_WAITING;

		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2idle", hz*30);
		}
	}

#if 0
	/*
	 * Cleanup / termination
	 */
	while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
		kprintf("hammer2_thread: aborting xop %s\n", xop->desc->id);
		TAILQ_REMOVE(&thr->xopq, xop,
			     collect[thr->clindex].entry);
		hammer2_xop_retire(xop, mask);
	}
#endif
	thr->td = NULL;
	hammer2_thr_signal(thr, HAMMER2_THREAD_STOPPED);
	/* thr structure can go invalid after this point */
}