1 /*- 2 * Copyright (c) 2000 Doug Rabson 3 * Copyright (c) 2014 Jeff Roberson 4 * Copyright (c) 2016 Matthew Macy 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 #include <sys/cdefs.h> 30 #include <sys/param.h> 31 #include <sys/systm.h> 32 #include <sys/bus.h> 33 #include <sys/cpumask.h> 34 #include <sys/kernel.h> 35 #include <sys/libkern.h> 36 #include <sys/limits.h> 37 #include <sys/lock.h> 38 #include <sys/malloc.h> 39 #include <sys/proc.h> 40 #include <sys/sched.h> 41 #include <sys/gtaskqueue.h> 42 #include <sys/unistd.h> 43 #include <machine/stdarg.h> 44 45 static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues"); 46 static void gtaskqueue_thread_enqueue(void *); 47 static void gtaskqueue_thread_loop(void *arg); 48 static int task_is_running(struct gtaskqueue *queue, struct gtask *gtask); 49 static void gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask); 50 51 TASKQGROUP_DEFINE(softirq, ncpus, 1); 52 53 struct gtaskqueue_busy { 54 struct gtask *tb_running; 55 u_int tb_seq; 56 LIST_ENTRY(gtaskqueue_busy) tb_link; 57 }; 58 59 typedef void (*gtaskqueue_enqueue_fn)(void *context); 60 61 struct gtaskqueue { 62 STAILQ_HEAD(, gtask) tq_queue; 63 LIST_HEAD(, gtaskqueue_busy) tq_active; 64 u_int tq_seq; 65 int tq_callouts; 66 struct lock tq_lock; 67 gtaskqueue_enqueue_fn tq_enqueue; 68 void *tq_context; 69 const char *tq_name; 70 struct thread **tq_threads; 71 int tq_tcount; 72 int tq_flags; 73 #if 0 74 taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; 75 void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; 76 #endif 77 }; 78 79 #define TQ_FLAGS_ACTIVE (1 << 0) 80 #define TQ_FLAGS_BLOCKED (1 << 1) 81 #define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) 82 83 #define DT_CALLOUT_ARMED (1 << 0) 84 85 #define TQ_LOCK(tq) lockmgr(&(tq)->tq_lock, LK_EXCLUSIVE) 86 #define TQ_ASSERT_LOCKED(tq) KKASSERT(lockstatus(&(tq)->tq_lock, NULL) != 0) 87 #define TQ_UNLOCK(tq) lockmgr(&(tq)->tq_lock, LK_RELEASE); 88 #define TQ_ASSERT_UNLOCKED(tq) KKASSERT(lockstatus(&(tq)->tq_lock) == 0) 89 90 #ifdef INVARIANTS 91 static void 92 gtask_dump(struct gtask *gtask) 93 { 94 kprintf("gtask: %p ta_flags=%x ta_priority=%d ta_func=%p " 95 "ta_context=%p\n", 96 gtask, gtask->ta_flags, gtask->ta_priority, 97 gtask->ta_func, gtask->ta_context); 98 } 99 #endif 100 101 static __inline int 102 TQ_SLEEP(struct gtaskqueue *tq, void *p, const char *wm) 103 { 104 return (lksleep(p, &tq->tq_lock, 0, wm, 0)); 105 } 106 107 static struct gtaskqueue * 108 _gtaskqueue_create(const char *name, int mflags, 109 taskqueue_enqueue_fn enqueue, void *context, 110 int lkflags, const char *mtxname __unused) 111 { 112 struct gtaskqueue *queue; 113 114 queue = kmalloc(sizeof(struct gtaskqueue), 115 M_GTASKQUEUE, mflags | M_ZERO); 116 if (!queue) { 117 kprintf("_gtaskqueue_create: kmalloc failed %08x\n", mflags); 118 return (NULL); 119 } 120 121 STAILQ_INIT(&queue->tq_queue); 122 LIST_INIT(&queue->tq_active); 123 queue->tq_enqueue = enqueue; 124 queue->tq_context = context; 125 queue->tq_name = name ? name : "taskqueue"; 126 queue->tq_flags |= TQ_FLAGS_ACTIVE; 127 if (enqueue == gtaskqueue_thread_enqueue) 128 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 129 lockinit(&queue->tq_lock, queue->tq_name, 0, 0); 130 131 return (queue); 132 } 133 134 /* 135 * Signal a taskqueue thread to terminate. 136 */ 137 static void 138 gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq) 139 { 140 141 while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { 142 wakeup(tq); 143 TQ_SLEEP(tq, pp, "gtq_destroy"); 144 } 145 } 146 147 static void __unused 148 gtaskqueue_free(struct gtaskqueue *queue) 149 { 150 151 TQ_LOCK(queue); 152 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 153 gtaskqueue_terminate(queue->tq_threads, queue); 154 KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?")); 155 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); 156 lockuninit(&queue->tq_lock); 157 kfree(queue->tq_threads, M_GTASKQUEUE); 158 /*kfree(queue->tq_name, M_GTASKQUEUE);*/ 159 kfree(queue, M_GTASKQUEUE); 160 } 161 162 /* 163 * Wait for all to complete, then prevent it from being enqueued 164 */ 165 void 166 grouptask_block(struct grouptask *grouptask) 167 { 168 struct gtaskqueue *queue = grouptask->gt_taskqueue; 169 struct gtask *gtask = &grouptask->gt_task; 170 171 #ifdef INVARIANTS 172 if (queue == NULL) { 173 gtask_dump(gtask); 174 panic("queue == NULL"); 175 } 176 #endif 177 TQ_LOCK(queue); 178 gtask->ta_flags |= TASK_NOENQUEUE; 179 gtaskqueue_drain_locked(queue, gtask); 180 TQ_UNLOCK(queue); 181 } 182 183 void 184 grouptask_unblock(struct grouptask *grouptask) 185 { 186 struct gtaskqueue *queue = grouptask->gt_taskqueue; 187 struct gtask *gtask = &grouptask->gt_task; 188 189 #ifdef INVARIANTS 190 if (queue == NULL) { 191 gtask_dump(gtask); 192 panic("queue == NULL"); 193 } 194 #endif 195 TQ_LOCK(queue); 196 gtask->ta_flags &= ~TASK_NOENQUEUE; 197 TQ_UNLOCK(queue); 198 } 199 200 int 201 grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask) 202 { 203 #ifdef INVARIANTS 204 if (queue == NULL) { 205 gtask_dump(gtask); 206 panic("queue == NULL"); 207 } 208 #endif 209 TQ_LOCK(queue); 210 if (gtask->ta_flags & TASK_ENQUEUED) { 211 TQ_UNLOCK(queue); 212 return (0); 213 } 214 if (gtask->ta_flags & TASK_NOENQUEUE) { 215 TQ_UNLOCK(queue); 216 return (EAGAIN); 217 } 218 STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link); 219 gtask->ta_flags |= TASK_ENQUEUED; 220 TQ_UNLOCK(queue); 221 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 222 queue->tq_enqueue(queue->tq_context); 223 return (0); 224 } 225 226 static void 227 gtaskqueue_task_nop_fn(void *context) 228 { 229 } 230 231 /* 232 * Block until all currently queued tasks in this taskqueue 233 * have begun execution. Tasks queued during execution of 234 * this function are ignored. 235 */ 236 static void 237 gtaskqueue_drain_tq_queue(struct gtaskqueue *queue) 238 { 239 struct gtask t_barrier; 240 241 if (STAILQ_EMPTY(&queue->tq_queue)) 242 return; 243 244 /* 245 * Enqueue our barrier after all current tasks, but with 246 * the highest priority so that newly queued tasks cannot 247 * pass it. Because of the high priority, we can not use 248 * taskqueue_enqueue_locked directly (which drops the lock 249 * anyway) so just insert it at tail while we have the 250 * queue lock. 251 */ 252 GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier); 253 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); 254 t_barrier.ta_flags |= TASK_ENQUEUED; 255 256 /* 257 * Once the barrier has executed, all previously queued tasks 258 * have completed or are currently executing. 259 */ 260 while (t_barrier.ta_flags & TASK_ENQUEUED) 261 TQ_SLEEP(queue, &t_barrier, "gtq_qdrain"); 262 } 263 264 /* 265 * Block until all currently executing tasks for this taskqueue 266 * complete. Tasks that begin execution during the execution 267 * of this function are ignored. 268 */ 269 static void 270 gtaskqueue_drain_tq_active(struct gtaskqueue *queue) 271 { 272 struct gtaskqueue_busy *tb; 273 u_int seq; 274 275 if (LIST_EMPTY(&queue->tq_active)) 276 return; 277 278 /* Block taskq_terminate().*/ 279 queue->tq_callouts++; 280 281 /* Wait for any active task with sequence from the past. */ 282 seq = queue->tq_seq; 283 restart: 284 LIST_FOREACH(tb, &queue->tq_active, tb_link) { 285 if ((int)(tb->tb_seq - seq) <= 0) { 286 TQ_SLEEP(queue, tb->tb_running, "gtq_adrain"); 287 goto restart; 288 } 289 } 290 291 /* Release taskqueue_terminate(). */ 292 queue->tq_callouts--; 293 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) 294 wakeup_one(queue->tq_threads); 295 } 296 297 void 298 gtaskqueue_block(struct gtaskqueue *queue) 299 { 300 301 TQ_LOCK(queue); 302 queue->tq_flags |= TQ_FLAGS_BLOCKED; 303 TQ_UNLOCK(queue); 304 } 305 306 void 307 gtaskqueue_unblock(struct gtaskqueue *queue) 308 { 309 310 TQ_LOCK(queue); 311 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 312 if (!STAILQ_EMPTY(&queue->tq_queue)) 313 queue->tq_enqueue(queue->tq_context); 314 TQ_UNLOCK(queue); 315 } 316 317 static void 318 gtaskqueue_run_locked(struct gtaskqueue *queue) 319 { 320 struct gtaskqueue_busy tb; 321 struct gtask *gtask; 322 #if 0 323 struct epoch_tracker et; 324 bool in_net_epoch; 325 #endif 326 327 KASSERT(queue != NULL, ("tq is NULL")); 328 TQ_ASSERT_LOCKED(queue); 329 tb.tb_running = NULL; 330 LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link); 331 #if 0 332 in_net_epoch = false; 333 #endif 334 335 while ((gtask = STAILQ_FIRST(&queue->tq_queue)) != NULL) { 336 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 337 gtask->ta_flags &= ~TASK_ENQUEUED; 338 tb.tb_running = gtask; 339 tb.tb_seq = ++queue->tq_seq; 340 TQ_UNLOCK(queue); 341 342 KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL")); 343 #if 0 344 if (!in_net_epoch && TASK_IS_NET(gtask)) { 345 in_net_epoch = true; 346 NET_EPOCH_ENTER(et); 347 } else if (in_net_epoch && !TASK_IS_NET(gtask)) { 348 NET_EPOCH_EXIT(et); 349 in_net_epoch = false; 350 } 351 #endif 352 gtask->ta_func(gtask->ta_context); 353 354 TQ_LOCK(queue); 355 wakeup(gtask); 356 } 357 #if 0 358 if (in_net_epoch) 359 NET_EPOCH_EXIT(et); 360 #endif 361 LIST_REMOVE(&tb, tb_link); 362 } 363 364 static int 365 task_is_running(struct gtaskqueue *queue, struct gtask *gtask) 366 { 367 struct gtaskqueue_busy *tb; 368 369 TQ_ASSERT_LOCKED(queue); 370 LIST_FOREACH(tb, &queue->tq_active, tb_link) { 371 if (tb->tb_running == gtask) 372 return (1); 373 } 374 return (0); 375 } 376 377 static int 378 gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask) 379 { 380 381 if (gtask->ta_flags & TASK_ENQUEUED) 382 STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link); 383 gtask->ta_flags &= ~TASK_ENQUEUED; 384 return (task_is_running(queue, gtask) ? EBUSY : 0); 385 } 386 387 int 388 gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask) 389 { 390 int error; 391 392 TQ_LOCK(queue); 393 error = gtaskqueue_cancel_locked(queue, gtask); 394 TQ_UNLOCK(queue); 395 396 return (error); 397 } 398 399 static void 400 gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask) 401 { 402 while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask)) 403 TQ_SLEEP(queue, gtask, "gtq_drain"); 404 } 405 406 void 407 gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask) 408 { 409 TQ_LOCK(queue); 410 gtaskqueue_drain_locked(queue, gtask); 411 TQ_UNLOCK(queue); 412 } 413 414 void 415 gtaskqueue_drain_all(struct gtaskqueue *queue) 416 { 417 418 TQ_LOCK(queue); 419 gtaskqueue_drain_tq_queue(queue); 420 gtaskqueue_drain_tq_active(queue); 421 TQ_UNLOCK(queue); 422 } 423 424 static int __printflike(4, 0) 425 _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 426 const char *name, __va_list ap) 427 { 428 char ktname[MAXCOMLEN + 1]; 429 struct thread *td; 430 struct gtaskqueue *tq; 431 int i, error; 432 433 if (count <= 0) 434 return (EINVAL); 435 436 kvsnprintf(ktname, sizeof(ktname), name, ap); 437 tq = *tqp; 438 439 tq->tq_threads = kmalloc(sizeof(struct thread *) * count, 440 M_GTASKQUEUE, M_WAITOK | M_ZERO); 441 442 for (i = 0; i < count; i++) { 443 int cpu = i % ncpus; 444 if (count == 1) { 445 error = lwkt_create(gtaskqueue_thread_loop, tqp, 446 &tq->tq_threads[i], NULL, 447 TDF_NOSTART, cpu, 448 "%s", ktname); 449 } else { 450 error = lwkt_create(gtaskqueue_thread_loop, tqp, 451 &tq->tq_threads[i], NULL, 452 TDF_NOSTART, cpu, 453 "%s_%d", ktname, i); 454 } 455 if (error) { 456 /* should be ok to continue, taskqueue_free will dtrt */ 457 kprintf("%s: lwkt_create(%s): error %d", 458 __func__, ktname, error); 459 tq->tq_threads[i] = NULL; /* paranoid */ 460 } else 461 tq->tq_tcount++; 462 } 463 for (i = 0; i < count; i++) { 464 if (tq->tq_threads[i] == NULL) 465 continue; 466 td = tq->tq_threads[i]; 467 lwkt_setpri_initial(td, pri); 468 lwkt_schedule(td); 469 } 470 471 return (0); 472 } 473 474 static int __printflike(4, 5) 475 gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 476 const char *name, ...) 477 { 478 __va_list ap; 479 int error; 480 481 __va_start(ap, name); 482 error = _gtaskqueue_start_threads(tqp, count, pri, name, ap); 483 __va_end(ap); 484 return (error); 485 } 486 487 #if 0 488 static inline void 489 gtaskqueue_run_callback(struct gtaskqueue *tq, 490 enum taskqueue_callback_type cb_type) 491 { 492 taskqueue_callback_fn tq_callback; 493 494 TQ_ASSERT_UNLOCKED(tq); 495 tq_callback = tq->tq_callbacks[cb_type]; 496 if (tq_callback != NULL) 497 tq_callback(tq->tq_cb_contexts[cb_type]); 498 } 499 #endif 500 501 static void 502 gtaskqueue_thread_loop(void *arg) 503 { 504 struct gtaskqueue **tqp, *tq; 505 506 tqp = arg; 507 tq = *tqp; 508 #if 0 509 gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 510 #endif 511 TQ_LOCK(tq); 512 while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 513 /* XXX ? */ 514 gtaskqueue_run_locked(tq); 515 /* 516 * Because taskqueue_run() can drop tq_mutex, we need to 517 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 518 * meantime, which means we missed a wakeup. 519 */ 520 if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 521 break; 522 TQ_SLEEP(tq, tq, "-"); 523 } 524 gtaskqueue_run_locked(tq); 525 /* 526 * This thread is on its way out, so just drop the lock temporarily 527 * in order to call the shutdown callback. This allows the callback 528 * to look at the taskqueue, even just before it dies. 529 */ 530 #if 0 531 TQ_UNLOCK(tq); 532 gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 533 TQ_LOCK(tq); 534 #endif 535 536 /* rendezvous with thread that asked us to terminate */ 537 tq->tq_tcount--; 538 wakeup_one(tq->tq_threads); 539 TQ_UNLOCK(tq); 540 lwkt_exit(); 541 } 542 543 static void 544 gtaskqueue_thread_enqueue(void *context) 545 { 546 struct gtaskqueue **tqp, *tq; 547 548 tqp = context; 549 tq = *tqp; 550 wakeup_one(tq); 551 } 552 553 /* 554 * NOTE: FreeBSD uses MTX_SPIN locks, which doesn't make a whole lot 555 * of sense (over-use of spin-locks in general). In DFly we 556 * want to use blockable locks for almost everything. 557 */ 558 static struct gtaskqueue * 559 gtaskqueue_create_fast(const char *name, int mflags, 560 taskqueue_enqueue_fn enqueue, void *context) 561 { 562 return _gtaskqueue_create(name, mflags, enqueue, context, 563 0, "fast_taskqueue"); 564 } 565 566 struct taskqgroup_cpu { 567 LIST_HEAD(, grouptask) tgc_tasks; 568 struct gtaskqueue *tgc_taskq; 569 int tgc_cnt; 570 int tgc_cpu; 571 }; 572 573 struct taskqgroup { 574 struct taskqgroup_cpu tqg_queue[MAXCPU]; 575 struct lock tqg_lock; 576 const char * tqg_name; 577 int tqg_cnt; 578 }; 579 580 struct taskq_bind_task { 581 struct gtask bt_task; 582 int bt_cpuid; 583 }; 584 585 static void 586 taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu) 587 { 588 struct taskqgroup_cpu *qcpu; 589 590 qcpu = &qgroup->tqg_queue[idx]; 591 LIST_INIT(&qcpu->tgc_tasks); 592 qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK, 593 gtaskqueue_thread_enqueue, 594 &qcpu->tgc_taskq); 595 gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, TDPRI_KERN_DAEMON, 596 "%s_%d", qgroup->tqg_name, idx); 597 qcpu->tgc_cpu = cpu; 598 } 599 600 /* 601 * Find the taskq with least # of tasks that doesn't currently have any 602 * other queues from the uniq identifier. 603 */ 604 static int 605 taskqgroup_find(struct taskqgroup *qgroup, void *uniq) 606 { 607 struct grouptask *n; 608 int i, idx, mincnt; 609 int strict; 610 611 KKASSERT(lockstatus(&qgroup->tqg_lock, NULL) != 0); 612 KASSERT(qgroup->tqg_cnt != 0, 613 ("qgroup %s has no queues", qgroup->tqg_name)); 614 615 /* 616 * Two passes: first scan for a queue with the least tasks that 617 * does not already service this uniq id. If that fails simply find 618 * the queue with the least total tasks. 619 */ 620 for (idx = -1, mincnt = INT_MAX, strict = 1; mincnt == INT_MAX; 621 strict = 0) { 622 for (i = 0; i < qgroup->tqg_cnt; i++) { 623 if (qgroup->tqg_queue[i].tgc_cnt > mincnt) 624 continue; 625 if (strict) { 626 LIST_FOREACH(n, &qgroup->tqg_queue[i].tgc_tasks, 627 gt_list) 628 if (n->gt_uniq == uniq) 629 break; 630 if (n != NULL) 631 continue; 632 } 633 mincnt = qgroup->tqg_queue[i].tgc_cnt; 634 idx = i; 635 } 636 } 637 if (idx == -1) 638 panic("%s: failed to pick a qid.", __func__); 639 640 return (idx); 641 } 642 643 void 644 taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask, 645 void *uniq, device_t dev, struct resource *irq, const char *name) 646 { 647 int cpu, qid, error; 648 649 KASSERT(qgroup->tqg_cnt > 0, 650 ("qgroup %s has no queues", qgroup->tqg_name)); 651 652 gtask->gt_uniq = uniq; 653 ksnprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask"); 654 gtask->gt_dev = dev; 655 gtask->gt_irq = irq; 656 gtask->gt_cpu = -1; 657 lockmgr(&qgroup->tqg_lock, LK_EXCLUSIVE); 658 qid = taskqgroup_find(qgroup, uniq); 659 qgroup->tqg_queue[qid].tgc_cnt++; 660 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 661 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 662 if (dev != NULL && irq != NULL) { 663 cpu = qgroup->tqg_queue[qid].tgc_cpu; 664 gtask->gt_cpu = cpu; 665 lockmgr(&qgroup->tqg_lock, LK_RELEASE); 666 #if 0 667 /* 668 * XXX FreeBSD created a mess by separating out the cpu 669 * binding from bus_setup_intr(). Punt for now. 670 */ 671 error = bus_bind_intr(dev, irq, cpu); 672 #endif 673 error = 0; 674 675 if (error) 676 kprintf("%s: binding interrupt failed for %s: %d\n", 677 __func__, gtask->gt_name, error); 678 } else { 679 lockmgr(&qgroup->tqg_lock, LK_RELEASE); 680 } 681 } 682 683 int 684 taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask, 685 void *uniq, int cpu, device_t dev, struct resource *irq, const char *name) 686 { 687 int i, qid, error; 688 689 gtask->gt_uniq = uniq; 690 ksnprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask"); 691 gtask->gt_dev = dev; 692 gtask->gt_irq = irq; 693 gtask->gt_cpu = cpu; 694 lockmgr(&qgroup->tqg_lock, LK_EXCLUSIVE); 695 for (i = 0, qid = -1; i < qgroup->tqg_cnt; i++) { 696 if (qgroup->tqg_queue[i].tgc_cpu == cpu) { 697 qid = i; 698 break; 699 } 700 } 701 if (qid == -1) { 702 lockmgr(&qgroup->tqg_lock, LK_RELEASE); 703 kprintf("%s: qid not found for %s cpu=%d\n", 704 __func__, gtask->gt_name, cpu); 705 return (EINVAL); 706 } 707 qgroup->tqg_queue[qid].tgc_cnt++; 708 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 709 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 710 cpu = qgroup->tqg_queue[qid].tgc_cpu; 711 lockmgr(&qgroup->tqg_lock, LK_RELEASE); 712 713 if (dev != NULL && irq != NULL) { 714 #if 0 715 /* 716 * XXX FreeBSD created a mess by separating out the cpu 717 * binding from bus_setup_intr(). Punt for now. 718 */ 719 error = bus_bind_intr(dev, irq, cpu); 720 #endif 721 error = 0; 722 723 if (error) { 724 kprintf("%s: binding interrupt failed for %s: %d\n", 725 __func__, gtask->gt_name, error); 726 } 727 } 728 return (0); 729 } 730 731 void 732 taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask) 733 { 734 int i; 735 736 grouptask_block(gtask); 737 lockmgr(&qgroup->tqg_lock, LK_EXCLUSIVE); 738 for (i = 0; i < qgroup->tqg_cnt; i++) 739 if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue) 740 break; 741 if (i == qgroup->tqg_cnt) 742 panic("%s: task %s not in group", __func__, gtask->gt_name); 743 qgroup->tqg_queue[i].tgc_cnt--; 744 LIST_REMOVE(gtask, gt_list); 745 lockmgr(&qgroup->tqg_lock, LK_RELEASE); 746 gtask->gt_taskqueue = NULL; 747 gtask->gt_task.ta_flags &= ~TASK_NOENQUEUE; 748 } 749 750 static void 751 taskqgroup_binder(void *ctx) 752 { 753 struct taskq_bind_task *gtask; 754 755 gtask = ctx; 756 lwkt_migratecpu(gtask->bt_cpuid); 757 kfree(gtask, M_DEVBUF); 758 } 759 760 void 761 taskqgroup_bind(struct taskqgroup *qgroup) 762 { 763 struct taskq_bind_task *gtask; 764 int i; 765 766 /* 767 * Bind taskqueue threads to specific CPUs, if they have been assigned 768 * one. 769 */ 770 if (qgroup->tqg_cnt == 1) 771 return; 772 773 for (i = 0; i < qgroup->tqg_cnt; i++) { 774 gtask = kmalloc(sizeof(*gtask), M_DEVBUF, M_WAITOK); 775 GTASK_INIT(>ask->bt_task, 0, 0, taskqgroup_binder, gtask); 776 gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu; 777 grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq, 778 >ask->bt_task); 779 } 780 } 781 782 struct taskqgroup * 783 taskqgroup_create(const char *name, int cnt, int stride) 784 { 785 struct taskqgroup *qgroup; 786 int cpu, i, j; 787 788 qgroup = kmalloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO); 789 lockinit(&qgroup->tqg_lock, "taskqgroup", 0, 0); 790 qgroup->tqg_name = name; 791 qgroup->tqg_cnt = cnt; 792 793 for (cpu = i = 0; i < cnt; i++) { 794 taskqgroup_cpu_create(qgroup, i, cpu); 795 for (j = 0; j < stride; j++) 796 cpu = (cpu + 1) % ncpus; 797 } 798 return (qgroup); 799 } 800 801 void 802 taskqgroup_destroy(struct taskqgroup *qgroup) 803 { 804 } 805 806 void 807 taskqgroup_drain_all(struct taskqgroup *tqg) 808 { 809 struct gtaskqueue *q; 810 811 for (int i = 0; i < ncpus; i++) { 812 q = tqg->tqg_queue[i].tgc_taskq; 813 if (q == NULL) 814 continue; 815 gtaskqueue_drain_all(q); 816 } 817 } 818