1 /* 2 * SPDX-License-Identifier: BSD-3-Clause 3 * 4 * Copyright (c) 2022 Tomohiro Kusumi <tkusumi@netbsd.org> 5 * Copyright (c) 2011-2022 The DragonFly Project. All rights reserved. 6 * 7 * This code is derived from software contributed to The DragonFly Project 8 * by Matthew Dillon <dillon@dragonflybsd.org> 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 14 * 1. Redistributions of source code must retain the above copyright 15 * notice, this list of conditions and the following disclaimer. 16 * 2. Redistributions in binary form must reproduce the above copyright 17 * notice, this list of conditions and the following disclaimer in 18 * the documentation and/or other materials provided with the 19 * distribution. 20 * 3. Neither the name of The DragonFly Project nor the names of its 21 * contributors may be used to endorse or promote products derived 22 * from this software without specific, prior written permission. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 32 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 33 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 34 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 35 * SUCH DAMAGE. 36 */ 37 /* 38 * This module implements the hammer2 helper thread API, including 39 * the frontend/backend XOP API. 40 */ 41 #include "hammer2.h" 42 43 #define H2XOPDESCRIPTOR(label) \ 44 hammer2_xop_desc_t hammer2_##label##_desc = { \ 45 .storage_func = hammer2_xop_##label, \ 46 .id = #label \ 47 } 48 49 H2XOPDESCRIPTOR(ipcluster); 50 H2XOPDESCRIPTOR(readdir); 51 H2XOPDESCRIPTOR(nresolve); 52 H2XOPDESCRIPTOR(unlink); 53 H2XOPDESCRIPTOR(nrename); 54 H2XOPDESCRIPTOR(scanlhc); 55 H2XOPDESCRIPTOR(scanall); 56 H2XOPDESCRIPTOR(lookup); 57 H2XOPDESCRIPTOR(delete); 58 H2XOPDESCRIPTOR(inode_mkdirent); 59 H2XOPDESCRIPTOR(inode_create); 60 H2XOPDESCRIPTOR(inode_create_det); 61 H2XOPDESCRIPTOR(inode_create_ins); 62 H2XOPDESCRIPTOR(inode_destroy); 63 H2XOPDESCRIPTOR(inode_chain_sync); 64 H2XOPDESCRIPTOR(inode_unlinkall); 65 H2XOPDESCRIPTOR(inode_connect); 66 H2XOPDESCRIPTOR(inode_flush); 67 H2XOPDESCRIPTOR(strategy_read); 68 H2XOPDESCRIPTOR(strategy_write); 69 70 //struct objcache *cache_xops; 71 static struct thread dummy_td; 72 struct thread *curthread = &dummy_td; 73 74 /* 75 * Set flags and wakeup any waiters. 76 * 77 * WARNING! During teardown (thr) can disappear the instant our cmpset 78 * succeeds. 79 */ 80 void 81 hammer2_thr_signal(hammer2_thread_t *thr, uint32_t flags) 82 { 83 uint32_t oflags; 84 uint32_t nflags; 85 86 for (;;) { 87 oflags = thr->flags; 88 cpu_ccfence(); 89 nflags = (oflags | flags) & ~HAMMER2_THREAD_WAITING; 90 91 if (oflags & HAMMER2_THREAD_WAITING) { 92 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) { 93 wakeup(&thr->flags); 94 break; 95 } 96 } else { 97 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) 98 break; 99 } 100 } 101 } 102 103 /* 104 * Set and clear flags and wakeup any waiters. 105 * 106 * WARNING! During teardown (thr) can disappear the instant our cmpset 107 * succeeds. 108 */ 109 void 110 hammer2_thr_signal2(hammer2_thread_t *thr, uint32_t posflags, uint32_t negflags) 111 { 112 uint32_t oflags; 113 uint32_t nflags; 114 115 for (;;) { 116 oflags = thr->flags; 117 cpu_ccfence(); 118 nflags = (oflags | posflags) & 119 ~(negflags | HAMMER2_THREAD_WAITING); 120 if (oflags & HAMMER2_THREAD_WAITING) { 121 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) { 122 wakeup(&thr->flags); 123 break; 124 } 125 } else { 126 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) 127 break; 128 } 129 } 130 } 131 132 /* 133 * Wait until all the bits in flags are set. 134 * 135 * WARNING! During teardown (thr) can disappear the instant our cmpset 136 * succeeds. 137 */ 138 void 139 hammer2_thr_wait(hammer2_thread_t *thr, uint32_t flags) 140 { 141 uint32_t oflags; 142 uint32_t nflags; 143 144 for (;;) { 145 oflags = thr->flags; 146 cpu_ccfence(); 147 if ((oflags & flags) == flags) 148 break; 149 nflags = oflags | HAMMER2_THREAD_WAITING; 150 tsleep_interlock(&thr->flags, 0); 151 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) { 152 tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60); 153 } 154 } 155 } 156 157 /* 158 * Wait until any of the bits in flags are set, with timeout. 159 * 160 * WARNING! During teardown (thr) can disappear the instant our cmpset 161 * succeeds. 162 */ 163 int 164 hammer2_thr_wait_any(hammer2_thread_t *thr, uint32_t flags, int timo) 165 { 166 uint32_t oflags; 167 uint32_t nflags; 168 int error; 169 170 error = 0; 171 for (;;) { 172 oflags = thr->flags; 173 cpu_ccfence(); 174 if (oflags & flags) 175 break; 176 nflags = oflags | HAMMER2_THREAD_WAITING; 177 tsleep_interlock(&thr->flags, 0); 178 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) { 179 error = tsleep(&thr->flags, PINTERLOCKED, 180 "h2twait", timo); 181 } 182 if (error == ETIMEDOUT) { 183 error = HAMMER2_ERROR_ETIMEDOUT; 184 break; 185 } 186 } 187 return error; 188 } 189 190 /* 191 * Wait until the bits in flags are clear. 192 * 193 * WARNING! During teardown (thr) can disappear the instant our cmpset 194 * succeeds. 195 */ 196 void 197 hammer2_thr_wait_neg(hammer2_thread_t *thr, uint32_t flags) 198 { 199 uint32_t oflags; 200 uint32_t nflags; 201 202 for (;;) { 203 oflags = thr->flags; 204 cpu_ccfence(); 205 if ((oflags & flags) == 0) 206 break; 207 nflags = oflags | HAMMER2_THREAD_WAITING; 208 tsleep_interlock(&thr->flags, 0); 209 if (atomic_cmpset_int(&thr->flags, oflags, nflags)) { 210 tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60); 211 } 212 } 213 } 214 215 /* 216 * Initialize the supplied thread structure, starting the specified 217 * thread. 218 * 219 * NOTE: thr structure can be retained across mounts and unmounts for this 220 * pmp, so make sure the flags are in a sane state. 221 */ 222 void 223 hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp, 224 hammer2_dev_t *hmp, 225 const char *id, int clindex, int repidx, 226 void (*func)(void *arg)) 227 { 228 thr->pmp = pmp; /* xop helpers */ 229 thr->hmp = hmp; /* bulkfree */ 230 thr->clindex = clindex; 231 thr->repidx = repidx; 232 TAILQ_INIT(&thr->xopq); 233 atomic_clear_int(&thr->flags, HAMMER2_THREAD_STOP | 234 HAMMER2_THREAD_STOPPED | 235 HAMMER2_THREAD_FREEZE | 236 HAMMER2_THREAD_FROZEN); 237 if (thr->scratch == NULL) 238 thr->scratch = kmalloc(MAXPHYS, M_HAMMER2, M_WAITOK | M_ZERO); 239 #if 0 240 if (repidx >= 0) { 241 lwkt_create(func, thr, &thr->td, NULL, 0, repidx % ncpus, 242 "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx); 243 } else if (pmp) { 244 lwkt_create(func, thr, &thr->td, NULL, 0, -1, 245 "%s-%s", id, pmp->pfs_names[clindex]); 246 } else { 247 lwkt_create(func, thr, &thr->td, NULL, 0, -1, "%s", id); 248 } 249 #else 250 thr->td = &dummy_td; 251 #endif 252 } 253 254 /* 255 * Terminate a thread. This function will silently return if the thread 256 * was never initialized or has already been deleted. 257 * 258 * This is accomplished by setting the STOP flag and waiting for the td 259 * structure to become NULL. 260 */ 261 void 262 hammer2_thr_delete(hammer2_thread_t *thr) 263 { 264 if (thr->td == NULL) 265 return; 266 hammer2_thr_signal(thr, HAMMER2_THREAD_STOP); 267 /* Don't wait, there's no such thread in makefs */ 268 //hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED); 269 thr->pmp = NULL; 270 if (thr->scratch) { 271 kfree(thr->scratch, M_HAMMER2); 272 thr->scratch = NULL; 273 } 274 KKASSERT(TAILQ_EMPTY(&thr->xopq)); 275 } 276 277 /* 278 * Asynchronous remaster request. Ask the synchronization thread to 279 * start over soon (as if it were frozen and unfrozen, but without waiting). 280 * The thread always recalculates mastership relationships when restarting. 281 */ 282 void 283 hammer2_thr_remaster(hammer2_thread_t *thr) 284 { 285 if (thr->td == NULL) 286 return; 287 hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER); 288 } 289 290 void 291 hammer2_thr_freeze_async(hammer2_thread_t *thr) 292 { 293 hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE); 294 } 295 296 void 297 hammer2_thr_freeze(hammer2_thread_t *thr) 298 { 299 if (thr->td == NULL) 300 return; 301 hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE); 302 hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN); 303 } 304 305 void 306 hammer2_thr_unfreeze(hammer2_thread_t *thr) 307 { 308 if (thr->td == NULL) 309 return; 310 hammer2_thr_signal(thr, HAMMER2_THREAD_UNFREEZE); 311 hammer2_thr_wait_neg(thr, HAMMER2_THREAD_FROZEN); 312 } 313 314 int 315 hammer2_thr_break(hammer2_thread_t *thr) 316 { 317 if (thr->flags & (HAMMER2_THREAD_STOP | 318 HAMMER2_THREAD_REMASTER | 319 HAMMER2_THREAD_FREEZE)) { 320 return 1; 321 } 322 return 0; 323 } 324 325 /**************************************************************************** 326 * HAMMER2 XOPS API * 327 ****************************************************************************/ 328 329 /* 330 * Allocate or reallocate XOP FIFO. This doesn't exist in sys/vfs/hammer2 331 * where XOP is handled by dedicated kernel threads and when FIFO stalls 332 * threads wait for frontend to collect results. 333 */ 334 static void 335 hammer2_xop_fifo_alloc(hammer2_xop_fifo_t *fifo, size_t nmemb) 336 { 337 size_t size; 338 339 /* Assert nmemb requirements. */ 340 KKASSERT((nmemb & (nmemb - 1)) == 0); 341 KKASSERT(nmemb >= HAMMER2_XOPFIFO); 342 343 /* malloc or realloc fifo array. */ 344 size = nmemb * sizeof(hammer2_chain_t *); 345 if (!fifo->array) 346 fifo->array = kmalloc(size, M_HAMMER2, M_WAITOK | M_ZERO); 347 else 348 fifo->array = krealloc(fifo->array, size, M_HAMMER2, 349 M_WAITOK | M_ZERO); 350 KKASSERT(fifo->array); 351 352 /* malloc or realloc fifo errors. */ 353 size = nmemb * sizeof(int); 354 if (!fifo->errors) 355 fifo->errors = kmalloc(size, M_HAMMER2, M_WAITOK | M_ZERO); 356 else 357 fifo->errors = krealloc(fifo->errors, size, M_HAMMER2, 358 M_WAITOK | M_ZERO); 359 KKASSERT(fifo->errors); 360 } 361 362 /* 363 * Allocate a XOP request. 364 * 365 * Once allocated a XOP request can be started, collected, and retired, 366 * and can be retired early if desired. 367 * 368 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get(). 369 */ 370 void * 371 hammer2_xop_alloc(hammer2_inode_t *ip, int flags) 372 { 373 hammer2_xop_t *xop; 374 375 xop = ecalloc(1, sizeof(*xop)); 376 KKASSERT(xop->head.cluster.array[0].chain == NULL); 377 378 xop->head.ip1 = ip; 379 xop->head.desc = NULL; 380 xop->head.flags = flags; 381 xop->head.state = 0; 382 xop->head.error = 0; 383 xop->head.collect_key = 0; 384 xop->head.focus_dio = NULL; 385 386 if (flags & HAMMER2_XOP_MODIFYING) 387 xop->head.mtid = hammer2_trans_sub(ip->pmp); 388 else 389 xop->head.mtid = 0; 390 391 xop->head.cluster.nchains = ip->cluster.nchains; 392 xop->head.cluster.pmp = ip->pmp; 393 xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED; 394 395 /* 396 * run_mask - Active thread (or frontend) associated with XOP 397 */ 398 xop->head.run_mask = HAMMER2_XOPMASK_VOP; 399 400 hammer2_xop_fifo_t *fifo = &xop->head.collect[0]; 401 xop->head.fifo_size = HAMMER2_XOPFIFO; 402 hammer2_xop_fifo_alloc(fifo, xop->head.fifo_size); 403 404 hammer2_inode_ref(ip); 405 406 return xop; 407 } 408 409 void 410 hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len) 411 { 412 xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO); 413 xop->name1_len = name_len; 414 bcopy(name, xop->name1, name_len); 415 } 416 417 void 418 hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len) 419 { 420 xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO); 421 xop->name2_len = name_len; 422 bcopy(name, xop->name2, name_len); 423 } 424 425 size_t 426 hammer2_xop_setname_inum(hammer2_xop_head_t *xop, hammer2_key_t inum) 427 { 428 const size_t name_len = 18; 429 430 xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO); 431 xop->name1_len = name_len; 432 ksnprintf(xop->name1, name_len + 1, "0x%016jx", (intmax_t)inum); 433 434 return name_len; 435 } 436 437 438 void 439 hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2) 440 { 441 xop->ip2 = ip2; 442 hammer2_inode_ref(ip2); 443 } 444 445 void 446 hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3) 447 { 448 xop->ip3 = ip3; 449 hammer2_inode_ref(ip3); 450 } 451 452 void 453 hammer2_xop_setip4(hammer2_xop_head_t *xop, hammer2_inode_t *ip4) 454 { 455 xop->ip4 = ip4; 456 hammer2_inode_ref(ip4); 457 } 458 459 void 460 hammer2_xop_reinit(hammer2_xop_head_t *xop) 461 { 462 xop->state = 0; 463 xop->error = 0; 464 xop->collect_key = 0; 465 xop->run_mask = HAMMER2_XOPMASK_VOP; 466 } 467 468 /* 469 * A mounted PFS needs Xops threads to support frontend operations. 470 */ 471 void 472 hammer2_xop_helper_create(hammer2_pfs_t *pmp) 473 { 474 int i; 475 int j; 476 477 lockmgr(&pmp->lock, LK_EXCLUSIVE); 478 pmp->has_xop_threads = 1; 479 480 pmp->xop_groups = kmalloc(hammer2_xop_nthreads * 481 sizeof(hammer2_xop_group_t), 482 M_HAMMER2, M_WAITOK | M_ZERO); 483 for (i = 0; i < pmp->iroot->cluster.nchains; ++i) { 484 for (j = 0; j < hammer2_xop_nthreads; ++j) { 485 if (pmp->xop_groups[j].thrs[i].td) 486 continue; 487 hammer2_thr_create(&pmp->xop_groups[j].thrs[i], 488 pmp, NULL, 489 "h2xop", i, j, 490 hammer2_primary_xops_thread); 491 } 492 } 493 lockmgr(&pmp->lock, LK_RELEASE); 494 } 495 496 void 497 hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp) 498 { 499 int i; 500 int j; 501 502 if (pmp->xop_groups == NULL) { 503 KKASSERT(pmp->has_xop_threads == 0); 504 return; 505 } 506 507 for (i = 0; i < pmp->pfs_nmasters; ++i) { 508 for (j = 0; j < hammer2_xop_nthreads; ++j) { 509 if (pmp->xop_groups[j].thrs[i].td) 510 hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]); 511 } 512 } 513 pmp->has_xop_threads = 0; 514 kfree(pmp->xop_groups, M_HAMMER2); 515 pmp->xop_groups = NULL; 516 } 517 518 /* 519 * Start a XOP request, queueing it to all nodes in the cluster to 520 * execute the cluster op. 521 * 522 * XXX optimize single-target case. 523 */ 524 void 525 hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_desc_t *desc, 526 int notidx) 527 { 528 hammer2_inode_t *ip1; 529 hammer2_pfs_t *pmp; 530 hammer2_thread_t *thr; 531 int i; 532 int ng; 533 int nchains; 534 535 ip1 = xop->ip1; 536 pmp = ip1->pmp; 537 if (pmp->has_xop_threads == 0) 538 hammer2_xop_helper_create(pmp); 539 540 /* 541 * The sequencer assigns a worker thread to the XOP. 542 * 543 * (1) The worker threads are partitioned into two sets, one for 544 * NON-STRATEGY XOPs, and the other for STRATEGY XOPs. This 545 * guarantees that strategy calls will always be able to make 546 * progress and will not deadlock against non-strategy calls. 547 * 548 * (2) If clustered, non-strategy operations to the same inode must 549 * be serialized. This is to avoid confusion when issuing 550 * modifying operations because a XOP completes the instant a 551 * quorum is reached. 552 * 553 * TODO - RENAME fails here because it is potentially modifying 554 * three different inodes, but we triple-lock the inodes 555 * involved so it shouldn't create a sequencing schism. 556 */ 557 if (xop->flags & HAMMER2_XOP_STRATEGY) { 558 /* 559 * Use worker space 0 associated with the current cpu 560 * for strategy ops. 561 */ 562 /* 563 hammer2_xop_strategy_t *xopst; 564 u_int which; 565 566 xopst = &((hammer2_xop_t *)xop)->xop_strategy; 567 which = ((unsigned int)ip1->ihash + 568 ((unsigned int)xopst->lbase >> HAMMER2_PBUFRADIX)) % 569 hammer2_xop_sgroups; 570 ng = mycpu->gd_cpuid % hammer2_xop_mod + 571 hammer2_xop_mod * which; 572 */ 573 ng = 0; 574 } else if (hammer2_spread_workers == 0 && ip1->cluster.nchains == 1) { 575 /* 576 * For now try to keep the work on the same cpu to reduce 577 * IPI overhead. Several threads are assigned to each cpu, 578 * don't be very smart and select the one to use based on 579 * the inode hash. 580 */ 581 /* 582 u_int which; 583 584 which = (unsigned int)ip1->ihash % hammer2_xop_xgroups; 585 ng = mycpu->gd_cpuid % hammer2_xop_mod + 586 (which * hammer2_xop_mod) + 587 hammer2_xop_xbase; 588 */ 589 ng = 0; 590 } else { 591 /* 592 * Hash based on inode only, must serialize inode to same 593 * thread regardless of current cpu. 594 */ 595 /* 596 ng = (unsigned int)ip1->ihash % 597 (hammer2_xop_mod * hammer2_xop_xgroups) + 598 hammer2_xop_xbase; 599 */ 600 ng = 0; 601 } 602 xop->desc = desc; 603 604 /* 605 * The instant xop is queued another thread can pick it off. In the 606 * case of asynchronous ops, another thread might even finish and 607 * deallocate it. 608 */ 609 hammer2_spin_ex(&pmp->xop_spin); 610 nchains = ip1->cluster.nchains; 611 for (i = 0; i < nchains; ++i) { 612 /* 613 * XXX ip1->cluster.array* not stable here. This temporary 614 * hack fixes basic issues in target XOPs which need to 615 * obtain a starting chain from the inode but does not 616 * address possible races against inode updates which 617 * might NULL-out a chain. 618 */ 619 if (i != notidx && ip1->cluster.array[i].chain) { 620 thr = &pmp->xop_groups[ng].thrs[i]; 621 atomic_set_64(&xop->run_mask, 1LLU << i); 622 atomic_set_64(&xop->chk_mask, 1LLU << i); 623 xop->collect[i].thr = thr; 624 TAILQ_INSERT_TAIL(&thr->xopq, xop, collect[i].entry); 625 } 626 } 627 hammer2_spin_unex(&pmp->xop_spin); 628 /* xop can become invalid at this point */ 629 630 /* 631 * Each thread has its own xopq 632 */ 633 for (i = 0; i < nchains; ++i) { 634 if (i != notidx) { 635 thr = &pmp->xop_groups[ng].thrs[i]; 636 hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ); 637 hammer2_primary_xops_thread(thr); 638 } 639 } 640 } 641 642 void 643 hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_desc_t *desc) 644 { 645 hammer2_xop_start_except(xop, desc, -1); 646 } 647 648 /* 649 * Retire a XOP. Used by both the VOP frontend and by the XOP backend. 650 */ 651 void 652 hammer2_xop_retire(hammer2_xop_head_t *xop, uint64_t mask) 653 { 654 hammer2_chain_t *chain; 655 uint64_t nmask; 656 int i; 657 658 /* 659 * Remove the frontend collector or remove a backend feeder. 660 * 661 * When removing the frontend we must wakeup any backend feeders 662 * who are waiting for FIFO space. 663 * 664 * When removing the last backend feeder we must wakeup any waiting 665 * frontend. 666 */ 667 KKASSERT(xop->run_mask & mask); 668 nmask = atomic_fetchadd_64(&xop->run_mask, 669 -mask + HAMMER2_XOPMASK_FEED); 670 671 /* 672 * More than one entity left 673 */ 674 if ((nmask & HAMMER2_XOPMASK_ALLDONE) != mask) { 675 /* 676 * Frontend terminating, wakeup any backends waiting on 677 * fifo full. 678 * 679 * NOTE!!! The xop can get ripped out from under us at 680 * this point, so do not reference it again. 681 * The wakeup(xop) doesn't touch the xop and 682 * is ok. 683 */ 684 if (mask == HAMMER2_XOPMASK_VOP) { 685 if (nmask & HAMMER2_XOPMASK_FIFOW) 686 wakeup(xop); 687 } 688 689 /* 690 * Wakeup frontend if the last backend is terminating. 691 */ 692 nmask -= mask; 693 if ((nmask & HAMMER2_XOPMASK_ALLDONE) == HAMMER2_XOPMASK_VOP) { 694 if (nmask & HAMMER2_XOPMASK_WAIT) 695 wakeup(xop); 696 } 697 698 return; 699 } 700 /* else nobody else left, we can ignore FIFOW */ 701 702 /* 703 * All collectors are gone, we can cleanup and dispose of the XOP. 704 * Note that this can wind up being a frontend OR a backend. 705 * Pending chains are locked shared and not owned by any thread. 706 */ 707 708 /* 709 * Cleanup the xop's cluster. If there is an inode reference, 710 * cache the cluster chains in the inode to improve performance, 711 * preventing them from recursively destroying the chain recursion. 712 * 713 * Note that ip->ccache[i] does NOT necessarily represent usable 714 * chains or chains that are related to the inode. The chains are 715 * simply held to prevent bottom-up lastdrop destruction of 716 * potentially valuable resolved chain data. 717 */ 718 if (xop->ip1) { 719 /* 720 * Cache cluster chains in a convenient inode. The chains 721 * are cache ref'd but not held. The inode simply serves 722 * as a place to cache the chains to prevent the chains 723 * from being cleaned up. 724 */ 725 hammer2_chain_t *dropch[HAMMER2_MAXCLUSTER]; 726 hammer2_inode_t *ip; 727 int prior_nchains; 728 729 ip = xop->ip1; 730 hammer2_spin_ex(&ip->cluster_spin); 731 prior_nchains = ip->ccache_nchains; 732 for (i = 0; i < prior_nchains; ++i) { 733 dropch[i] = ip->ccache[i].chain; 734 ip->ccache[i].chain = NULL; 735 } 736 for (i = 0; i < xop->cluster.nchains; ++i) { 737 ip->ccache[i] = xop->cluster.array[i]; 738 if (ip->ccache[i].chain) 739 hammer2_chain_ref(ip->ccache[i].chain); 740 } 741 ip->ccache_nchains = i; 742 hammer2_spin_unex(&ip->cluster_spin); 743 744 /* 745 * Drop prior cache 746 */ 747 for (i = 0; i < prior_nchains; ++i) { 748 chain = dropch[i]; 749 if (chain) 750 hammer2_chain_drop(chain); 751 } 752 } 753 754 /* 755 * Drop and unhold chains in xop cluster 756 */ 757 for (i = 0; i < xop->cluster.nchains; ++i) { 758 xop->cluster.array[i].flags = 0; 759 chain = xop->cluster.array[i].chain; 760 if (chain) { 761 xop->cluster.array[i].chain = NULL; 762 hammer2_chain_drop_unhold(chain); 763 } 764 } 765 766 /* 767 * Cleanup the fifos. Since we are the only entity left on this 768 * xop we don't have to worry about fifo flow control, and one 769 * lfence() will do the job. 770 */ 771 cpu_lfence(); 772 mask = xop->chk_mask; 773 for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) { 774 hammer2_xop_fifo_t *fifo = &xop->collect[i]; 775 while (fifo->ri != fifo->wi) { 776 chain = fifo->array[fifo->ri & fifo_mask(xop)]; 777 if (chain) 778 hammer2_chain_drop_unhold(chain); 779 ++fifo->ri; 780 } 781 mask &= ~(1U << i); 782 } 783 784 /* 785 * The inode is only held at this point, simply drop it. 786 */ 787 if (xop->ip1) { 788 hammer2_inode_drop(xop->ip1); 789 xop->ip1 = NULL; 790 } 791 if (xop->ip2) { 792 hammer2_inode_drop(xop->ip2); 793 xop->ip2 = NULL; 794 } 795 if (xop->ip3) { 796 hammer2_inode_drop(xop->ip3); 797 xop->ip3 = NULL; 798 } 799 if (xop->ip4) { 800 hammer2_inode_drop(xop->ip4); 801 xop->ip4 = NULL; 802 } 803 if (xop->name1) { 804 kfree(xop->name1, M_HAMMER2); 805 xop->name1 = NULL; 806 xop->name1_len = 0; 807 } 808 if (xop->name2) { 809 kfree(xop->name2, M_HAMMER2); 810 xop->name2 = NULL; 811 xop->name2_len = 0; 812 } 813 814 for (i = 0; i < xop->cluster.nchains; ++i) { 815 kfree(xop->collect[i].array, M_HAMMER2); 816 kfree(xop->collect[i].errors, M_HAMMER2); 817 } 818 819 free(xop); 820 } 821 822 /* 823 * (Backend) Returns non-zero if the frontend is still attached. 824 */ 825 int 826 hammer2_xop_active(hammer2_xop_head_t *xop) 827 { 828 if (xop->run_mask & HAMMER2_XOPMASK_VOP) 829 return 1; 830 else 831 return 0; 832 } 833 834 /* 835 * (Backend) Feed chain data through the cluster validator and back to 836 * the frontend. Chains are fed from multiple nodes concurrently 837 * and pipelined via per-node FIFOs in the XOP. 838 * 839 * The chain must be locked (either shared or exclusive). The caller may 840 * unlock and drop the chain on return. This function will add an extra 841 * ref and hold the chain's data for the pass-back. 842 * 843 * No xop lock is needed because we are only manipulating fields under 844 * our direct control. 845 * 846 * Returns 0 on success and a hammer2 error code if sync is permanently 847 * lost. The caller retains a ref on the chain but by convention 848 * the lock is typically inherited by the xop (caller loses lock). 849 * 850 * Returns non-zero on error. In this situation the caller retains a 851 * ref on the chain but loses the lock (we unlock here). 852 */ 853 int 854 hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain, 855 int clindex, int error) 856 { 857 hammer2_xop_fifo_t *fifo; 858 uint64_t mask; 859 860 /* 861 * Early termination (typicaly of xop_readir) 862 */ 863 if (hammer2_xop_active(xop) == 0) { 864 error = HAMMER2_ERROR_ABORTED; 865 goto done; 866 } 867 868 /* 869 * Multi-threaded entry into the XOP collector. We own the 870 * fifo->wi for our clindex. 871 */ 872 fifo = &xop->collect[clindex]; 873 874 if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) 875 lwkt_yield(); 876 while (fifo->ri == fifo->wi - xop->fifo_size) { 877 atomic_set_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL); 878 mask = xop->run_mask; 879 if ((mask & HAMMER2_XOPMASK_VOP) == 0) { 880 error = HAMMER2_ERROR_ABORTED; 881 goto done; 882 } 883 xop->fifo_size *= 2; 884 hammer2_xop_fifo_alloc(fifo, xop->fifo_size); 885 } 886 atomic_clear_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL); 887 if (chain) 888 hammer2_chain_ref_hold(chain); 889 if (error == 0 && chain) 890 error = chain->error; 891 fifo->errors[fifo->wi & fifo_mask(xop)] = error; 892 fifo->array[fifo->wi & fifo_mask(xop)] = chain; 893 cpu_sfence(); 894 ++fifo->wi; 895 896 mask = atomic_fetchadd_64(&xop->run_mask, HAMMER2_XOPMASK_FEED); 897 if (mask & HAMMER2_XOPMASK_WAIT) { 898 atomic_clear_64(&xop->run_mask, HAMMER2_XOPMASK_WAIT); 899 wakeup(xop); 900 } 901 error = 0; 902 903 /* 904 * Cleanup. If no error 905 * occurred the fifo inherits the lock and gains an additional ref. 906 * 907 * The caller's ref remains in both cases. 908 */ 909 done: 910 return error; 911 } 912 913 /* 914 * (Frontend) collect a response from a running cluster op. 915 * 916 * Responses are fed from all appropriate nodes concurrently 917 * and collected into a cohesive response >= collect_key. 918 * 919 * The collector will return the instant quorum or other requirements 920 * are met, even if some nodes get behind or become non-responsive. 921 * 922 * HAMMER2_XOP_COLLECT_NOWAIT - Used to 'poll' a completed collection, 923 * usually called synchronously from the 924 * node XOPs for the strategy code to 925 * fake the frontend collection and complete 926 * the BIO as soon as possible. 927 * 928 * Returns 0 on success plus a filled out xop->cluster structure. 929 * Return ENOENT on normal termination. 930 * Otherwise return an error. 931 * 932 * WARNING! If the xop returns a cluster with a non-NULL focus, note that 933 * none of the chains in the cluster (or the focus) are either 934 * locked or I/O synchronized with the cpu. hammer2_xop_gdata() 935 * and hammer2_xop_pdata() must be used to safely access the focus 936 * chain's content. 937 * 938 * The frontend can make certain assumptions based on higher-level 939 * locking done by the frontend, but data integrity absolutely 940 * requires using the gdata/pdata API. 941 */ 942 int 943 hammer2_xop_collect(hammer2_xop_head_t *xop, int flags) 944 { 945 hammer2_xop_fifo_t *fifo; 946 hammer2_chain_t *chain; 947 hammer2_key_t lokey; 948 uint64_t mask; 949 int error; 950 int keynull; 951 int adv; /* advance the element */ 952 int i; 953 954 loop: 955 /* 956 * First loop tries to advance pieces of the cluster which 957 * are out of sync. 958 */ 959 lokey = HAMMER2_KEY_MAX; 960 keynull = HAMMER2_CHECK_NULL; 961 mask = xop->run_mask; 962 cpu_lfence(); 963 964 for (i = 0; i < xop->cluster.nchains; ++i) { 965 chain = xop->cluster.array[i].chain; 966 if (chain == NULL) { 967 adv = 1; 968 } else if (chain->bref.key < xop->collect_key) { 969 adv = 1; 970 } else { 971 keynull &= ~HAMMER2_CHECK_NULL; 972 if (lokey > chain->bref.key) 973 lokey = chain->bref.key; 974 adv = 0; 975 } 976 if (adv == 0) 977 continue; 978 979 /* 980 * Advance element if possible, advanced element may be NULL. 981 */ 982 if (chain) 983 hammer2_chain_drop_unhold(chain); 984 985 fifo = &xop->collect[i]; 986 if (fifo->ri != fifo->wi) { 987 cpu_lfence(); 988 chain = fifo->array[fifo->ri & fifo_mask(xop)]; 989 error = fifo->errors[fifo->ri & fifo_mask(xop)]; 990 ++fifo->ri; 991 xop->cluster.array[i].chain = chain; 992 xop->cluster.array[i].error = error; 993 if (chain == NULL) { 994 /* XXX */ 995 xop->cluster.array[i].flags |= 996 HAMMER2_CITEM_NULL; 997 } 998 if (fifo->wi - fifo->ri <= HAMMER2_XOPFIFO / 2) { 999 if (fifo->flags & HAMMER2_XOP_FIFO_STALL) { 1000 atomic_clear_int(&fifo->flags, 1001 HAMMER2_XOP_FIFO_STALL); 1002 wakeup(xop); 1003 lwkt_yield(); 1004 } 1005 } 1006 --i; /* loop on same index */ 1007 } else { 1008 /* 1009 * Retain CITEM_NULL flag. If set just repeat EOF. 1010 * If not, the NULL,0 combination indicates an 1011 * operation in-progress. 1012 */ 1013 xop->cluster.array[i].chain = NULL; 1014 /* retain any CITEM_NULL setting */ 1015 } 1016 } 1017 1018 /* 1019 * Determine whether the lowest collected key meets clustering 1020 * requirements. Returns HAMMER2_ERROR_*: 1021 * 1022 * 0 - key valid, cluster can be returned. 1023 * 1024 * ENOENT - normal end of scan, return ENOENT. 1025 * 1026 * ESRCH - sufficient elements collected, quorum agreement 1027 * that lokey is not a valid element and should be 1028 * skipped. 1029 * 1030 * EDEADLK - sufficient elements collected, no quorum agreement 1031 * (and no agreement possible). In this situation a 1032 * repair is needed, for now we loop. 1033 * 1034 * EINPROGRESS - insufficient elements collected to resolve, wait 1035 * for event and loop. 1036 * 1037 * EIO - IO error or CRC check error from hammer2_cluster_check() 1038 */ 1039 if ((flags & HAMMER2_XOP_COLLECT_WAITALL) && 1040 (mask & HAMMER2_XOPMASK_ALLDONE) != HAMMER2_XOPMASK_VOP) { 1041 error = HAMMER2_ERROR_EINPROGRESS; 1042 } else { 1043 error = hammer2_cluster_check(&xop->cluster, lokey, keynull); 1044 } 1045 if (error == HAMMER2_ERROR_EINPROGRESS) { 1046 if (flags & HAMMER2_XOP_COLLECT_NOWAIT) 1047 goto done; 1048 tsleep_interlock(xop, 0); 1049 if (atomic_cmpset_64(&xop->run_mask, 1050 mask, mask | HAMMER2_XOPMASK_WAIT)) { 1051 tsleep(xop, PINTERLOCKED, "h2coll", hz*60); 1052 } 1053 goto loop; 1054 } 1055 if (error == HAMMER2_ERROR_ESRCH) { 1056 if (lokey != HAMMER2_KEY_MAX) { 1057 xop->collect_key = lokey + 1; 1058 goto loop; 1059 } 1060 error = HAMMER2_ERROR_ENOENT; 1061 } 1062 if (error == HAMMER2_ERROR_EDEADLK) { 1063 kprintf("hammer2: no quorum possible lokey %016jx\n", 1064 lokey); 1065 if (lokey != HAMMER2_KEY_MAX) { 1066 xop->collect_key = lokey + 1; 1067 goto loop; 1068 } 1069 error = HAMMER2_ERROR_ENOENT; 1070 } 1071 if (lokey == HAMMER2_KEY_MAX) 1072 xop->collect_key = lokey; 1073 else 1074 xop->collect_key = lokey + 1; 1075 done: 1076 return error; 1077 } 1078 1079 /* 1080 * N x M processing threads are available to handle XOPs, N per cluster 1081 * index x M cluster nodes. 1082 * 1083 * Locate and return the next runnable xop, or NULL if no xops are 1084 * present or none of the xops are currently runnable (for various reasons). 1085 * The xop is left on the queue and serves to block other dependent xops 1086 * from being run. 1087 * 1088 * Dependent xops will not be returned. 1089 * 1090 * Sets HAMMER2_XOP_FIFO_RUN on the returned xop or returns NULL. 1091 * 1092 * NOTE! Xops run concurrently for each cluster index. 1093 */ 1094 #define XOP_HASH_SIZE 16 1095 #define XOP_HASH_MASK (XOP_HASH_SIZE - 1) 1096 1097 static __inline 1098 int 1099 xop_testhash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash) 1100 { 1101 uint32_t mask; 1102 int hv; 1103 1104 hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t); 1105 mask = 1U << (hv & 31); 1106 hv >>= 5; 1107 1108 return ((int)(hash[hv & XOP_HASH_MASK] & mask)); 1109 } 1110 1111 static __inline 1112 void 1113 xop_sethash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash) 1114 { 1115 uint32_t mask; 1116 int hv; 1117 1118 hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t); 1119 mask = 1U << (hv & 31); 1120 hv >>= 5; 1121 1122 hash[hv & XOP_HASH_MASK] |= mask; 1123 } 1124 1125 static 1126 hammer2_xop_head_t * 1127 hammer2_xop_next(hammer2_thread_t *thr) 1128 { 1129 hammer2_pfs_t *pmp = thr->pmp; 1130 int clindex = thr->clindex; 1131 uint32_t hash[XOP_HASH_SIZE] = { 0 }; 1132 hammer2_xop_head_t *xop; 1133 1134 hammer2_spin_ex(&pmp->xop_spin); 1135 TAILQ_FOREACH(xop, &thr->xopq, collect[clindex].entry) { 1136 /* 1137 * Check dependency 1138 */ 1139 if (xop_testhash(thr, xop->ip1, hash) || 1140 (xop->ip2 && xop_testhash(thr, xop->ip2, hash)) || 1141 (xop->ip3 && xop_testhash(thr, xop->ip3, hash)) || 1142 (xop->ip4 && xop_testhash(thr, xop->ip4, hash))) 1143 { 1144 continue; 1145 } 1146 xop_sethash(thr, xop->ip1, hash); 1147 if (xop->ip2) 1148 xop_sethash(thr, xop->ip2, hash); 1149 if (xop->ip3) 1150 xop_sethash(thr, xop->ip3, hash); 1151 if (xop->ip4) 1152 xop_sethash(thr, xop->ip4, hash); 1153 1154 /* 1155 * Check already running 1156 */ 1157 if (xop->collect[clindex].flags & HAMMER2_XOP_FIFO_RUN) 1158 continue; 1159 1160 /* 1161 * Found a good one, return it. 1162 */ 1163 atomic_set_int(&xop->collect[clindex].flags, 1164 HAMMER2_XOP_FIFO_RUN); 1165 break; 1166 } 1167 hammer2_spin_unex(&pmp->xop_spin); 1168 1169 return xop; 1170 } 1171 1172 /* 1173 * Remove the completed XOP from the queue, clear HAMMER2_XOP_FIFO_RUN. 1174 * 1175 * NOTE! Xops run concurrently for each cluster index. 1176 */ 1177 static 1178 void 1179 hammer2_xop_dequeue(hammer2_thread_t *thr, hammer2_xop_head_t *xop) 1180 { 1181 hammer2_pfs_t *pmp = thr->pmp; 1182 int clindex = thr->clindex; 1183 1184 hammer2_spin_ex(&pmp->xop_spin); 1185 TAILQ_REMOVE(&thr->xopq, xop, collect[clindex].entry); 1186 atomic_clear_int(&xop->collect[clindex].flags, 1187 HAMMER2_XOP_FIFO_RUN); 1188 hammer2_spin_unex(&pmp->xop_spin); 1189 if (TAILQ_FIRST(&thr->xopq)) 1190 hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ); 1191 } 1192 1193 /* 1194 * Primary management thread for xops support. Each node has several such 1195 * threads which replicate front-end operations on cluster nodes. 1196 * 1197 * XOPS thread node operations, allowing the function to focus on a single 1198 * node in the cluster after validating the operation with the cluster. 1199 * This is primarily what prevents dead or stalled nodes from stalling 1200 * the front-end. 1201 */ 1202 void 1203 hammer2_primary_xops_thread(void *arg) 1204 { 1205 hammer2_thread_t *thr = arg; 1206 hammer2_xop_head_t *xop; 1207 uint64_t mask; 1208 uint32_t flags; 1209 uint32_t nflags; 1210 1211 mask = 1LLU << thr->clindex; 1212 1213 for (;;) { 1214 flags = thr->flags; 1215 1216 /* 1217 * Handle stop request 1218 */ 1219 if (flags & HAMMER2_THREAD_STOP) 1220 break; 1221 1222 /* 1223 * Handle freeze request 1224 */ 1225 if (flags & HAMMER2_THREAD_FREEZE) { 1226 hammer2_thr_signal2(thr, HAMMER2_THREAD_FROZEN, 1227 HAMMER2_THREAD_FREEZE); 1228 continue; 1229 } 1230 1231 if (flags & HAMMER2_THREAD_UNFREEZE) { 1232 hammer2_thr_signal2(thr, 0, 1233 HAMMER2_THREAD_FROZEN | 1234 HAMMER2_THREAD_UNFREEZE); 1235 continue; 1236 } 1237 1238 /* 1239 * Force idle if frozen until unfrozen or stopped. 1240 */ 1241 if (flags & HAMMER2_THREAD_FROZEN) { 1242 hammer2_thr_wait_any(thr, 1243 HAMMER2_THREAD_UNFREEZE | 1244 HAMMER2_THREAD_STOP, 1245 0); 1246 continue; 1247 } 1248 1249 /* 1250 * Reset state on REMASTER request 1251 */ 1252 if (flags & HAMMER2_THREAD_REMASTER) { 1253 hammer2_thr_signal2(thr, 0, HAMMER2_THREAD_REMASTER); 1254 /* reset state here */ 1255 continue; 1256 } 1257 1258 /* 1259 * Process requests. Each request can be multi-queued. 1260 * 1261 * If we get behind and the frontend VOP is no longer active, 1262 * we retire the request without processing it. The callback 1263 * may also abort processing if the frontend VOP becomes 1264 * inactive. 1265 */ 1266 if (flags & HAMMER2_THREAD_XOPQ) { 1267 nflags = flags & ~HAMMER2_THREAD_XOPQ; 1268 if (!atomic_cmpset_int(&thr->flags, flags, nflags)) 1269 continue; 1270 flags = nflags; 1271 /* fall through */ 1272 } 1273 while ((xop = hammer2_xop_next(thr)) != NULL) { 1274 if (hammer2_xop_active(xop)) { 1275 xop->desc->storage_func((hammer2_xop_t *)xop, 1276 thr->scratch, 1277 thr->clindex); 1278 hammer2_xop_dequeue(thr, xop); 1279 hammer2_xop_retire(xop, mask); 1280 } else { 1281 hammer2_xop_feed(xop, NULL, thr->clindex, 1282 ECONNABORTED); 1283 hammer2_xop_dequeue(thr, xop); 1284 hammer2_xop_retire(xop, mask); 1285 } 1286 } 1287 1288 /* Don't wait, this is a XOP caller thread in makefs */ 1289 break; 1290 1291 /* 1292 * Wait for event, interlock using THREAD_WAITING and 1293 * THREAD_SIGNAL. 1294 * 1295 * For robustness poll on a 30-second interval, but nominally 1296 * expect to be woken up. 1297 */ 1298 nflags = flags | HAMMER2_THREAD_WAITING; 1299 1300 tsleep_interlock(&thr->flags, 0); 1301 if (atomic_cmpset_int(&thr->flags, flags, nflags)) { 1302 tsleep(&thr->flags, PINTERLOCKED, "h2idle", hz*30); 1303 } 1304 } 1305 1306 #if 0 1307 /* 1308 * Cleanup / termination 1309 */ 1310 while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) { 1311 kprintf("hammer2_thread: aborting xop %s\n", xop->desc->id); 1312 TAILQ_REMOVE(&thr->xopq, xop, 1313 collect[thr->clindex].entry); 1314 hammer2_xop_retire(xop, mask); 1315 } 1316 #endif 1317 thr->td = NULL; 1318 hammer2_thr_signal(thr, HAMMER2_THREAD_STOPPED); 1319 /* thr structure can go invalid after this point */ 1320 } 1321