/*
 * Copyright (c) 2015-2018 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module implements the hammer2 helper thread API, including
 * the frontend/backend XOP API.
 */
#include "hammer2.h"

#define H2XOPDESCRIPTOR(label)					\
	hammer2_xop_desc_t hammer2_##label##_desc = {		\
		.storage_func = hammer2_xop_##label,		\
		.id = #label					\
	}

H2XOPDESCRIPTOR(ipcluster);
H2XOPDESCRIPTOR(readdir);
H2XOPDESCRIPTOR(nresolve);
H2XOPDESCRIPTOR(unlink);
H2XOPDESCRIPTOR(nrename);
H2XOPDESCRIPTOR(scanlhc);
H2XOPDESCRIPTOR(scanall);
H2XOPDESCRIPTOR(lookup);
H2XOPDESCRIPTOR(delete);
H2XOPDESCRIPTOR(inode_mkdirent);
H2XOPDESCRIPTOR(inode_create);
H2XOPDESCRIPTOR(inode_destroy);
H2XOPDESCRIPTOR(inode_chain_sync);
H2XOPDESCRIPTOR(inode_unlinkall);
H2XOPDESCRIPTOR(inode_connect);
H2XOPDESCRIPTOR(inode_flush);
H2XOPDESCRIPTOR(strategy_read);
H2XOPDESCRIPTOR(strategy_write);

/*
 * Set flags and wakeup any waiters.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_signal(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		nflags = (oflags | flags) & ~HAMMER2_THREAD_WAITING;

		if (oflags & HAMMER2_THREAD_WAITING) {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
				wakeup(&thr->flags);
				break;
			}
		} else {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags))
				break;
		}
	}
}
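
/*
 * Illustrative note: the matching wait side (see hammer2_thr_wait() below)
 * publishes HAMMER2_THREAD_WAITING under a tsleep_interlock() before
 * sleeping on &thr->flags:
 *
 *	tsleep_interlock(&thr->flags, 0);
 *	if (atomic_cmpset_int(&thr->flags, oflags,
 *			      oflags | HAMMER2_THREAD_WAITING)) {
 *		tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
 *	}
 *
 * Because the signal path above clears WAITING and issues the wakeup()
 * only when its own cmpset succeeds, a concurrent signal either observes
 * WAITING and wakes the sleeper, or forces the sleeper's cmpset to fail
 * so the sleeper re-tests the flags.  No wakeup can be lost.
 */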

/*
 * Set and clear flags and wakeup any waiters.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_signal2(hammer2_thread_t *thr, uint32_t posflags, uint32_t negflags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		nflags = (oflags | posflags) &
			~(negflags | HAMMER2_THREAD_WAITING);
		if (oflags & HAMMER2_THREAD_WAITING) {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
				wakeup(&thr->flags);
				break;
			}
		} else {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags))
				break;
		}
	}
}

/*
 * Wait until all the bits in flags are set.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_wait(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if ((oflags & flags) == flags)
			break;
		nflags = oflags | HAMMER2_THREAD_WAITING;
		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
		}
	}
}

/*
 * Wait until any of the bits in flags are set, with timeout.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
int
hammer2_thr_wait_any(hammer2_thread_t *thr, uint32_t flags, int timo)
{
	uint32_t oflags;
	uint32_t nflags;
	int error;

	error = 0;
	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if (oflags & flags)
			break;
		nflags = oflags | HAMMER2_THREAD_WAITING;
		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			error = tsleep(&thr->flags, PINTERLOCKED,
				       "h2twait", timo);
		}
		if (error == ETIMEDOUT) {
			error = HAMMER2_ERROR_ETIMEDOUT;
			break;
		}
	}
	return error;
}

/*
 * Wait until the bits in flags are clear.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_wait_neg(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if ((oflags & flags) == 0)
			break;
		nflags = oflags | HAMMER2_THREAD_WAITING;
		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
		}
	}
}
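
/*
 * Example (illustrative): the primary xops thread idles a frozen thread
 * with an indefinite wait on either of two bits:
 *
 *	hammer2_thr_wait_any(thr, HAMMER2_THREAD_UNFREEZE |
 *				  HAMMER2_THREAD_STOP, 0);
 *
 * A timo of 0 sleeps without a timeout; otherwise a tsleep() timeout is
 * mapped to HAMMER2_ERROR_ETIMEDOUT for the caller.
 */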

/*
 * Initialize the supplied thread structure, starting the specified
 * thread.
 *
 * NOTE: thr structure can be retained across mounts and unmounts for this
 *	 pmp, so make sure the flags are in a sane state.
 */
void
hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
		   hammer2_dev_t *hmp,
		   const char *id, int clindex, int repidx,
		   void (*func)(void *arg))
{
	thr->pmp = pmp;		/* xop helpers */
	thr->hmp = hmp;		/* bulkfree */
	thr->clindex = clindex;
	thr->repidx = repidx;
	TAILQ_INIT(&thr->xopq);
	atomic_clear_int(&thr->flags, HAMMER2_THREAD_STOP |
				      HAMMER2_THREAD_STOPPED |
				      HAMMER2_THREAD_FREEZE |
				      HAMMER2_THREAD_FROZEN);
	if (thr->scratch == NULL)
		thr->scratch = kmalloc(MAXPHYS, M_HAMMER2, M_WAITOK | M_ZERO);
	if (repidx >= 0) {
		lwkt_create(func, thr, &thr->td, NULL, 0, repidx % ncpus,
			    "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
	} else if (pmp) {
		lwkt_create(func, thr, &thr->td, NULL, 0, -1,
			    "%s-%s", id, pmp->pfs_names[clindex]);
	} else {
		lwkt_create(func, thr, &thr->td, NULL, 0, -1, "%s", id);
	}
}

/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);
	hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);
	thr->pmp = NULL;
	if (thr->scratch) {
		kfree(thr->scratch, M_HAMMER2);
		thr->scratch = NULL;
	}
	KKASSERT(TAILQ_EMPTY(&thr->xopq));
}

/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER);
}

void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
	hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
}

void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
	hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
}

void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_UNFREEZE);
	hammer2_thr_wait_neg(thr, HAMMER2_THREAD_FROZEN);
}

int
hammer2_thr_break(hammer2_thread_t *thr)
{
	if (thr->flags & (HAMMER2_THREAD_STOP |
			  HAMMER2_THREAD_REMASTER |
			  HAMMER2_THREAD_FREEZE)) {
		return 1;
	}
	return 0;
}
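
/*
 * Illustrative lifecycle of a helper thread (hypothetical caller; the
 * real callers are hammer2_xop_helper_create() and
 * hammer2_xop_helper_cleanup() below):
 *
 *	hammer2_thr_create(thr, pmp, NULL, "h2xop", clindex, repidx,
 *			   hammer2_primary_xops_thread);
 *	...
 *	hammer2_thr_freeze(thr);	(quiesce, waits for FROZEN)
 *	hammer2_thr_unfreeze(thr);	(resume, waits for FROZEN to clear)
 *	...
 *	hammer2_thr_delete(thr);	(signal STOP, wait for STOPPED)
 */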

/****************************************************************************
 *			    HAMMER2 XOPS API				    *
 ****************************************************************************/

void
hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
{
	/* no extra fields in structure at the moment */
}

/*
 * Allocate a XOP request.
 *
 * Once allocated a XOP request can be started, collected, and retired,
 * and can be retired early if desired.
 *
 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
 */
void *
hammer2_xop_alloc(hammer2_inode_t *ip, int flags)
{
	hammer2_xop_t *xop;

	xop = objcache_get(cache_xops, M_WAITOK);
	KKASSERT(xop->head.cluster.array[0].chain == NULL);

	xop->head.ip1 = ip;
	xop->head.desc = NULL;
	xop->head.flags = flags;
	xop->head.state = 0;
	xop->head.error = 0;
	xop->head.collect_key = 0;
	xop->head.focus_dio = NULL;

	if (flags & HAMMER2_XOP_MODIFYING)
		xop->head.mtid = hammer2_trans_sub(ip->pmp);
	else
		xop->head.mtid = 0;

	xop->head.cluster.nchains = ip->cluster.nchains;
	xop->head.cluster.pmp = ip->pmp;
	xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;

	/*
	 * run_mask - Active thread (or frontend) associated with XOP
	 */
	xop->head.run_mask = HAMMER2_XOPMASK_VOP;

	hammer2_inode_ref(ip);

	return xop;
}

void
hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
	xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name1_len = name_len;
	bcopy(name, xop->name1, name_len);
}

void
hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
	xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name2_len = name_len;
	bcopy(name, xop->name2, name_len);
}

size_t
hammer2_xop_setname_inum(hammer2_xop_head_t *xop, hammer2_key_t inum)
{
	const size_t name_len = 18;

	xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name1_len = name_len;
	ksnprintf(xop->name1, name_len + 1, "0x%016jx", (intmax_t)inum);

	return name_len;
}

void
hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
{
	xop->ip2 = ip2;
	hammer2_inode_ref(ip2);
}

void
hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
{
	xop->ip3 = ip3;
	hammer2_inode_ref(ip3);
}

void
hammer2_xop_reinit(hammer2_xop_head_t *xop)
{
	xop->state = 0;
	xop->error = 0;
	xop->collect_key = 0;
	xop->run_mask = HAMMER2_XOPMASK_VOP;
}

/*
 * A mounted PFS needs Xops threads to support frontend operations.
 */
void
hammer2_xop_helper_create(hammer2_pfs_t *pmp)
{
	int i;
	int j;

	lockmgr(&pmp->lock, LK_EXCLUSIVE);
	pmp->has_xop_threads = 1;

	for (i = 0; i < pmp->iroot->cluster.nchains; ++i) {
		for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
			if (pmp->xop_groups[j].thrs[i].td)
				continue;
			hammer2_thr_create(&pmp->xop_groups[j].thrs[i],
					   pmp, NULL,
					   "h2xop", i, j,
					   hammer2_primary_xops_thread);
		}
	}
	lockmgr(&pmp->lock, LK_RELEASE);
}

void
hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
{
	int i;
	int j;

	for (i = 0; i < pmp->pfs_nmasters; ++i) {
		for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
			if (pmp->xop_groups[j].thrs[i].td)
				hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
		}
	}
	pmp->has_xop_threads = 0;
}
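
/*
 * Typical frontend usage (illustrative sketch patterned on the VOP code;
 * the specific xop type and error handling are elided, and dip/name are
 * hypothetical caller variables):
 *
 *	xop = hammer2_xop_alloc(dip, 0);
 *	hammer2_xop_setname(&xop->head, name, name_len);
 *	hammer2_xop_start(&xop->head, &hammer2_nresolve_desc);
 *	error = hammer2_xop_collect(&xop->head, 0);
 *	... use the collected xop->head.cluster / focus ...
 *	hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
 */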

/*
 * Start a XOP request, queueing it to all nodes in the cluster to
 * execute the cluster op.
 *
 * XXX optimize single-target case.
 */
void
hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_desc_t *desc,
			 int notidx)
{
	hammer2_inode_t *ip1;
	hammer2_pfs_t *pmp;
	hammer2_thread_t *thr;
	int i;
	int ng;
	int nchains;

	ip1 = xop->ip1;
	pmp = ip1->pmp;
	if (pmp->has_xop_threads == 0)
		hammer2_xop_helper_create(pmp);

	/*
	 * The intent of the XOP sequencer is to ensure that ops on the same
	 * inode execute in the same order.  This is necessary when issuing
	 * modifying operations to multiple targets because some targets might
	 * get behind and the frontend is allowed to complete the moment a
	 * quorum of targets succeed.
	 *
	 * Strategy operations must be segregated from non-strategy operations
	 * to avoid a deadlock.  For example, if a vfsync and a bread/bwrite
	 * were queued to the same worker thread, the locked buffer in the
	 * strategy operation can deadlock the vfsync's buffer list scan.
	 *
	 * TODO - RENAME fails here because it is potentially modifying
	 *	  three different inodes.
	 */
	if (xop->flags & HAMMER2_XOP_STRATEGY) {
		hammer2_xop_strategy_t *xopst;

		xopst = &((hammer2_xop_t *)xop)->xop_strategy;
		ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)) ^
			   hammer2_icrc32(&xopst->lbase, sizeof(xopst->lbase)));
		ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
		ng += HAMMER2_XOPGROUPS / 2;
	} else {
		ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)));
		ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
	}
	xop->desc = desc;

	/*
	 * The instant xop is queued another thread can pick it off.  In the
	 * case of asynchronous ops, another thread might even finish and
	 * deallocate it.
	 */
	hammer2_spin_ex(&pmp->xop_spin);
	nchains = ip1->cluster.nchains;
	for (i = 0; i < nchains; ++i) {
		/*
		 * XXX ip1->cluster.array* not stable here.  This temporary
		 *     hack fixes basic issues in target XOPs which need to
		 *     obtain a starting chain from the inode but does not
		 *     address possible races against inode updates which
		 *     might NULL-out a chain.
		 */
		if (i != notidx && ip1->cluster.array[i].chain) {
			thr = &pmp->xop_groups[ng].thrs[i];
			atomic_set_64(&xop->run_mask, 1LLU << i);
			atomic_set_64(&xop->chk_mask, 1LLU << i);
			xop->collect[i].thr = thr;
			TAILQ_INSERT_TAIL(&thr->xopq, xop, collect[i].entry);
		}
	}
	hammer2_spin_unex(&pmp->xop_spin);
	/* xop can become invalid at this point */

	/*
	 * Each thread has its own xopq
	 */
	for (i = 0; i < nchains; ++i) {
		if (i != notidx) {
			thr = &pmp->xop_groups[ng].thrs[i];
			hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
		}
	}
}

void
hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_desc_t *desc)
{
	hammer2_xop_start_except(xop, desc, -1);
}
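
/*
 * Worked example for the group selection above, assuming (hypothetically)
 * HAMMER2_XOPGROUPS == 32 and HAMMER2_XOPGROUPS_MASK == 31: masking with
 * (31 >> 1) hashes non-strategy ops into groups 0..15, while strategy ops
 * take the same hash range plus HAMMER2_XOPGROUPS / 2 and land in groups
 * 16..31.  The two classes therefore never share a worker thread, which
 * is what prevents the buffer deadlock described in
 * hammer2_xop_start_except().
 */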

/*
 * Retire a XOP.  Used by both the VOP frontend and by the XOP backend.
 */
void
hammer2_xop_retire(hammer2_xop_head_t *xop, uint64_t mask)
{
	hammer2_chain_t *chain;
	uint64_t nmask;
	int i;

	/*
	 * Remove the frontend collector or remove a backend feeder.
	 *
	 * When removing the frontend we must wakeup any backend feeders
	 * who are waiting for FIFO space.
	 *
	 * When removing the last backend feeder we must wakeup any waiting
	 * frontend.
	 */
	KKASSERT(xop->run_mask & mask);
	nmask = atomic_fetchadd_64(&xop->run_mask,
				   -mask + HAMMER2_XOPMASK_FEED);

	/*
	 * More than one entity left
	 */
	if ((nmask & HAMMER2_XOPMASK_ALLDONE) != mask) {
		/*
		 * Frontend terminating, wakeup any backends waiting on
		 * fifo full.
		 *
		 * NOTE!!! The xop can get ripped out from under us at
		 *	   this point, so do not reference it again.
		 *	   The wakeup(xop) doesn't touch the xop and
		 *	   is ok.
		 */
		if (mask == HAMMER2_XOPMASK_VOP) {
			if (nmask & HAMMER2_XOPMASK_FIFOW)
				wakeup(xop);
		}

		/*
		 * Wakeup frontend if the last backend is terminating.
		 */
		nmask -= mask;
		if ((nmask & HAMMER2_XOPMASK_ALLDONE) == HAMMER2_XOPMASK_VOP) {
			if (nmask & HAMMER2_XOPMASK_WAIT)
				wakeup(xop);
		}

		return;
	}
	/* else nobody else left, we can ignore FIFOW */

	/*
	 * All collectors are gone, we can cleanup and dispose of the XOP.
	 * Note that this can wind up being a frontend OR a backend.
	 * Pending chains are locked shared and not owned by any thread.
	 *
	 * Cleanup the collection cluster.
	 */
	for (i = 0; i < xop->cluster.nchains; ++i) {
		xop->cluster.array[i].flags = 0;
		chain = xop->cluster.array[i].chain;
		if (chain) {
			xop->cluster.array[i].chain = NULL;
			hammer2_chain_drop_unhold(chain);
		}
	}

	/*
	 * Cleanup the fifos.  Since we are the only entity left on this
	 * xop we don't have to worry about fifo flow control, and one
	 * lfence() will do the job.
	 */
	cpu_lfence();
	mask = xop->chk_mask;
	for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
		hammer2_xop_fifo_t *fifo = &xop->collect[i];
		while (fifo->ri != fifo->wi) {
			chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
			if (chain)
				hammer2_chain_drop_unhold(chain);
			++fifo->ri;
		}
		mask &= ~(1U << i);
	}

	/*
	 * The inode is only held at this point, simply drop it.
	 */
	if (xop->ip1) {
		hammer2_inode_drop(xop->ip1);
		xop->ip1 = NULL;
	}
	if (xop->ip2) {
		hammer2_inode_drop(xop->ip2);
		xop->ip2 = NULL;
	}
	if (xop->ip3) {
		hammer2_inode_drop(xop->ip3);
		xop->ip3 = NULL;
	}
	if (xop->name1) {
		kfree(xop->name1, M_HAMMER2);
		xop->name1 = NULL;
		xop->name1_len = 0;
	}
	if (xop->name2) {
		kfree(xop->name2, M_HAMMER2);
		xop->name2 = NULL;
		xop->name2_len = 0;
	}

	objcache_put(cache_xops, xop);
}

/*
 * (Backend) Returns non-zero if the frontend is still attached.
 */
int
hammer2_xop_active(hammer2_xop_head_t *xop)
{
	if (xop->run_mask & HAMMER2_XOPMASK_VOP)
		return 1;
	else
		return 0;
}
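
/*
 * Typical backend usage (illustrative sketch; lookup/next arguments and
 * error propagation are elided): each node's worker thread scans its
 * local chain topology and feeds matching chains back to the frontend,
 * terminating the stream with a NULL chain carrying the final error:
 *
 *	chain = hammer2_chain_lookup(&parent, &key_next, ...);
 *	while (chain) {
 *		if (hammer2_xop_feed(&xop->head, chain, clindex, 0))
 *			break;
 *		chain = hammer2_chain_next(&parent, chain, &key_next, ...);
 *	}
 *	hammer2_xop_feed(&xop->head, NULL, clindex, error);
 */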

/*
 * (Backend) Feed chain data through the cluster validator and back to
 * the frontend.  Chains are fed from multiple nodes concurrently
 * and pipelined via per-node FIFOs in the XOP.
 *
 * The chain must be locked (either shared or exclusive).  The caller may
 * unlock and drop the chain on return.  This function will add an extra
 * ref and hold the chain's data for the pass-back.
 *
 * No xop lock is needed because we are only manipulating fields under
 * our direct control.
 *
 * Returns 0 on success and a hammer2 error code if sync is permanently
 * lost.  The caller retains a ref on the chain but by convention
 * the lock is typically inherited by the xop (caller loses lock).
 *
 * Returns non-zero on error.  In this situation the caller retains a
 * ref on the chain but loses the lock (we unlock here).
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
		 int clindex, int error)
{
	hammer2_xop_fifo_t *fifo;
	uint64_t mask;

	/*
	 * Early termination (typically of xop_readdir)
	 */
	if (hammer2_xop_active(xop) == 0) {
		error = HAMMER2_ERROR_ABORTED;
		goto done;
	}

	/*
	 * Multi-threaded entry into the XOP collector.  We own the
	 * fifo->wi for our clindex.
	 */
	fifo = &xop->collect[clindex];

	if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO)
		lwkt_yield();
	while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
		atomic_set_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
		mask = xop->run_mask;
		if ((mask & HAMMER2_XOPMASK_VOP) == 0) {
			error = HAMMER2_ERROR_ABORTED;
			goto done;
		}
		tsleep_interlock(xop, 0);
		if (atomic_cmpset_64(&xop->run_mask, mask,
				     mask | HAMMER2_XOPMASK_FIFOW)) {
			if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
				tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
			}
		}
		/* retry */
	}
	atomic_clear_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
	if (chain)
		hammer2_chain_ref_hold(chain);
	if (error == 0 && chain)
		error = chain->error;
	fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
	fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
	cpu_sfence();
	++fifo->wi;

	mask = atomic_fetchadd_64(&xop->run_mask, HAMMER2_XOPMASK_FEED);
	if (mask & HAMMER2_XOPMASK_WAIT) {
		atomic_clear_64(&xop->run_mask, HAMMER2_XOPMASK_WAIT);
		wakeup(xop);
	}
	error = 0;

	/*
	 * Cleanup.  If an error occurred we eat the lock.  If no error
	 * occurred the fifo inherits the lock and gains an additional ref.
	 *
	 * The caller's ref remains in both cases.
	 */
done:
	return error;
}
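
/*
 * Flow-control note for the feed path above: each per-node fifo holds
 * HAMMER2_XOPFIFO entries.  A feeder that fills its fifo sets
 * HAMMER2_XOP_FIFO_STALL and sleeps until the frontend drains the fifo
 * to half full (see hammer2_xop_collect() below).  The unconditional
 * HAMMER2_XOPMASK_FEED increment on run_mask also acts as a generation
 * counter: a frontend that sampled run_mask before sleeping will fail
 * its cmpset if a feed slipped in, so it re-polls instead of missing
 * the new data.
 */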

/*
 * (Frontend) collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= collect_key.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT	- Used to 'poll' a completed collection,
 *				  usually called synchronously from the
 *				  node XOPs for the strategy code to
 *				  fake the frontend collection and complete
 *				  the BIO as soon as possible.
 *
 * HAMMER2_XOP_SYNCHRONIZER	- Request synchronization with a particular
 *				  cluster index, prevents looping when that
 *				  index is out of sync so caller can act on
 *				  the out of sync element.  ESRCH and EDEADLK
 *				  can be returned if this flag is specified.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Returns ENOENT on normal termination.
 * Otherwise returns an error.
 *
 * WARNING! If the xop returns a cluster with a non-NULL focus, note that
 *	    none of the chains in the cluster (or the focus) are either
 *	    locked or I/O synchronized with the cpu.  hammer2_xop_gdata()
 *	    and hammer2_xop_pdata() must be used to safely access the focus
 *	    chain's content.
 *
 *	    The frontend can make certain assumptions based on higher-level
 *	    locking done by the frontend, but data integrity absolutely
 *	    requires using the gdata/pdata API.
 */
int
hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
{
	hammer2_xop_fifo_t *fifo;
	hammer2_chain_t *chain;
	hammer2_key_t lokey;
	uint64_t mask;
	int error;
	int keynull;
	int adv;		/* advance the element */
	int i;

loop:
	/*
	 * First loop tries to advance pieces of the cluster which
	 * are out of sync.
	 */
	lokey = HAMMER2_KEY_MAX;
	keynull = HAMMER2_CHECK_NULL;
	mask = xop->run_mask;
	cpu_lfence();

	for (i = 0; i < xop->cluster.nchains; ++i) {
		chain = xop->cluster.array[i].chain;
		if (chain == NULL) {
			adv = 1;
		} else if (chain->bref.key < xop->collect_key) {
			adv = 1;
		} else {
			keynull &= ~HAMMER2_CHECK_NULL;
			if (lokey > chain->bref.key)
				lokey = chain->bref.key;
			adv = 0;
		}
		if (adv == 0)
			continue;

		/*
		 * Advance element if possible, advanced element may be NULL.
		 */
		if (chain)
			hammer2_chain_drop_unhold(chain);

		fifo = &xop->collect[i];
		if (fifo->ri != fifo->wi) {
			cpu_lfence();
			chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
			error = fifo->errors[fifo->ri & HAMMER2_XOPFIFO_MASK];
			++fifo->ri;
			xop->cluster.array[i].chain = chain;
			xop->cluster.array[i].error = error;
			if (chain == NULL) {
				/* XXX */
				xop->cluster.array[i].flags |=
							HAMMER2_CITEM_NULL;
			}
			if (fifo->wi - fifo->ri <= HAMMER2_XOPFIFO / 2) {
				if (fifo->flags & HAMMER2_XOP_FIFO_STALL) {
					atomic_clear_int(&fifo->flags,
						    HAMMER2_XOP_FIFO_STALL);
					wakeup(xop);
					lwkt_yield();
				}
			}
			--i;	/* loop on same index */
		} else {
			/*
			 * Retain CITEM_NULL flag.  If set just repeat EOF.
			 * If not, the NULL,0 combination indicates an
			 * operation in-progress.
			 */
			xop->cluster.array[i].chain = NULL;
			/* retain any CITEM_NULL setting */
		}
	}

	/*
	 * Determine whether the lowest collected key meets clustering
	 * requirements.  Returns:
	 *
	 * 0		- key valid, cluster can be returned.
	 *
	 * ENOENT	- normal end of scan, return ENOENT.
	 *
	 * ESRCH	- sufficient elements collected, quorum agreement
	 *		  that lokey is not a valid element and should be
	 *		  skipped.
	 *
	 * EDEADLK	- sufficient elements collected, no quorum agreement
	 *		  (and no agreement possible).  In this situation a
	 *		  repair is needed, for now we loop.
	 *
	 * EINPROGRESS	- insufficient elements collected to resolve, wait
	 *		  for event and loop.
	 */
	if ((flags & HAMMER2_XOP_COLLECT_WAITALL) &&
	    (mask & HAMMER2_XOPMASK_ALLDONE) != HAMMER2_XOPMASK_VOP) {
		error = HAMMER2_ERROR_EINPROGRESS;
	} else {
		error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
	}
	if (error == HAMMER2_ERROR_EINPROGRESS) {
		if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
			goto done;
		tsleep_interlock(xop, 0);
		if (atomic_cmpset_64(&xop->run_mask,
				     mask, mask | HAMMER2_XOPMASK_WAIT)) {
			tsleep(xop, PINTERLOCKED, "h2coll", hz*60);
		}
		goto loop;
	}
	if (error == HAMMER2_ERROR_ESRCH) {
		if (lokey != HAMMER2_KEY_MAX) {
			xop->collect_key = lokey + 1;
			goto loop;
		}
		error = HAMMER2_ERROR_ENOENT;
	}
	if (error == HAMMER2_ERROR_EDEADLK) {
		kprintf("hammer2: no quorum possible lokey %016jx\n",
			lokey);
		if (lokey != HAMMER2_KEY_MAX) {
			xop->collect_key = lokey + 1;
			goto loop;
		}
		error = HAMMER2_ERROR_ENOENT;
	}
	if (lokey == HAMMER2_KEY_MAX)
		xop->collect_key = lokey;
	else
		xop->collect_key = lokey + 1;
done:
	return error;
}
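
/*
 * Typical collection loop (illustrative sketch; the ripdata variable and
 * the copy step are hypothetical, the gdata/pdata calls are the API named
 * in the warning above):
 *
 *	for (;;) {
 *		error = hammer2_xop_collect(&xop->head, 0);
 *		if (error)
 *			break;
 *		ripdata = &hammer2_xop_gdata(&xop->head)->ipdata;
 *		... copy what is needed out of the focus ...
 *		hammer2_xop_pdata(&xop->head);
 *	}
 *	hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
 *
 * HAMMER2_ERROR_ENOENT indicates normal termination of a scan.
 */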

/*
 * N x M processing threads are available to handle XOPs, N per cluster
 * index x M cluster nodes.
 *
 * Locate and return the next runnable xop, or NULL if no xops are
 * present or none of the xops are currently runnable (for various reasons).
 * The xop is left on the queue and serves to block other dependent xops
 * from being run.
 *
 * Dependent xops will not be returned.
 *
 * Sets HAMMER2_XOP_FIFO_RUN on the returned xop or returns NULL.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
#define XOP_HASH_SIZE	16
#define XOP_HASH_MASK	(XOP_HASH_SIZE - 1)

static __inline
int
xop_testhash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
	uint32_t mask;
	int hv;

	hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
	mask = 1U << (hv & 31);
	hv >>= 5;

	return ((int)(hash[hv & XOP_HASH_MASK] & mask));
}

static __inline
void
xop_sethash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
	uint32_t mask;
	int hv;

	hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
	mask = 1U << (hv & 31);
	hv >>= 5;

	hash[hv & XOP_HASH_MASK] |= mask;
}

static
hammer2_xop_head_t *
hammer2_xop_next(hammer2_thread_t *thr)
{
	hammer2_pfs_t *pmp = thr->pmp;
	int clindex = thr->clindex;
	uint32_t hash[XOP_HASH_SIZE] = { 0 };
	hammer2_xop_head_t *xop;

	hammer2_spin_ex(&pmp->xop_spin);
	TAILQ_FOREACH(xop, &thr->xopq, collect[clindex].entry) {
		/*
		 * Check dependency
		 */
		if (xop_testhash(thr, xop->ip1, hash) ||
		    (xop->ip2 && xop_testhash(thr, xop->ip2, hash)) ||
		    (xop->ip3 && xop_testhash(thr, xop->ip3, hash))) {
			continue;
		}
		xop_sethash(thr, xop->ip1, hash);
		if (xop->ip2)
			xop_sethash(thr, xop->ip2, hash);
		if (xop->ip3)
			xop_sethash(thr, xop->ip3, hash);

		/*
		 * Check already running
		 */
		if (xop->collect[clindex].flags & HAMMER2_XOP_FIFO_RUN)
			continue;

		/*
		 * Found a good one, return it.
		 */
		atomic_set_int(&xop->collect[clindex].flags,
			       HAMMER2_XOP_FIFO_RUN);
		break;
	}
	hammer2_spin_unex(&pmp->xop_spin);

	return xop;
}
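
/*
 * Note on the dependency filter above: hash[] is a 512-bit bitmap
 * (XOP_HASH_SIZE * 32 bits) rebuilt on every scan.  Collisions are
 * conservative: a false hit merely delays an independent xop until a
 * later scan, while genuinely dependent xops (same inode, same thread)
 * always hash to the same bit, so queue order is preserved exactly
 * where it matters.
 */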

/*
 * Remove the completed XOP from the queue, clear HAMMER2_XOP_FIFO_RUN.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
static
void
hammer2_xop_dequeue(hammer2_thread_t *thr, hammer2_xop_head_t *xop)
{
	hammer2_pfs_t *pmp = thr->pmp;
	int clindex = thr->clindex;

	hammer2_spin_ex(&pmp->xop_spin);
	TAILQ_REMOVE(&thr->xopq, xop, collect[clindex].entry);
	atomic_clear_int(&xop->collect[clindex].flags,
			 HAMMER2_XOP_FIFO_RUN);
	hammer2_spin_unex(&pmp->xop_spin);
	if (TAILQ_FIRST(&thr->xopq))
		hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
}

/*
 * Primary management thread for xops support.  Each node has several such
 * threads which replicate front-end operations on cluster nodes.
 *
 * Each thread executes the node-local side of queued operations, allowing
 * it to focus on a single node in the cluster after the operation has been
 * validated against the cluster as a whole.  This is primarily what
 * prevents dead or stalled nodes from stalling the front-end.
 */
void
hammer2_primary_xops_thread(void *arg)
{
	hammer2_thread_t *thr = arg;
	hammer2_pfs_t *pmp;
	hammer2_xop_head_t *xop;
	uint64_t mask;
	uint32_t flags;
	uint32_t nflags;
	hammer2_xop_desc_t *last_desc = NULL;

	pmp = thr->pmp;
	/*xgrp = &pmp->xop_groups[thr->repidx]; not needed */
	mask = 1LLU << thr->clindex;

	for (;;) {
		flags = thr->flags;

		/*
		 * Handle stop request
		 */
		if (flags & HAMMER2_THREAD_STOP)
			break;

		/*
		 * Handle freeze request
		 */
		if (flags & HAMMER2_THREAD_FREEZE) {
			hammer2_thr_signal2(thr, HAMMER2_THREAD_FROZEN,
						 HAMMER2_THREAD_FREEZE);
			continue;
		}

		if (flags & HAMMER2_THREAD_UNFREEZE) {
			hammer2_thr_signal2(thr, 0,
						 HAMMER2_THREAD_FROZEN |
						 HAMMER2_THREAD_UNFREEZE);
			continue;
		}

		/*
		 * Force idle if frozen until unfrozen or stopped.
		 */
		if (flags & HAMMER2_THREAD_FROZEN) {
			hammer2_thr_wait_any(thr,
					     HAMMER2_THREAD_UNFREEZE |
					     HAMMER2_THREAD_STOP,
					     0);
			continue;
		}

		/*
		 * Reset state on REMASTER request
		 */
		if (flags & HAMMER2_THREAD_REMASTER) {
			hammer2_thr_signal2(thr, 0, HAMMER2_THREAD_REMASTER);
			/* reset state here */
			continue;
		}

		/*
		 * Process requests.  Each request can be multi-queued.
		 *
		 * If we get behind and the frontend VOP is no longer active,
		 * we retire the request without processing it.  The callback
		 * may also abort processing if the frontend VOP becomes
		 * inactive.
		 */
		if (flags & HAMMER2_THREAD_XOPQ) {
			nflags = flags & ~HAMMER2_THREAD_XOPQ;
			if (!atomic_cmpset_int(&thr->flags, flags, nflags))
				continue;
			flags = nflags;
			/* fall through */
		}
		while ((xop = hammer2_xop_next(thr)) != NULL) {
			if (hammer2_xop_active(xop)) {
				last_desc = xop->desc;
				xop->desc->storage_func((hammer2_xop_t *)xop,
							thr->scratch,
							thr->clindex);
				hammer2_xop_dequeue(thr, xop);
				hammer2_xop_retire(xop, mask);
			} else {
				last_desc = xop->desc;
				hammer2_xop_feed(xop, NULL, thr->clindex,
						 ECONNABORTED);
				hammer2_xop_dequeue(thr, xop);
				hammer2_xop_retire(xop, mask);
			}
		}

		/*
		 * Wait for event, interlock using THREAD_WAITING and
		 * hammer2_thr_signal().
		 *
		 * For robustness poll on a 30-second interval, but nominally
		 * expect to be woken up.
		 */
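
		/*
		 * Illustrative note: this is the same lost-wakeup-free
		 * handshake used by hammer2_thr_wait().  hammer2_thr_signal()
		 * clears HAMMER2_THREAD_WAITING and issues its wakeup() in a
		 * single cmpset transition, so a signal arriving after the
		 * flag tests above simply makes the cmpset below fail and
		 * the loop re-examines thr->flags.
		 */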
		nflags = flags | HAMMER2_THREAD_WAITING;

		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2idle", hz*30);
		}
	}

#if 0
	/*
	 * Cleanup / termination
	 */
	while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
		kprintf("hammer2_thread: aborting xop %s\n", xop->desc->id);
		TAILQ_REMOVE(&thr->xopq, xop,
			     collect[thr->clindex].entry);
		hammer2_xop_retire(xop, mask);
	}
#endif
	thr->td = NULL;
	hammer2_thr_signal(thr, HAMMER2_THREAD_STOPPED);
	/* thr structure can go invalid after this point */
}