/*
 * Copyright (c) 2015-2018 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module implements the hammer2 helper thread API, including
 * the frontend/backend XOP API.
 */
#include "hammer2.h"

#define H2XOPDESCRIPTOR(label)					\
	hammer2_xop_desc_t hammer2_##label##_desc = {		\
		.storage_func = hammer2_xop_##label,		\
		.id = #label					\
	}

H2XOPDESCRIPTOR(ipcluster);
H2XOPDESCRIPTOR(readdir);
H2XOPDESCRIPTOR(nresolve);
H2XOPDESCRIPTOR(unlink);
H2XOPDESCRIPTOR(nrename);
H2XOPDESCRIPTOR(scanlhc);
H2XOPDESCRIPTOR(scanall);
H2XOPDESCRIPTOR(lookup);
H2XOPDESCRIPTOR(delete);
H2XOPDESCRIPTOR(inode_mkdirent);
H2XOPDESCRIPTOR(inode_create);
H2XOPDESCRIPTOR(inode_create_det);
H2XOPDESCRIPTOR(inode_create_ins);
H2XOPDESCRIPTOR(inode_destroy);
H2XOPDESCRIPTOR(inode_chain_sync);
H2XOPDESCRIPTOR(inode_unlinkall);
H2XOPDESCRIPTOR(inode_connect);
H2XOPDESCRIPTOR(inode_flush);
H2XOPDESCRIPTOR(strategy_read);
H2XOPDESCRIPTOR(strategy_write);
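/*
 * For reference, each H2XOPDESCRIPTOR() invocation above expands to a
 * descriptor definition along these lines (shown for readdir):
 *
 *	hammer2_xop_desc_t hammer2_readdir_desc = {
 *		.storage_func = hammer2_xop_readdir,
 *		.id = "readdir"
 *	};
 */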
/*
 * Set flags and wakeup any waiters.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_signal(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		nflags = (oflags | flags) & ~HAMMER2_THREAD_WAITING;

		if (oflags & HAMMER2_THREAD_WAITING) {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
				wakeup(&thr->flags);
				break;
			}
		} else {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags))
				break;
		}
	}
}

/*
 * Set and clear flags and wakeup any waiters.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_signal2(hammer2_thread_t *thr, uint32_t posflags, uint32_t negflags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		nflags = (oflags | posflags) &
			~(negflags | HAMMER2_THREAD_WAITING);
		if (oflags & HAMMER2_THREAD_WAITING) {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
				wakeup(&thr->flags);
				break;
			}
		} else {
			if (atomic_cmpset_int(&thr->flags, oflags, nflags))
				break;
		}
	}
}

/*
 * Wait until all the bits in flags are set.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_wait(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if ((oflags & flags) == flags)
			break;
		nflags = oflags | HAMMER2_THREAD_WAITING;
		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
		}
	}
}

/*
 * Wait until any of the bits in flags are set, with timeout.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
int
hammer2_thr_wait_any(hammer2_thread_t *thr, uint32_t flags, int timo)
{
	uint32_t oflags;
	uint32_t nflags;
	int error;

	error = 0;
	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if (oflags & flags)
			break;
		nflags = oflags | HAMMER2_THREAD_WAITING;
		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			error = tsleep(&thr->flags, PINTERLOCKED,
				       "h2twait", timo);
		}
		if (error == ETIMEDOUT) {
			error = HAMMER2_ERROR_ETIMEDOUT;
			break;
		}
	}
	return error;
}

/*
 * Wait until the bits in flags are clear.
 *
 * WARNING! During teardown (thr) can disappear the instant our cmpset
 *	    succeeds.
 */
void
hammer2_thr_wait_neg(hammer2_thread_t *thr, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = thr->flags;
		cpu_ccfence();
		if ((oflags & flags) == 0)
			break;
		nflags = oflags | HAMMER2_THREAD_WAITING;
		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, oflags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2twait", hz*60);
		}
	}
}
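/*
 * Together these primitives form a flag-based handshake between a
 * controlling thread and a helper thread.  The freeze path below is a
 * representative use:
 *
 *	hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
 *	hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
 *
 * The helper notices FREEZE in its main loop and acknowledges with
 * hammer2_thr_signal2(thr, HAMMER2_THREAD_FROZEN, HAMMER2_THREAD_FREEZE),
 * which also wakes up the waiter.
 */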
/*
 * Initialize the supplied thread structure, starting the specified
 * thread.
 *
 * NOTE: thr structure can be retained across mounts and unmounts for this
 *	 pmp, so make sure the flags are in a sane state.
 */
void
hammer2_thr_create(hammer2_thread_t *thr, hammer2_pfs_t *pmp,
		   hammer2_dev_t *hmp,
		   const char *id, int clindex, int repidx,
		   void (*func)(void *arg))
{
	thr->pmp = pmp;		/* xop helpers */
	thr->hmp = hmp;		/* bulkfree */
	thr->clindex = clindex;
	thr->repidx = repidx;
	TAILQ_INIT(&thr->xopq);
	atomic_clear_int(&thr->flags, HAMMER2_THREAD_STOP |
				      HAMMER2_THREAD_STOPPED |
				      HAMMER2_THREAD_FREEZE |
				      HAMMER2_THREAD_FROZEN);
	if (thr->scratch == NULL)
		thr->scratch = kmalloc(MAXPHYS, M_HAMMER2, M_WAITOK | M_ZERO);
	if (repidx >= 0) {
		lwkt_create(func, thr, &thr->td, NULL, 0, repidx % ncpus,
			    "%s-%s.%02d", id, pmp->pfs_names[clindex], repidx);
	} else if (pmp) {
		lwkt_create(func, thr, &thr->td, NULL, 0, -1,
			    "%s-%s", id, pmp->pfs_names[clindex]);
	} else {
		lwkt_create(func, thr, &thr->td, NULL, 0, -1, "%s", id);
	}
}

/*
 * Terminate a thread.  This function will silently return if the thread
 * was never initialized or has already been deleted.
 *
 * This is accomplished by setting the STOP flag and waiting for the td
 * structure to become NULL.
 */
void
hammer2_thr_delete(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_STOP);
	hammer2_thr_wait(thr, HAMMER2_THREAD_STOPPED);
	thr->pmp = NULL;
	if (thr->scratch) {
		kfree(thr->scratch, M_HAMMER2);
		thr->scratch = NULL;
	}
	KKASSERT(TAILQ_EMPTY(&thr->xopq));
}

/*
 * Asynchronous remaster request.  Ask the synchronization thread to
 * start over soon (as if it were frozen and unfrozen, but without waiting).
 * The thread always recalculates mastership relationships when restarting.
 */
void
hammer2_thr_remaster(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_REMASTER);
}

void
hammer2_thr_freeze_async(hammer2_thread_t *thr)
{
	hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
}

void
hammer2_thr_freeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_FREEZE);
	hammer2_thr_wait(thr, HAMMER2_THREAD_FROZEN);
}

void
hammer2_thr_unfreeze(hammer2_thread_t *thr)
{
	if (thr->td == NULL)
		return;
	hammer2_thr_signal(thr, HAMMER2_THREAD_UNFREEZE);
	hammer2_thr_wait_neg(thr, HAMMER2_THREAD_FROZEN);
}

int
hammer2_thr_break(hammer2_thread_t *thr)
{
	if (thr->flags & (HAMMER2_THREAD_STOP |
			  HAMMER2_THREAD_REMASTER |
			  HAMMER2_THREAD_FREEZE)) {
		return 1;
	}
	return 0;
}
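/*
 * NOTE: hammer2_thr_break() is intended as a cheap poll for long-running
 *	 helpers: a non-zero return means a control request (stop, remaster,
 *	 or freeze) is pending and the work loop should unwind so the
 *	 thread's main loop can process the request.
 */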
/****************************************************************************
 *			    HAMMER2 XOPS API				    *
 ****************************************************************************/

void
hammer2_xop_group_init(hammer2_pfs_t *pmp, hammer2_xop_group_t *xgrp)
{
	/* no extra fields in structure at the moment */
}

/*
 * Allocate a XOP request.
 *
 * Once allocated a XOP request can be started, collected, and retired,
 * and can be retired early if desired.
 *
 * NOTE: Fifo indices might not be zero but ri == wi on objcache_get().
 */
void *
hammer2_xop_alloc(hammer2_inode_t *ip, int flags)
{
	hammer2_xop_t *xop;

	xop = objcache_get(cache_xops, M_WAITOK);
	KKASSERT(xop->head.cluster.array[0].chain == NULL);

	xop->head.ip1 = ip;
	xop->head.desc = NULL;
	xop->head.flags = flags;
	xop->head.state = 0;
	xop->head.error = 0;
	xop->head.collect_key = 0;
	xop->head.focus_dio = NULL;

	if (flags & HAMMER2_XOP_MODIFYING)
		xop->head.mtid = hammer2_trans_sub(ip->pmp);
	else
		xop->head.mtid = 0;

	xop->head.cluster.nchains = ip->cluster.nchains;
	xop->head.cluster.pmp = ip->pmp;
	xop->head.cluster.flags = HAMMER2_CLUSTER_LOCKED;

	/*
	 * run_mask - Active thread (or frontend) associated with XOP
	 */
	xop->head.run_mask = HAMMER2_XOPMASK_VOP;

	hammer2_inode_ref(ip);

	return xop;
}

void
hammer2_xop_setname(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
	xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name1_len = name_len;
	bcopy(name, xop->name1, name_len);
}

void
hammer2_xop_setname2(hammer2_xop_head_t *xop, const char *name, size_t name_len)
{
	xop->name2 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name2_len = name_len;
	bcopy(name, xop->name2, name_len);
}

size_t
hammer2_xop_setname_inum(hammer2_xop_head_t *xop, hammer2_key_t inum)
{
	const size_t name_len = 18;

	xop->name1 = kmalloc(name_len + 1, M_HAMMER2, M_WAITOK | M_ZERO);
	xop->name1_len = name_len;
	ksnprintf(xop->name1, name_len + 1, "0x%016jx", (intmax_t)inum);

	return name_len;
}

void
hammer2_xop_setip2(hammer2_xop_head_t *xop, hammer2_inode_t *ip2)
{
	xop->ip2 = ip2;
	hammer2_inode_ref(ip2);
}

void
hammer2_xop_setip3(hammer2_xop_head_t *xop, hammer2_inode_t *ip3)
{
	xop->ip3 = ip3;
	hammer2_inode_ref(ip3);
}

void
hammer2_xop_reinit(hammer2_xop_head_t *xop)
{
	xop->state = 0;
	xop->error = 0;
	xop->collect_key = 0;
	xop->run_mask = HAMMER2_XOPMASK_VOP;
}

/*
 * A mounted PFS needs Xops threads to support frontend operations.
 */
void
hammer2_xop_helper_create(hammer2_pfs_t *pmp)
{
	int i;
	int j;

	lockmgr(&pmp->lock, LK_EXCLUSIVE);
	pmp->has_xop_threads = 1;

	for (i = 0; i < pmp->iroot->cluster.nchains; ++i) {
		for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
			if (pmp->xop_groups[j].thrs[i].td)
				continue;
			hammer2_thr_create(&pmp->xop_groups[j].thrs[i],
					   pmp, NULL,
					   "h2xop", i, j,
					   hammer2_primary_xops_thread);
		}
	}
	lockmgr(&pmp->lock, LK_RELEASE);
}

void
hammer2_xop_helper_cleanup(hammer2_pfs_t *pmp)
{
	int i;
	int j;

	for (i = 0; i < pmp->pfs_nmasters; ++i) {
		for (j = 0; j < HAMMER2_XOPGROUPS; ++j) {
			if (pmp->xop_groups[j].thrs[i].td)
				hammer2_thr_delete(&pmp->xop_groups[j].thrs[i]);
		}
	}
	pmp->has_xop_threads = 0;
}
/*
 * Start a XOP request, queueing it to all nodes in the cluster to
 * execute the cluster op.
 *
 * XXX optimize single-target case.
 */
void
hammer2_xop_start_except(hammer2_xop_head_t *xop, hammer2_xop_desc_t *desc,
			 int notidx)
{
	hammer2_inode_t *ip1;
	hammer2_pfs_t *pmp;
	hammer2_thread_t *thr;
	int i;
	int ng;
	int nchains;

	ip1 = xop->ip1;
	pmp = ip1->pmp;
	if (pmp->has_xop_threads == 0)
		hammer2_xop_helper_create(pmp);

	/*
	 * The intent of the XOP sequencer is to ensure that ops on the same
	 * inode execute in the same order.  This is necessary when issuing
	 * modifying operations to multiple targets because some targets might
	 * get behind and the frontend is allowed to complete the moment a
	 * quorum of targets succeed.
	 *
	 * Strategy operations must be segregated from non-strategy operations
	 * to avoid a deadlock.  For example, if a vfsync and a bread/bwrite
	 * were queued to the same worker thread, the locked buffer in the
	 * strategy operation can deadlock the vfsync's buffer list scan.
	 *
	 * TODO - RENAME fails here because it is potentially modifying
	 *	  three different inodes.
	 */
	if (xop->flags & HAMMER2_XOP_STRATEGY) {
		hammer2_xop_strategy_t *xopst;

		xopst = &((hammer2_xop_t *)xop)->xop_strategy;
		ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)) ^
			   hammer2_icrc32(&xopst->lbase, sizeof(xopst->lbase)));
		ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
		ng += HAMMER2_XOPGROUPS / 2;
	} else {
		ng = (int)(hammer2_icrc32(&xop->ip1, sizeof(xop->ip1)));
		ng = ng & (HAMMER2_XOPGROUPS_MASK >> 1);
	}
	xop->desc = desc;

	/*
	 * The instant xop is queued another thread can pick it off.  In the
	 * case of asynchronous ops, another thread might even finish and
	 * deallocate it.
	 */
	hammer2_spin_ex(&pmp->xop_spin);
	nchains = ip1->cluster.nchains;
	for (i = 0; i < nchains; ++i) {
		/*
		 * XXX ip1->cluster.array* not stable here.  This temporary
		 *     hack fixes basic issues in target XOPs which need to
		 *     obtain a starting chain from the inode but does not
		 *     address possible races against inode updates which
		 *     might NULL-out a chain.
		 */
		if (i != notidx && ip1->cluster.array[i].chain) {
			thr = &pmp->xop_groups[ng].thrs[i];
			atomic_set_64(&xop->run_mask, 1LLU << i);
			atomic_set_64(&xop->chk_mask, 1LLU << i);
			xop->collect[i].thr = thr;
			TAILQ_INSERT_TAIL(&thr->xopq, xop, collect[i].entry);
		}
	}
	hammer2_spin_unex(&pmp->xop_spin);
	/* xop can become invalid at this point */

	/*
	 * Each thread has its own xopq
	 */
	for (i = 0; i < nchains; ++i) {
		if (i != notidx) {
			thr = &pmp->xop_groups[ng].thrs[i];
			hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
		}
	}
}

void
hammer2_xop_start(hammer2_xop_head_t *xop, hammer2_xop_desc_t *desc)
{
	hammer2_xop_start_except(xop, desc, -1);
}
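/*
 * A typical frontend sequence, sketched here for reference (the xop type,
 * setup fields, and descriptor vary per VOP; readdir is used as an
 * arbitrary example and lkey stands in for the xop-specific setup):
 *
 *	hammer2_xop_readdir_t *xop;
 *	int error;
 *
 *	xop = hammer2_xop_alloc(ip, 0);
 *	xop->lkey = lkey;
 *	hammer2_xop_start(&xop->head, &hammer2_readdir_desc);
 *	while ((error = hammer2_xop_collect(&xop->head, 0)) == 0) {
 *		(consume xop->head.cluster focus data)
 *	}
 *	hammer2_xop_retire(&xop->head, HAMMER2_XOPMASK_VOP);
 *
 * hammer2_xop_collect() returns 0 per collected element and
 * HAMMER2_ERROR_ENOENT on normal termination; the frontend always
 * retires with HAMMER2_XOPMASK_VOP.
 */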
/*
 * Retire a XOP.  Used by both the VOP frontend and by the XOP backend.
 */
void
hammer2_xop_retire(hammer2_xop_head_t *xop, uint64_t mask)
{
	hammer2_chain_t *chain;
	uint64_t nmask;
	int i;

	/*
	 * Remove the frontend collector or remove a backend feeder.
	 *
	 * When removing the frontend we must wakeup any backend feeders
	 * who are waiting for FIFO space.
	 *
	 * When removing the last backend feeder we must wakeup any waiting
	 * frontend.
	 */
	KKASSERT(xop->run_mask & mask);
	nmask = atomic_fetchadd_64(&xop->run_mask,
				   -mask + HAMMER2_XOPMASK_FEED);

	/*
	 * More than one entity left
	 */
	if ((nmask & HAMMER2_XOPMASK_ALLDONE) != mask) {
		/*
		 * Frontend terminating, wakeup any backends waiting on
		 * fifo full.
		 *
		 * NOTE!!! The xop can get ripped out from under us at
		 *	   this point, so do not reference it again.
		 *	   The wakeup(xop) doesn't touch the xop and
		 *	   is ok.
		 */
		if (mask == HAMMER2_XOPMASK_VOP) {
			if (nmask & HAMMER2_XOPMASK_FIFOW)
				wakeup(xop);
		}

		/*
		 * Wakeup frontend if the last backend is terminating.
		 */
		nmask -= mask;
		if ((nmask & HAMMER2_XOPMASK_ALLDONE) == HAMMER2_XOPMASK_VOP) {
			if (nmask & HAMMER2_XOPMASK_WAIT)
				wakeup(xop);
		}

		return;
	}
	/* else nobody else left, we can ignore FIFOW */

	/*
	 * All collectors are gone, we can cleanup and dispose of the XOP.
	 * Note that this can wind up being a frontend OR a backend.
	 * Pending chains are locked shared and not owned by any thread.
	 *
	 * Cleanup the collection cluster.
	 */
	for (i = 0; i < xop->cluster.nchains; ++i) {
		xop->cluster.array[i].flags = 0;
		chain = xop->cluster.array[i].chain;
		if (chain) {
			xop->cluster.array[i].chain = NULL;
			hammer2_chain_drop_unhold(chain);
		}
	}

	/*
	 * Cleanup the fifos.  Since we are the only entity left on this
	 * xop we don't have to worry about fifo flow control, and one
	 * lfence() will do the job.
	 */
	cpu_lfence();
	mask = xop->chk_mask;
	for (i = 0; mask && i < HAMMER2_MAXCLUSTER; ++i) {
		hammer2_xop_fifo_t *fifo = &xop->collect[i];
		while (fifo->ri != fifo->wi) {
			chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
			if (chain)
				hammer2_chain_drop_unhold(chain);
			++fifo->ri;
		}
		mask &= ~(1U << i);
	}

	/*
	 * The inode is only held at this point, simply drop it.
	 */
	if (xop->ip1) {
		hammer2_inode_drop(xop->ip1);
		xop->ip1 = NULL;
	}
	if (xop->ip2) {
		hammer2_inode_drop(xop->ip2);
		xop->ip2 = NULL;
	}
	if (xop->ip3) {
		hammer2_inode_drop(xop->ip3);
		xop->ip3 = NULL;
	}
	if (xop->name1) {
		kfree(xop->name1, M_HAMMER2);
		xop->name1 = NULL;
		xop->name1_len = 0;
	}
	if (xop->name2) {
		kfree(xop->name2, M_HAMMER2);
		xop->name2 = NULL;
		xop->name2_len = 0;
	}

	objcache_put(cache_xops, xop);
}
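/*
 * NOTE: hammer2_xop_retire() removes the caller's bit from run_mask and
 *	 bumps the FEED counter in the high bits with a single
 *	 atomic_fetchadd_64() (the -mask + HAMMER2_XOPMASK_FEED term).
 *	 The counter acts as a generation number: any waiter that
 *	 interlocks on run_mask via cmpset will fail and re-loop if a
 *	 feed or retire slipped in between its load and its sleep, so
 *	 wakeups cannot be missed.
 */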
/*
 * (Backend) Returns non-zero if the frontend is still attached.
 */
int
hammer2_xop_active(hammer2_xop_head_t *xop)
{
	if (xop->run_mask & HAMMER2_XOPMASK_VOP)
		return 1;
	else
		return 0;
}

/*
 * (Backend) Feed chain data through the cluster validator and back to
 * the frontend.  Chains are fed from multiple nodes concurrently
 * and pipelined via per-node FIFOs in the XOP.
 *
 * The chain must be locked (either shared or exclusive).  The caller may
 * unlock and drop the chain on return.  This function will add an extra
 * ref and hold the chain's data for the pass-back.
 *
 * No xop lock is needed because we are only manipulating fields under
 * our direct control.
 *
 * Returns 0 on success and a hammer2 error code if sync is permanently
 * lost.  The caller retains a ref on the chain but by convention
 * the lock is typically inherited by the xop (caller loses lock).
 *
 * Returns non-zero on error.  In this situation the caller retains a
 * ref on the chain but loses the lock (we unlock here).
 */
int
hammer2_xop_feed(hammer2_xop_head_t *xop, hammer2_chain_t *chain,
		 int clindex, int error)
{
	hammer2_xop_fifo_t *fifo;
	uint64_t mask;

	/*
	 * Early termination (typically of xop_readdir)
	 */
	if (hammer2_xop_active(xop) == 0) {
		error = HAMMER2_ERROR_ABORTED;
		goto done;
	}

	/*
	 * Multi-threaded entry into the XOP collector.  We own the
	 * fifo->wi for our clindex.
	 */
	fifo = &xop->collect[clindex];

	if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO)
		lwkt_yield();
	while (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
		atomic_set_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
		mask = xop->run_mask;
		if ((mask & HAMMER2_XOPMASK_VOP) == 0) {
			error = HAMMER2_ERROR_ABORTED;
			goto done;
		}
		tsleep_interlock(xop, 0);
		if (atomic_cmpset_64(&xop->run_mask, mask,
				     mask | HAMMER2_XOPMASK_FIFOW)) {
			if (fifo->ri == fifo->wi - HAMMER2_XOPFIFO) {
				tsleep(xop, PINTERLOCKED, "h2feed", hz*60);
			}
		}
		/* retry */
	}
	atomic_clear_int(&fifo->flags, HAMMER2_XOP_FIFO_STALL);
	if (chain)
		hammer2_chain_ref_hold(chain);
	if (error == 0 && chain)
		error = chain->error;
	fifo->errors[fifo->wi & HAMMER2_XOPFIFO_MASK] = error;
	fifo->array[fifo->wi & HAMMER2_XOPFIFO_MASK] = chain;
	cpu_sfence();
	++fifo->wi;

	mask = atomic_fetchadd_64(&xop->run_mask, HAMMER2_XOPMASK_FEED);
	if (mask & HAMMER2_XOPMASK_WAIT) {
		atomic_clear_64(&xop->run_mask, HAMMER2_XOPMASK_WAIT);
		wakeup(xop);
	}
	error = 0;

	/*
	 * Cleanup.  If an error occurred we eat the lock.  If no error
	 * occurred the fifo inherits the lock and gains an additional ref.
	 *
	 * The caller's ref remains in both cases.
	 */
done:
	return error;
}
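/*
 * A backend storage function built on the feeder therefore has the
 * general shape sketched below (chain lookup, locking, and cleanup
 * details elided):
 *
 *	error = 0;
 *	while ((chain = ...next matching chain...) != NULL) {
 *		error = hammer2_xop_feed(&xop->head, chain, clindex, 0);
 *		if (error)
 *			break;
 *	}
 *	hammer2_xop_feed(&xop->head, NULL, clindex, error);
 *
 * The final NULL feed hands the terminating error (typically
 * HAMMER2_ERROR_ENOENT from the chain lookup) to the frontend collector.
 */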
/*
 * (Frontend) collect a response from a running cluster op.
 *
 * Responses are fed from all appropriate nodes concurrently
 * and collected into a cohesive response >= collect_key.
 *
 * The collector will return the instant quorum or other requirements
 * are met, even if some nodes get behind or become non-responsive.
 *
 * HAMMER2_XOP_COLLECT_NOWAIT	- Used to 'poll' a completed collection,
 *				  usually called synchronously from the
 *				  node XOPs for the strategy code to
 *				  fake the frontend collection and complete
 *				  the BIO as soon as possible.
 *
 * HAMMER2_XOP_SYNCHRONIZER	- Request synchronization with a particular
 *				  cluster index, prevents looping when that
 *				  index is out of sync so caller can act on
 *				  the out of sync element.  ESRCH and EDEADLK
 *				  can be returned if this flag is specified.
 *
 * Returns 0 on success plus a filled out xop->cluster structure.
 * Returns ENOENT on normal termination.
 * Otherwise returns an error.
 *
 * WARNING! If the xop returns a cluster with a non-NULL focus, note that
 *	    none of the chains in the cluster (or the focus) are either
 *	    locked or I/O synchronized with the cpu.  hammer2_xop_gdata()
 *	    and hammer2_xop_pdata() must be used to safely access the focus
 *	    chain's content.
 *
 *	    The frontend can make certain assumptions based on higher-level
 *	    locking done by the frontend, but data integrity absolutely
 *	    requires using the gdata/pdata API.
 */
int
hammer2_xop_collect(hammer2_xop_head_t *xop, int flags)
{
	hammer2_xop_fifo_t *fifo;
	hammer2_chain_t *chain;
	hammer2_key_t lokey;
	uint64_t mask;
	int error;
	int keynull;
	int adv;		/* advance the element */
	int i;

loop:
	/*
	 * First loop tries to advance pieces of the cluster which
	 * are out of sync.
	 */
	lokey = HAMMER2_KEY_MAX;
	keynull = HAMMER2_CHECK_NULL;
	mask = xop->run_mask;
	cpu_lfence();

	for (i = 0; i < xop->cluster.nchains; ++i) {
		chain = xop->cluster.array[i].chain;
		if (chain == NULL) {
			adv = 1;
		} else if (chain->bref.key < xop->collect_key) {
			adv = 1;
		} else {
			keynull &= ~HAMMER2_CHECK_NULL;
			if (lokey > chain->bref.key)
				lokey = chain->bref.key;
			adv = 0;
		}
		if (adv == 0)
			continue;

		/*
		 * Advance element if possible, advanced element may be NULL.
		 */
		if (chain)
			hammer2_chain_drop_unhold(chain);

		fifo = &xop->collect[i];
		if (fifo->ri != fifo->wi) {
			cpu_lfence();
			chain = fifo->array[fifo->ri & HAMMER2_XOPFIFO_MASK];
			error = fifo->errors[fifo->ri & HAMMER2_XOPFIFO_MASK];
			++fifo->ri;
			xop->cluster.array[i].chain = chain;
			xop->cluster.array[i].error = error;
			if (chain == NULL) {
				/* XXX */
				xop->cluster.array[i].flags |=
							HAMMER2_CITEM_NULL;
			}
			if (fifo->wi - fifo->ri <= HAMMER2_XOPFIFO / 2) {
				if (fifo->flags & HAMMER2_XOP_FIFO_STALL) {
					atomic_clear_int(&fifo->flags,
						    HAMMER2_XOP_FIFO_STALL);
					wakeup(xop);
					lwkt_yield();
				}
			}
			--i;		/* loop on same index */
		} else {
			/*
			 * Retain CITEM_NULL flag.  If set just repeat EOF.
			 * If not, the NULL,0 combination indicates an
			 * operation in-progress.
			 */
			xop->cluster.array[i].chain = NULL;
			/* retain any CITEM_NULL setting */
		}
	}

	/*
	 * Determine whether the lowest collected key meets clustering
	 * requirements.  Returns:
	 *
	 * 0	       - key valid, cluster can be returned.
	 *
	 * ENOENT      - normal end of scan, return ENOENT.
	 *
	 * ESRCH       - sufficient elements collected, quorum agreement
	 *		 that lokey is not a valid element and should be
	 *		 skipped.
	 *
	 * EDEADLK     - sufficient elements collected, no quorum agreement
	 *		 (and no agreement possible).  In this situation a
	 *		 repair is needed, for now we loop.
	 *
	 * EINPROGRESS - insufficient elements collected to resolve, wait
	 *		 for event and loop.
	 */
	if ((flags & HAMMER2_XOP_COLLECT_WAITALL) &&
	    (mask & HAMMER2_XOPMASK_ALLDONE) != HAMMER2_XOPMASK_VOP) {
		error = HAMMER2_ERROR_EINPROGRESS;
	} else {
		error = hammer2_cluster_check(&xop->cluster, lokey, keynull);
	}
	if (error == HAMMER2_ERROR_EINPROGRESS) {
		if (flags & HAMMER2_XOP_COLLECT_NOWAIT)
			goto done;
		tsleep_interlock(xop, 0);
		if (atomic_cmpset_64(&xop->run_mask,
				     mask, mask | HAMMER2_XOPMASK_WAIT)) {
			tsleep(xop, PINTERLOCKED, "h2coll", hz*60);
		}
		goto loop;
	}
	if (error == HAMMER2_ERROR_ESRCH) {
		if (lokey != HAMMER2_KEY_MAX) {
			xop->collect_key = lokey + 1;
			goto loop;
		}
		error = HAMMER2_ERROR_ENOENT;
	}
	if (error == HAMMER2_ERROR_EDEADLK) {
		kprintf("hammer2: no quorum possible lokey %016jx\n",
			lokey);
		if (lokey != HAMMER2_KEY_MAX) {
			xop->collect_key = lokey + 1;
			goto loop;
		}
		error = HAMMER2_ERROR_ENOENT;
	}
	if (lokey == HAMMER2_KEY_MAX)
		xop->collect_key = lokey;
	else
		xop->collect_key = lokey + 1;
done:
	return error;
}

/*
 * N x M processing threads are available to handle XOPs, N per cluster
 * index x M cluster nodes.
 *
 * Locate and return the next runnable xop, or NULL if no xops are
 * present or none of the xops are currently runnable (for various reasons).
 * The xop is left on the queue and serves to block other dependent xops
 * from being run.
 *
 * Dependent xops will not be returned.
 *
 * Sets HAMMER2_XOP_FIFO_RUN on the returned xop or returns NULL.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
#define XOP_HASH_SIZE	16
#define XOP_HASH_MASK	(XOP_HASH_SIZE - 1)

static __inline
int
xop_testhash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
	uint32_t mask;
	int hv;

	hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
	mask = 1U << (hv & 31);
	hv >>= 5;

	return ((int)(hash[hv & XOP_HASH_MASK] & mask));
}

static __inline
void
xop_sethash(hammer2_thread_t *thr, hammer2_inode_t *ip, uint32_t *hash)
{
	uint32_t mask;
	int hv;

	hv = (int)((uintptr_t)ip + (uintptr_t)thr) / sizeof(hammer2_inode_t);
	mask = 1U << (hv & 31);
	hv >>= 5;

	hash[hv & XOP_HASH_MASK] |= mask;
}

static
hammer2_xop_head_t *
hammer2_xop_next(hammer2_thread_t *thr)
{
	hammer2_pfs_t *pmp = thr->pmp;
	int clindex = thr->clindex;
	uint32_t hash[XOP_HASH_SIZE] = { 0 };
	hammer2_xop_head_t *xop;

	hammer2_spin_ex(&pmp->xop_spin);
	TAILQ_FOREACH(xop, &thr->xopq, collect[clindex].entry) {
		/*
		 * Check dependency
		 */
		if (xop_testhash(thr, xop->ip1, hash) ||
		    (xop->ip2 && xop_testhash(thr, xop->ip2, hash)) ||
		    (xop->ip3 && xop_testhash(thr, xop->ip3, hash))) {
			continue;
		}
		xop_sethash(thr, xop->ip1, hash);
		if (xop->ip2)
			xop_sethash(thr, xop->ip2, hash);
		if (xop->ip3)
			xop_sethash(thr, xop->ip3, hash);

		/*
		 * Check already running
		 */
		if (xop->collect[clindex].flags & HAMMER2_XOP_FIFO_RUN)
			continue;

		/*
		 * Found a good one, return it.
		 */
		atomic_set_int(&xop->collect[clindex].flags,
			       HAMMER2_XOP_FIFO_RUN);
		break;
	}
	hammer2_spin_unex(&pmp->xop_spin);

	return xop;
}
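/*
 * NOTE: the dependency filter above is a conservative 512-bit occupancy
 *	 bitmap (XOP_HASH_SIZE x 32 bits) keyed on the inode pointer.
 *	 A hash collision can only cause an unrelated xop to be skipped
 *	 for this pass, never run early, so false positives cost a little
 *	 latency but preserve the per-inode ordering guarantee.
 */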
/*
 * Remove the completed XOP from the queue, clear HAMMER2_XOP_FIFO_RUN.
 *
 * NOTE! Xops run concurrently for each cluster index.
 */
static
void
hammer2_xop_dequeue(hammer2_thread_t *thr, hammer2_xop_head_t *xop)
{
	hammer2_pfs_t *pmp = thr->pmp;
	int clindex = thr->clindex;

	hammer2_spin_ex(&pmp->xop_spin);
	TAILQ_REMOVE(&thr->xopq, xop, collect[clindex].entry);
	atomic_clear_int(&xop->collect[clindex].flags,
			 HAMMER2_XOP_FIFO_RUN);
	hammer2_spin_unex(&pmp->xop_spin);
	if (TAILQ_FIRST(&thr->xopq))
		hammer2_thr_signal(thr, HAMMER2_THREAD_XOPQ);
}

/*
 * Primary management thread for xops support.  Each node has several such
 * threads which replicate front-end operations on cluster nodes.
 *
 * XOPS threads run the node-specific side of front-end operations, allowing
 * each function to focus on a single node in the cluster after the operation
 * has been validated against the cluster as a whole.  This is primarily what
 * prevents dead or stalled nodes from stalling the front-end.
 */
void
hammer2_primary_xops_thread(void *arg)
{
	hammer2_thread_t *thr = arg;
	hammer2_pfs_t *pmp;
	hammer2_xop_head_t *xop;
	uint64_t mask;
	uint32_t flags;
	uint32_t nflags;
	hammer2_xop_desc_t *last_desc = NULL;

	pmp = thr->pmp;
	/*xgrp = &pmp->xop_groups[thr->repidx]; not needed */
	mask = 1LLU << thr->clindex;

	for (;;) {
		flags = thr->flags;

		/*
		 * Handle stop request
		 */
		if (flags & HAMMER2_THREAD_STOP)
			break;

		/*
		 * Handle freeze request
		 */
		if (flags & HAMMER2_THREAD_FREEZE) {
			hammer2_thr_signal2(thr, HAMMER2_THREAD_FROZEN,
						 HAMMER2_THREAD_FREEZE);
			continue;
		}

		if (flags & HAMMER2_THREAD_UNFREEZE) {
			hammer2_thr_signal2(thr, 0,
						 HAMMER2_THREAD_FROZEN |
						 HAMMER2_THREAD_UNFREEZE);
			continue;
		}

		/*
		 * Force idle if frozen until unfrozen or stopped.
		 */
		if (flags & HAMMER2_THREAD_FROZEN) {
			hammer2_thr_wait_any(thr,
					     HAMMER2_THREAD_UNFREEZE |
					     HAMMER2_THREAD_STOP,
					     0);
			continue;
		}

		/*
		 * Reset state on REMASTER request
		 */
		if (flags & HAMMER2_THREAD_REMASTER) {
			hammer2_thr_signal2(thr, 0, HAMMER2_THREAD_REMASTER);
			/* reset state here */
			continue;
		}

		/*
		 * Process requests.  Each request can be multi-queued.
		 *
		 * If we get behind and the frontend VOP is no longer active,
		 * we retire the request without processing it.  The callback
		 * may also abort processing if the frontend VOP becomes
		 * inactive.
		 */
		if (flags & HAMMER2_THREAD_XOPQ) {
			nflags = flags & ~HAMMER2_THREAD_XOPQ;
			if (!atomic_cmpset_int(&thr->flags, flags, nflags))
				continue;
			flags = nflags;
			/* fall through */
		}
		while ((xop = hammer2_xop_next(thr)) != NULL) {
			if (hammer2_xop_active(xop)) {
				last_desc = xop->desc;
				xop->desc->storage_func((hammer2_xop_t *)xop,
							thr->scratch,
							thr->clindex);
				hammer2_xop_dequeue(thr, xop);
				hammer2_xop_retire(xop, mask);
			} else {
				last_desc = xop->desc;
				hammer2_xop_feed(xop, NULL, thr->clindex,
						 ECONNABORTED);
				hammer2_xop_dequeue(thr, xop);
				hammer2_xop_retire(xop, mask);
			}
		}
		/*
		 * Wait for event, interlock using THREAD_WAITING and
		 * THREAD_SIGNAL.
		 *
		 * For robustness poll on a 30-second interval, but nominally
		 * expect to be woken up.
		 */
		nflags = flags | HAMMER2_THREAD_WAITING;

		tsleep_interlock(&thr->flags, 0);
		if (atomic_cmpset_int(&thr->flags, flags, nflags)) {
			tsleep(&thr->flags, PINTERLOCKED, "h2idle", hz*30);
		}
	}

#if 0
	/*
	 * Cleanup / termination
	 */
	while ((xop = TAILQ_FIRST(&thr->xopq)) != NULL) {
		kprintf("hammer2_thread: aborting xop %s\n", xop->desc->id);
		TAILQ_REMOVE(&thr->xopq, xop,
			     collect[thr->clindex].entry);
		hammer2_xop_retire(xop, mask);
	}
#endif
	thr->td = NULL;
	hammer2_thr_signal(thr, HAMMER2_THREAD_STOPPED);
	/* thr structure can go invalid after this point */
}