/*-
 * Copyright (c) 2000 Doug Rabson
 * Copyright (c) 2014 Jeff Roberson
 * Copyright (c) 2016 Matthew Macy
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
2723ac9029SStephen Hurd */ 2823ac9029SStephen Hurd 2923ac9029SStephen Hurd #include <sys/cdefs.h> 3023ac9029SStephen Hurd __FBSDID("$FreeBSD$"); 3123ac9029SStephen Hurd 3223ac9029SStephen Hurd #include <sys/param.h> 3323ac9029SStephen Hurd #include <sys/systm.h> 3423ac9029SStephen Hurd #include <sys/bus.h> 3523ac9029SStephen Hurd #include <sys/cpuset.h> 3623ac9029SStephen Hurd #include <sys/interrupt.h> 3723ac9029SStephen Hurd #include <sys/kernel.h> 3823ac9029SStephen Hurd #include <sys/kthread.h> 3923ac9029SStephen Hurd #include <sys/libkern.h> 4023ac9029SStephen Hurd #include <sys/limits.h> 4123ac9029SStephen Hurd #include <sys/lock.h> 4223ac9029SStephen Hurd #include <sys/malloc.h> 4323ac9029SStephen Hurd #include <sys/mutex.h> 4423ac9029SStephen Hurd #include <sys/proc.h> 4523ac9029SStephen Hurd #include <sys/sched.h> 4623ac9029SStephen Hurd #include <sys/smp.h> 4723ac9029SStephen Hurd #include <sys/gtaskqueue.h> 4823ac9029SStephen Hurd #include <sys/unistd.h> 4923ac9029SStephen Hurd #include <machine/stdarg.h> 5023ac9029SStephen Hurd 51a0fcc371SStephen Hurd static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues"); 5223ac9029SStephen Hurd static void gtaskqueue_thread_enqueue(void *); 5323ac9029SStephen Hurd static void gtaskqueue_thread_loop(void *arg); 54ab2e3f79SStephen Hurd 55ab2e3f79SStephen Hurd TASKQGROUP_DEFINE(softirq, mp_ncpus, 1); 56d945ed64SSean Bruno 5723ac9029SStephen Hurd struct gtaskqueue_busy { 5823ac9029SStephen Hurd struct gtask *tb_running; 5923ac9029SStephen Hurd TAILQ_ENTRY(gtaskqueue_busy) tb_link; 6023ac9029SStephen Hurd }; 6123ac9029SStephen Hurd 6223ac9029SStephen Hurd static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1; 6323ac9029SStephen Hurd 6423ac9029SStephen Hurd struct gtaskqueue { 6523ac9029SStephen Hurd STAILQ_HEAD(, gtask) tq_queue; 6623ac9029SStephen Hurd gtaskqueue_enqueue_fn tq_enqueue; 6723ac9029SStephen Hurd void *tq_context; 6823ac9029SStephen Hurd char *tq_name; 6923ac9029SStephen Hurd 
TAILQ_HEAD(, gtaskqueue_busy) tq_active; 7023ac9029SStephen Hurd struct mtx tq_mutex; 7123ac9029SStephen Hurd struct thread **tq_threads; 7223ac9029SStephen Hurd int tq_tcount; 7323ac9029SStephen Hurd int tq_spin; 7423ac9029SStephen Hurd int tq_flags; 7523ac9029SStephen Hurd int tq_callouts; 7623ac9029SStephen Hurd taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; 7723ac9029SStephen Hurd void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; 7823ac9029SStephen Hurd }; 7923ac9029SStephen Hurd 8023ac9029SStephen Hurd #define TQ_FLAGS_ACTIVE (1 << 0) 8123ac9029SStephen Hurd #define TQ_FLAGS_BLOCKED (1 << 1) 8223ac9029SStephen Hurd #define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) 8323ac9029SStephen Hurd 8423ac9029SStephen Hurd #define DT_CALLOUT_ARMED (1 << 0) 8523ac9029SStephen Hurd 8623ac9029SStephen Hurd #define TQ_LOCK(tq) \ 8723ac9029SStephen Hurd do { \ 8823ac9029SStephen Hurd if ((tq)->tq_spin) \ 8923ac9029SStephen Hurd mtx_lock_spin(&(tq)->tq_mutex); \ 9023ac9029SStephen Hurd else \ 9123ac9029SStephen Hurd mtx_lock(&(tq)->tq_mutex); \ 9223ac9029SStephen Hurd } while (0) 9323ac9029SStephen Hurd #define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) 9423ac9029SStephen Hurd 9523ac9029SStephen Hurd #define TQ_UNLOCK(tq) \ 9623ac9029SStephen Hurd do { \ 9723ac9029SStephen Hurd if ((tq)->tq_spin) \ 9823ac9029SStephen Hurd mtx_unlock_spin(&(tq)->tq_mutex); \ 9923ac9029SStephen Hurd else \ 10023ac9029SStephen Hurd mtx_unlock(&(tq)->tq_mutex); \ 10123ac9029SStephen Hurd } while (0) 10223ac9029SStephen Hurd #define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) 10323ac9029SStephen Hurd 1041248952aSSean Bruno #ifdef INVARIANTS 1051248952aSSean Bruno static void 1061248952aSSean Bruno gtask_dump(struct gtask *gtask) 1071248952aSSean Bruno { 1081248952aSSean Bruno printf("gtask: %p ta_flags=%x ta_priority=%d ta_func=%p ta_context=%p\n", 1091248952aSSean Bruno gtask, gtask->ta_flags, gtask->ta_priority, gtask->ta_func, gtask->ta_context); 
1101248952aSSean Bruno } 1111248952aSSean Bruno #endif 1121248952aSSean Bruno 11323ac9029SStephen Hurd static __inline int 11423ac9029SStephen Hurd TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm, 11523ac9029SStephen Hurd int t) 11623ac9029SStephen Hurd { 11723ac9029SStephen Hurd if (tq->tq_spin) 11823ac9029SStephen Hurd return (msleep_spin(p, m, wm, t)); 11923ac9029SStephen Hurd return (msleep(p, m, pri, wm, t)); 12023ac9029SStephen Hurd } 12123ac9029SStephen Hurd 12223ac9029SStephen Hurd static struct gtaskqueue * 12323ac9029SStephen Hurd _gtaskqueue_create(const char *name, int mflags, 12423ac9029SStephen Hurd taskqueue_enqueue_fn enqueue, void *context, 12523ac9029SStephen Hurd int mtxflags, const char *mtxname __unused) 12623ac9029SStephen Hurd { 12723ac9029SStephen Hurd struct gtaskqueue *queue; 12823ac9029SStephen Hurd char *tq_name; 12923ac9029SStephen Hurd 13023ac9029SStephen Hurd tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO); 13123ac9029SStephen Hurd if (!tq_name) 13223ac9029SStephen Hurd return (NULL); 13323ac9029SStephen Hurd 13423ac9029SStephen Hurd snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? 
name : "taskqueue"); 13523ac9029SStephen Hurd 13623ac9029SStephen Hurd queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO); 13723ac9029SStephen Hurd if (!queue) 13823ac9029SStephen Hurd return (NULL); 13923ac9029SStephen Hurd 14023ac9029SStephen Hurd STAILQ_INIT(&queue->tq_queue); 14123ac9029SStephen Hurd TAILQ_INIT(&queue->tq_active); 14223ac9029SStephen Hurd queue->tq_enqueue = enqueue; 14323ac9029SStephen Hurd queue->tq_context = context; 14423ac9029SStephen Hurd queue->tq_name = tq_name; 14523ac9029SStephen Hurd queue->tq_spin = (mtxflags & MTX_SPIN) != 0; 14623ac9029SStephen Hurd queue->tq_flags |= TQ_FLAGS_ACTIVE; 14723ac9029SStephen Hurd if (enqueue == gtaskqueue_thread_enqueue) 14823ac9029SStephen Hurd queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 14923ac9029SStephen Hurd mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); 15023ac9029SStephen Hurd 15123ac9029SStephen Hurd return (queue); 15223ac9029SStephen Hurd } 15323ac9029SStephen Hurd 15423ac9029SStephen Hurd 15523ac9029SStephen Hurd /* 15623ac9029SStephen Hurd * Signal a taskqueue thread to terminate. 
15723ac9029SStephen Hurd */ 15823ac9029SStephen Hurd static void 15923ac9029SStephen Hurd gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq) 16023ac9029SStephen Hurd { 16123ac9029SStephen Hurd 16223ac9029SStephen Hurd while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { 16323ac9029SStephen Hurd wakeup(tq); 16423ac9029SStephen Hurd TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0); 16523ac9029SStephen Hurd } 16623ac9029SStephen Hurd } 16723ac9029SStephen Hurd 16823ac9029SStephen Hurd static void 16923ac9029SStephen Hurd gtaskqueue_free(struct gtaskqueue *queue) 17023ac9029SStephen Hurd { 17123ac9029SStephen Hurd 17223ac9029SStephen Hurd TQ_LOCK(queue); 17323ac9029SStephen Hurd queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 17423ac9029SStephen Hurd gtaskqueue_terminate(queue->tq_threads, queue); 17523ac9029SStephen Hurd KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?")); 17623ac9029SStephen Hurd KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); 17723ac9029SStephen Hurd mtx_destroy(&queue->tq_mutex); 17823ac9029SStephen Hurd free(queue->tq_threads, M_GTASKQUEUE); 17923ac9029SStephen Hurd free(queue->tq_name, M_GTASKQUEUE); 18023ac9029SStephen Hurd free(queue, M_GTASKQUEUE); 18123ac9029SStephen Hurd } 18223ac9029SStephen Hurd 18323ac9029SStephen Hurd int 18423ac9029SStephen Hurd grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask) 18523ac9029SStephen Hurd { 1861248952aSSean Bruno #ifdef INVARIANTS 1871248952aSSean Bruno if (queue == NULL) { 1881248952aSSean Bruno gtask_dump(gtask); 1891248952aSSean Bruno panic("queue == NULL"); 1901248952aSSean Bruno } 1911248952aSSean Bruno #endif 19223ac9029SStephen Hurd TQ_LOCK(queue); 19323ac9029SStephen Hurd if (gtask->ta_flags & TASK_ENQUEUED) { 19423ac9029SStephen Hurd TQ_UNLOCK(queue); 19523ac9029SStephen Hurd return (0); 19623ac9029SStephen Hurd } 19723ac9029SStephen Hurd STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link); 19823ac9029SStephen Hurd gtask->ta_flags |= 
TASK_ENQUEUED; 19923ac9029SStephen Hurd TQ_UNLOCK(queue); 200ab2e3f79SStephen Hurd if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 20123ac9029SStephen Hurd queue->tq_enqueue(queue->tq_context); 20223ac9029SStephen Hurd return (0); 20323ac9029SStephen Hurd } 20423ac9029SStephen Hurd 20523ac9029SStephen Hurd static void 20623ac9029SStephen Hurd gtaskqueue_task_nop_fn(void *context) 20723ac9029SStephen Hurd { 20823ac9029SStephen Hurd } 20923ac9029SStephen Hurd 21023ac9029SStephen Hurd /* 21123ac9029SStephen Hurd * Block until all currently queued tasks in this taskqueue 21223ac9029SStephen Hurd * have begun execution. Tasks queued during execution of 21323ac9029SStephen Hurd * this function are ignored. 21423ac9029SStephen Hurd */ 21523ac9029SStephen Hurd static void 21623ac9029SStephen Hurd gtaskqueue_drain_tq_queue(struct gtaskqueue *queue) 21723ac9029SStephen Hurd { 21823ac9029SStephen Hurd struct gtask t_barrier; 21923ac9029SStephen Hurd 22023ac9029SStephen Hurd if (STAILQ_EMPTY(&queue->tq_queue)) 22123ac9029SStephen Hurd return; 22223ac9029SStephen Hurd 22323ac9029SStephen Hurd /* 22423ac9029SStephen Hurd * Enqueue our barrier after all current tasks, but with 22523ac9029SStephen Hurd * the highest priority so that newly queued tasks cannot 22623ac9029SStephen Hurd * pass it. Because of the high priority, we can not use 22723ac9029SStephen Hurd * taskqueue_enqueue_locked directly (which drops the lock 22823ac9029SStephen Hurd * anyway) so just insert it at tail while we have the 22923ac9029SStephen Hurd * queue lock. 
23023ac9029SStephen Hurd */ 23123ac9029SStephen Hurd GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier); 23223ac9029SStephen Hurd STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); 23323ac9029SStephen Hurd t_barrier.ta_flags |= TASK_ENQUEUED; 23423ac9029SStephen Hurd 23523ac9029SStephen Hurd /* 23623ac9029SStephen Hurd * Once the barrier has executed, all previously queued tasks 23723ac9029SStephen Hurd * have completed or are currently executing. 23823ac9029SStephen Hurd */ 23923ac9029SStephen Hurd while (t_barrier.ta_flags & TASK_ENQUEUED) 24023ac9029SStephen Hurd TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0); 24123ac9029SStephen Hurd } 24223ac9029SStephen Hurd 24323ac9029SStephen Hurd /* 24423ac9029SStephen Hurd * Block until all currently executing tasks for this taskqueue 24523ac9029SStephen Hurd * complete. Tasks that begin execution during the execution 24623ac9029SStephen Hurd * of this function are ignored. 24723ac9029SStephen Hurd */ 24823ac9029SStephen Hurd static void 24923ac9029SStephen Hurd gtaskqueue_drain_tq_active(struct gtaskqueue *queue) 25023ac9029SStephen Hurd { 25123ac9029SStephen Hurd struct gtaskqueue_busy tb_marker, *tb_first; 25223ac9029SStephen Hurd 25323ac9029SStephen Hurd if (TAILQ_EMPTY(&queue->tq_active)) 25423ac9029SStephen Hurd return; 25523ac9029SStephen Hurd 25623ac9029SStephen Hurd /* Block taskq_terminate().*/ 25723ac9029SStephen Hurd queue->tq_callouts++; 25823ac9029SStephen Hurd 25923ac9029SStephen Hurd /* 26023ac9029SStephen Hurd * Wait for all currently executing taskqueue threads 26123ac9029SStephen Hurd * to go idle. 
26223ac9029SStephen Hurd */ 26323ac9029SStephen Hurd tb_marker.tb_running = TB_DRAIN_WAITER; 26423ac9029SStephen Hurd TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link); 26523ac9029SStephen Hurd while (TAILQ_FIRST(&queue->tq_active) != &tb_marker) 26623ac9029SStephen Hurd TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0); 26723ac9029SStephen Hurd TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link); 26823ac9029SStephen Hurd 26923ac9029SStephen Hurd /* 27023ac9029SStephen Hurd * Wakeup any other drain waiter that happened to queue up 27123ac9029SStephen Hurd * without any intervening active thread. 27223ac9029SStephen Hurd */ 27323ac9029SStephen Hurd tb_first = TAILQ_FIRST(&queue->tq_active); 27423ac9029SStephen Hurd if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER) 27523ac9029SStephen Hurd wakeup(tb_first); 27623ac9029SStephen Hurd 27723ac9029SStephen Hurd /* Release taskqueue_terminate(). */ 27823ac9029SStephen Hurd queue->tq_callouts--; 27923ac9029SStephen Hurd if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) 28023ac9029SStephen Hurd wakeup_one(queue->tq_threads); 28123ac9029SStephen Hurd } 28223ac9029SStephen Hurd 28323ac9029SStephen Hurd void 28423ac9029SStephen Hurd gtaskqueue_block(struct gtaskqueue *queue) 28523ac9029SStephen Hurd { 28623ac9029SStephen Hurd 28723ac9029SStephen Hurd TQ_LOCK(queue); 28823ac9029SStephen Hurd queue->tq_flags |= TQ_FLAGS_BLOCKED; 28923ac9029SStephen Hurd TQ_UNLOCK(queue); 29023ac9029SStephen Hurd } 29123ac9029SStephen Hurd 29223ac9029SStephen Hurd void 29323ac9029SStephen Hurd gtaskqueue_unblock(struct gtaskqueue *queue) 29423ac9029SStephen Hurd { 29523ac9029SStephen Hurd 29623ac9029SStephen Hurd TQ_LOCK(queue); 29723ac9029SStephen Hurd queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 29823ac9029SStephen Hurd if (!STAILQ_EMPTY(&queue->tq_queue)) 29923ac9029SStephen Hurd queue->tq_enqueue(queue->tq_context); 30023ac9029SStephen Hurd TQ_UNLOCK(queue); 30123ac9029SStephen Hurd } 30223ac9029SStephen Hurd 
30323ac9029SStephen Hurd static void 30423ac9029SStephen Hurd gtaskqueue_run_locked(struct gtaskqueue *queue) 30523ac9029SStephen Hurd { 30623ac9029SStephen Hurd struct gtaskqueue_busy tb; 30723ac9029SStephen Hurd struct gtaskqueue_busy *tb_first; 30823ac9029SStephen Hurd struct gtask *gtask; 30923ac9029SStephen Hurd 31023ac9029SStephen Hurd KASSERT(queue != NULL, ("tq is NULL")); 31123ac9029SStephen Hurd TQ_ASSERT_LOCKED(queue); 31223ac9029SStephen Hurd tb.tb_running = NULL; 31323ac9029SStephen Hurd 31423ac9029SStephen Hurd while (STAILQ_FIRST(&queue->tq_queue)) { 31523ac9029SStephen Hurd TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); 31623ac9029SStephen Hurd 31723ac9029SStephen Hurd /* 31823ac9029SStephen Hurd * Carefully remove the first task from the queue and 31923ac9029SStephen Hurd * clear its TASK_ENQUEUED flag 32023ac9029SStephen Hurd */ 32123ac9029SStephen Hurd gtask = STAILQ_FIRST(&queue->tq_queue); 32223ac9029SStephen Hurd KASSERT(gtask != NULL, ("task is NULL")); 32323ac9029SStephen Hurd STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 32423ac9029SStephen Hurd gtask->ta_flags &= ~TASK_ENQUEUED; 32523ac9029SStephen Hurd tb.tb_running = gtask; 32623ac9029SStephen Hurd TQ_UNLOCK(queue); 32723ac9029SStephen Hurd 32823ac9029SStephen Hurd KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL")); 32923ac9029SStephen Hurd gtask->ta_func(gtask->ta_context); 33023ac9029SStephen Hurd 33123ac9029SStephen Hurd TQ_LOCK(queue); 33223ac9029SStephen Hurd tb.tb_running = NULL; 33323ac9029SStephen Hurd wakeup(gtask); 33423ac9029SStephen Hurd 33523ac9029SStephen Hurd TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); 33623ac9029SStephen Hurd tb_first = TAILQ_FIRST(&queue->tq_active); 33723ac9029SStephen Hurd if (tb_first != NULL && 33823ac9029SStephen Hurd tb_first->tb_running == TB_DRAIN_WAITER) 33923ac9029SStephen Hurd wakeup(tb_first); 34023ac9029SStephen Hurd } 34123ac9029SStephen Hurd } 34223ac9029SStephen Hurd 34323ac9029SStephen Hurd static int 34423ac9029SStephen 
Hurd task_is_running(struct gtaskqueue *queue, struct gtask *gtask) 34523ac9029SStephen Hurd { 34623ac9029SStephen Hurd struct gtaskqueue_busy *tb; 34723ac9029SStephen Hurd 34823ac9029SStephen Hurd TQ_ASSERT_LOCKED(queue); 34923ac9029SStephen Hurd TAILQ_FOREACH(tb, &queue->tq_active, tb_link) { 35023ac9029SStephen Hurd if (tb->tb_running == gtask) 35123ac9029SStephen Hurd return (1); 35223ac9029SStephen Hurd } 35323ac9029SStephen Hurd return (0); 35423ac9029SStephen Hurd } 35523ac9029SStephen Hurd 35623ac9029SStephen Hurd static int 35723ac9029SStephen Hurd gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask) 35823ac9029SStephen Hurd { 35923ac9029SStephen Hurd 36023ac9029SStephen Hurd if (gtask->ta_flags & TASK_ENQUEUED) 36123ac9029SStephen Hurd STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link); 36223ac9029SStephen Hurd gtask->ta_flags &= ~TASK_ENQUEUED; 36323ac9029SStephen Hurd return (task_is_running(queue, gtask) ? EBUSY : 0); 36423ac9029SStephen Hurd } 36523ac9029SStephen Hurd 36623ac9029SStephen Hurd int 36723ac9029SStephen Hurd gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask) 36823ac9029SStephen Hurd { 36923ac9029SStephen Hurd int error; 37023ac9029SStephen Hurd 37123ac9029SStephen Hurd TQ_LOCK(queue); 37223ac9029SStephen Hurd error = gtaskqueue_cancel_locked(queue, gtask); 37323ac9029SStephen Hurd TQ_UNLOCK(queue); 37423ac9029SStephen Hurd 37523ac9029SStephen Hurd return (error); 37623ac9029SStephen Hurd } 37723ac9029SStephen Hurd 37823ac9029SStephen Hurd void 37923ac9029SStephen Hurd gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask) 38023ac9029SStephen Hurd { 38123ac9029SStephen Hurd 38223ac9029SStephen Hurd if (!queue->tq_spin) 38323ac9029SStephen Hurd WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 38423ac9029SStephen Hurd 38523ac9029SStephen Hurd TQ_LOCK(queue); 38623ac9029SStephen Hurd while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask)) 38723ac9029SStephen Hurd 
TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0); 38823ac9029SStephen Hurd TQ_UNLOCK(queue); 38923ac9029SStephen Hurd } 39023ac9029SStephen Hurd 39123ac9029SStephen Hurd void 39223ac9029SStephen Hurd gtaskqueue_drain_all(struct gtaskqueue *queue) 39323ac9029SStephen Hurd { 39423ac9029SStephen Hurd 39523ac9029SStephen Hurd if (!queue->tq_spin) 39623ac9029SStephen Hurd WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 39723ac9029SStephen Hurd 39823ac9029SStephen Hurd TQ_LOCK(queue); 39923ac9029SStephen Hurd gtaskqueue_drain_tq_queue(queue); 40023ac9029SStephen Hurd gtaskqueue_drain_tq_active(queue); 40123ac9029SStephen Hurd TQ_UNLOCK(queue); 40223ac9029SStephen Hurd } 40323ac9029SStephen Hurd 40423ac9029SStephen Hurd static int 40523ac9029SStephen Hurd _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 406ab2e3f79SStephen Hurd cpuset_t *mask, const char *name, va_list ap) 40723ac9029SStephen Hurd { 40823ac9029SStephen Hurd char ktname[MAXCOMLEN + 1]; 40923ac9029SStephen Hurd struct thread *td; 41023ac9029SStephen Hurd struct gtaskqueue *tq; 41123ac9029SStephen Hurd int i, error; 41223ac9029SStephen Hurd 41323ac9029SStephen Hurd if (count <= 0) 41423ac9029SStephen Hurd return (EINVAL); 41523ac9029SStephen Hurd 41623ac9029SStephen Hurd vsnprintf(ktname, sizeof(ktname), name, ap); 41723ac9029SStephen Hurd tq = *tqp; 41823ac9029SStephen Hurd 41923ac9029SStephen Hurd tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE, 42023ac9029SStephen Hurd M_NOWAIT | M_ZERO); 42123ac9029SStephen Hurd if (tq->tq_threads == NULL) { 42223ac9029SStephen Hurd printf("%s: no memory for %s threads\n", __func__, ktname); 42323ac9029SStephen Hurd return (ENOMEM); 42423ac9029SStephen Hurd } 42523ac9029SStephen Hurd 42623ac9029SStephen Hurd for (i = 0; i < count; i++) { 42723ac9029SStephen Hurd if (count == 1) 42823ac9029SStephen Hurd error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, 42923ac9029SStephen Hurd &tq->tq_threads[i], 
RFSTOPPED, 0, "%s", ktname); 43023ac9029SStephen Hurd else 43123ac9029SStephen Hurd error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, 43223ac9029SStephen Hurd &tq->tq_threads[i], RFSTOPPED, 0, 43323ac9029SStephen Hurd "%s_%d", ktname, i); 43423ac9029SStephen Hurd if (error) { 43523ac9029SStephen Hurd /* should be ok to continue, taskqueue_free will dtrt */ 43623ac9029SStephen Hurd printf("%s: kthread_add(%s): error %d", __func__, 43723ac9029SStephen Hurd ktname, error); 43823ac9029SStephen Hurd tq->tq_threads[i] = NULL; /* paranoid */ 43923ac9029SStephen Hurd } else 44023ac9029SStephen Hurd tq->tq_tcount++; 44123ac9029SStephen Hurd } 44223ac9029SStephen Hurd for (i = 0; i < count; i++) { 44323ac9029SStephen Hurd if (tq->tq_threads[i] == NULL) 44423ac9029SStephen Hurd continue; 44523ac9029SStephen Hurd td = tq->tq_threads[i]; 44623ac9029SStephen Hurd if (mask) { 44723ac9029SStephen Hurd error = cpuset_setthread(td->td_tid, mask); 44823ac9029SStephen Hurd /* 44923ac9029SStephen Hurd * Failing to pin is rarely an actual fatal error; 45023ac9029SStephen Hurd * it'll just affect performance. 45123ac9029SStephen Hurd */ 45223ac9029SStephen Hurd if (error) 45323ac9029SStephen Hurd printf("%s: curthread=%llu: can't pin; " 45423ac9029SStephen Hurd "error=%d\n", 45523ac9029SStephen Hurd __func__, 45623ac9029SStephen Hurd (unsigned long long) td->td_tid, 45723ac9029SStephen Hurd error); 45823ac9029SStephen Hurd } 45923ac9029SStephen Hurd thread_lock(td); 46023ac9029SStephen Hurd sched_prio(td, pri); 46123ac9029SStephen Hurd sched_add(td, SRQ_BORING); 46223ac9029SStephen Hurd thread_unlock(td); 46323ac9029SStephen Hurd } 46423ac9029SStephen Hurd 46523ac9029SStephen Hurd return (0); 46623ac9029SStephen Hurd } 46723ac9029SStephen Hurd 46823ac9029SStephen Hurd static int 46923ac9029SStephen Hurd gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 470ab2e3f79SStephen Hurd const char *name, ...) 
47123ac9029SStephen Hurd { 47223ac9029SStephen Hurd va_list ap; 47323ac9029SStephen Hurd int error; 47423ac9029SStephen Hurd 47523ac9029SStephen Hurd va_start(ap, name); 476ab2e3f79SStephen Hurd error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap); 47723ac9029SStephen Hurd va_end(ap); 47823ac9029SStephen Hurd return (error); 47923ac9029SStephen Hurd } 48023ac9029SStephen Hurd 48123ac9029SStephen Hurd static inline void 48223ac9029SStephen Hurd gtaskqueue_run_callback(struct gtaskqueue *tq, 48323ac9029SStephen Hurd enum taskqueue_callback_type cb_type) 48423ac9029SStephen Hurd { 48523ac9029SStephen Hurd taskqueue_callback_fn tq_callback; 48623ac9029SStephen Hurd 48723ac9029SStephen Hurd TQ_ASSERT_UNLOCKED(tq); 48823ac9029SStephen Hurd tq_callback = tq->tq_callbacks[cb_type]; 48923ac9029SStephen Hurd if (tq_callback != NULL) 49023ac9029SStephen Hurd tq_callback(tq->tq_cb_contexts[cb_type]); 49123ac9029SStephen Hurd } 49223ac9029SStephen Hurd 49323ac9029SStephen Hurd static void 494ab2e3f79SStephen Hurd gtaskqueue_thread_loop(void *arg) 49523ac9029SStephen Hurd { 496ab2e3f79SStephen Hurd struct gtaskqueue **tqp, *tq; 49723ac9029SStephen Hurd 498ab2e3f79SStephen Hurd tqp = arg; 499ab2e3f79SStephen Hurd tq = *tqp; 500ab2e3f79SStephen Hurd gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 501d300df01SStephen Hurd TQ_LOCK(tq); 502d300df01SStephen Hurd while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 503ab2e3f79SStephen Hurd /* XXX ? */ 50423ac9029SStephen Hurd gtaskqueue_run_locked(tq); 50523ac9029SStephen Hurd /* 50623ac9029SStephen Hurd * Because taskqueue_run() can drop tq_mutex, we need to 50723ac9029SStephen Hurd * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 50823ac9029SStephen Hurd * meantime, which means we missed a wakeup. 
50923ac9029SStephen Hurd */ 51023ac9029SStephen Hurd if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 51123ac9029SStephen Hurd break; 51223ac9029SStephen Hurd TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0); 51323ac9029SStephen Hurd } 51423ac9029SStephen Hurd gtaskqueue_run_locked(tq); 51523ac9029SStephen Hurd /* 51623ac9029SStephen Hurd * This thread is on its way out, so just drop the lock temporarily 51723ac9029SStephen Hurd * in order to call the shutdown callback. This allows the callback 51823ac9029SStephen Hurd * to look at the taskqueue, even just before it dies. 51923ac9029SStephen Hurd */ 52023ac9029SStephen Hurd TQ_UNLOCK(tq); 52123ac9029SStephen Hurd gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 52223ac9029SStephen Hurd TQ_LOCK(tq); 52323ac9029SStephen Hurd 52423ac9029SStephen Hurd /* rendezvous with thread that asked us to terminate */ 52523ac9029SStephen Hurd tq->tq_tcount--; 52623ac9029SStephen Hurd wakeup_one(tq->tq_threads); 52723ac9029SStephen Hurd TQ_UNLOCK(tq); 52823ac9029SStephen Hurd kthread_exit(); 52923ac9029SStephen Hurd } 53023ac9029SStephen Hurd 53123ac9029SStephen Hurd static void 53223ac9029SStephen Hurd gtaskqueue_thread_enqueue(void *context) 53323ac9029SStephen Hurd { 53423ac9029SStephen Hurd struct gtaskqueue **tqp, *tq; 53523ac9029SStephen Hurd 53623ac9029SStephen Hurd tqp = context; 53723ac9029SStephen Hurd tq = *tqp; 53823ac9029SStephen Hurd wakeup_one(tq); 53923ac9029SStephen Hurd } 54023ac9029SStephen Hurd 54123ac9029SStephen Hurd 54223ac9029SStephen Hurd static struct gtaskqueue * 54323ac9029SStephen Hurd gtaskqueue_create_fast(const char *name, int mflags, 54423ac9029SStephen Hurd taskqueue_enqueue_fn enqueue, void *context) 54523ac9029SStephen Hurd { 54623ac9029SStephen Hurd return _gtaskqueue_create(name, mflags, enqueue, context, 54723ac9029SStephen Hurd MTX_SPIN, "fast_taskqueue"); 54823ac9029SStephen Hurd } 54923ac9029SStephen Hurd 55023ac9029SStephen Hurd 55123ac9029SStephen Hurd struct taskqgroup_cpu { 
55223ac9029SStephen Hurd LIST_HEAD(, grouptask) tgc_tasks; 55323ac9029SStephen Hurd struct gtaskqueue *tgc_taskq; 55423ac9029SStephen Hurd int tgc_cnt; 55523ac9029SStephen Hurd int tgc_cpu; 55623ac9029SStephen Hurd }; 55723ac9029SStephen Hurd 55823ac9029SStephen Hurd struct taskqgroup { 55923ac9029SStephen Hurd struct taskqgroup_cpu tqg_queue[MAXCPU]; 56023ac9029SStephen Hurd struct mtx tqg_lock; 56123ac9029SStephen Hurd char * tqg_name; 56223ac9029SStephen Hurd int tqg_adjusting; 56323ac9029SStephen Hurd int tqg_stride; 56423ac9029SStephen Hurd int tqg_cnt; 56523ac9029SStephen Hurd }; 56623ac9029SStephen Hurd 56723ac9029SStephen Hurd struct taskq_bind_task { 56823ac9029SStephen Hurd struct gtask bt_task; 56923ac9029SStephen Hurd int bt_cpuid; 57023ac9029SStephen Hurd }; 57123ac9029SStephen Hurd 57223ac9029SStephen Hurd static void 573ab2e3f79SStephen Hurd taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu) 57423ac9029SStephen Hurd { 57523ac9029SStephen Hurd struct taskqgroup_cpu *qcpu; 57623ac9029SStephen Hurd 57723ac9029SStephen Hurd qcpu = &qgroup->tqg_queue[idx]; 57823ac9029SStephen Hurd LIST_INIT(&qcpu->tgc_tasks); 579ab2e3f79SStephen Hurd qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK, 58023ac9029SStephen Hurd taskqueue_thread_enqueue, &qcpu->tgc_taskq); 581ab2e3f79SStephen Hurd gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT, 582ab2e3f79SStephen Hurd "%s_%d", qgroup->tqg_name, idx); 58312d1b8c9SSean Bruno qcpu->tgc_cpu = cpu; 58423ac9029SStephen Hurd } 58523ac9029SStephen Hurd 58623ac9029SStephen Hurd static void 58723ac9029SStephen Hurd taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx) 58823ac9029SStephen Hurd { 58923ac9029SStephen Hurd 59023ac9029SStephen Hurd gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq); 59123ac9029SStephen Hurd } 59223ac9029SStephen Hurd 59323ac9029SStephen Hurd /* 59423ac9029SStephen Hurd * Find the taskq with least # of tasks that doesn't currently have any 59523ac9029SStephen Hurd * other 
queues from the uniq identifier. 59623ac9029SStephen Hurd */ 59723ac9029SStephen Hurd static int 59823ac9029SStephen Hurd taskqgroup_find(struct taskqgroup *qgroup, void *uniq) 59923ac9029SStephen Hurd { 60023ac9029SStephen Hurd struct grouptask *n; 60123ac9029SStephen Hurd int i, idx, mincnt; 60223ac9029SStephen Hurd int strict; 60323ac9029SStephen Hurd 60423ac9029SStephen Hurd mtx_assert(&qgroup->tqg_lock, MA_OWNED); 60523ac9029SStephen Hurd if (qgroup->tqg_cnt == 0) 60623ac9029SStephen Hurd return (0); 60723ac9029SStephen Hurd idx = -1; 60823ac9029SStephen Hurd mincnt = INT_MAX; 60923ac9029SStephen Hurd /* 61023ac9029SStephen Hurd * Two passes; First scan for a queue with the least tasks that 61123ac9029SStephen Hurd * does not already service this uniq id. If that fails simply find 61223ac9029SStephen Hurd * the queue with the least total tasks; 61323ac9029SStephen Hurd */ 61423ac9029SStephen Hurd for (strict = 1; mincnt == INT_MAX; strict = 0) { 61523ac9029SStephen Hurd for (i = 0; i < qgroup->tqg_cnt; i++) { 61623ac9029SStephen Hurd if (qgroup->tqg_queue[i].tgc_cnt > mincnt) 61723ac9029SStephen Hurd continue; 61823ac9029SStephen Hurd if (strict) { 61923ac9029SStephen Hurd LIST_FOREACH(n, 62023ac9029SStephen Hurd &qgroup->tqg_queue[i].tgc_tasks, gt_list) 62123ac9029SStephen Hurd if (n->gt_uniq == uniq) 62223ac9029SStephen Hurd break; 62323ac9029SStephen Hurd if (n != NULL) 62423ac9029SStephen Hurd continue; 62523ac9029SStephen Hurd } 62623ac9029SStephen Hurd mincnt = qgroup->tqg_queue[i].tgc_cnt; 62723ac9029SStephen Hurd idx = i; 62823ac9029SStephen Hurd } 62923ac9029SStephen Hurd } 63023ac9029SStephen Hurd if (idx == -1) 63123ac9029SStephen Hurd panic("taskqgroup_find: Failed to pick a qid."); 63223ac9029SStephen Hurd 63323ac9029SStephen Hurd return (idx); 63423ac9029SStephen Hurd } 635de414cfeSSean Bruno 636bd84f700SSean Bruno /* 637bd84f700SSean Bruno * smp_started is unusable since it is not set for UP kernels or even for 638bd84f700SSean Bruno * SMP 
kernels when there is 1 CPU. This is usually handled by adding a 639bd84f700SSean Bruno * (mp_ncpus == 1) test, but that would be broken here since we need to 640bd84f700SSean Bruno * to synchronize with the SI_SUB_SMP ordering. Even in the pure SMP case 641bd84f700SSean Bruno * smp_started only gives a fuzzy ordering relative to SI_SUB_SMP. 642bd84f700SSean Bruno * 643bd84f700SSean Bruno * So maintain our own flag. It must be set after all CPUs are started 644bd84f700SSean Bruno * and before SI_SUB_SMP:SI_ORDER_ANY so that the SYSINIT for delayed 645bd84f700SSean Bruno * adjustment is properly delayed. SI_ORDER_FOURTH is clearly before 646bd84f700SSean Bruno * SI_ORDER_ANY and unclearly after the CPUs are started. It would be 647bd84f700SSean Bruno * simpler for adjustment to pass a flag indicating if it is delayed. 648bd84f700SSean Bruno */ 649de414cfeSSean Bruno 650bd84f700SSean Bruno static int tqg_smp_started; 651bd84f700SSean Bruno 652bd84f700SSean Bruno static void 653bd84f700SSean Bruno tqg_record_smp_started(void *arg) 654bd84f700SSean Bruno { 655bd84f700SSean Bruno tqg_smp_started = 1; 656bd84f700SSean Bruno } 657bd84f700SSean Bruno 658bd84f700SSean Bruno SYSINIT(tqg_record_smp_started, SI_SUB_SMP, SI_ORDER_FOURTH, 659bd84f700SSean Bruno tqg_record_smp_started, NULL); 66023ac9029SStephen Hurd 66123ac9029SStephen Hurd void 66223ac9029SStephen Hurd taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask, 66323ac9029SStephen Hurd void *uniq, int irq, char *name) 66423ac9029SStephen Hurd { 66523ac9029SStephen Hurd cpuset_t mask; 666326aacb0SStephen Hurd int qid, error; 66723ac9029SStephen Hurd 66823ac9029SStephen Hurd gtask->gt_uniq = uniq; 66923ac9029SStephen Hurd gtask->gt_name = name; 67023ac9029SStephen Hurd gtask->gt_irq = irq; 67123ac9029SStephen Hurd gtask->gt_cpu = -1; 67223ac9029SStephen Hurd mtx_lock(&qgroup->tqg_lock); 67323ac9029SStephen Hurd qid = taskqgroup_find(qgroup, uniq); 67423ac9029SStephen Hurd 
qgroup->tqg_queue[qid].tgc_cnt++; 67523ac9029SStephen Hurd LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 67623ac9029SStephen Hurd gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 677bd84f700SSean Bruno if (irq != -1 && tqg_smp_started) { 67812d1b8c9SSean Bruno gtask->gt_cpu = qgroup->tqg_queue[qid].tgc_cpu; 67923ac9029SStephen Hurd CPU_ZERO(&mask); 68023ac9029SStephen Hurd CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask); 68123ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 682326aacb0SStephen Hurd error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask); 683326aacb0SStephen Hurd if (error) 684326aacb0SStephen Hurd printf("%s: setaffinity failed: %d\n", __func__, error); 68523ac9029SStephen Hurd } else 68623ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 68723ac9029SStephen Hurd } 68823ac9029SStephen Hurd 68912d1b8c9SSean Bruno static void 69012d1b8c9SSean Bruno taskqgroup_attach_deferred(struct taskqgroup *qgroup, struct grouptask *gtask) 69112d1b8c9SSean Bruno { 69212d1b8c9SSean Bruno cpuset_t mask; 693326aacb0SStephen Hurd int qid, cpu, error; 69412d1b8c9SSean Bruno 69512d1b8c9SSean Bruno mtx_lock(&qgroup->tqg_lock); 69612d1b8c9SSean Bruno qid = taskqgroup_find(qgroup, gtask->gt_uniq); 69712d1b8c9SSean Bruno cpu = qgroup->tqg_queue[qid].tgc_cpu; 69812d1b8c9SSean Bruno if (gtask->gt_irq != -1) { 69912d1b8c9SSean Bruno mtx_unlock(&qgroup->tqg_lock); 70012d1b8c9SSean Bruno 70112d1b8c9SSean Bruno CPU_ZERO(&mask); 70212d1b8c9SSean Bruno CPU_SET(cpu, &mask); 703326aacb0SStephen Hurd error = intr_setaffinity(gtask->gt_irq, CPU_WHICH_IRQ, &mask); 70412d1b8c9SSean Bruno mtx_lock(&qgroup->tqg_lock); 705326aacb0SStephen Hurd if (error) 706326aacb0SStephen Hurd printf("%s: setaffinity failed: %d\n", __func__, error); 707326aacb0SStephen Hurd 70812d1b8c9SSean Bruno } 70912d1b8c9SSean Bruno qgroup->tqg_queue[qid].tgc_cnt++; 71012d1b8c9SSean Bruno 71112d1b8c9SSean Bruno LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, 71212d1b8c9SSean 
Bruno gt_list); 713abf38392SSean Bruno MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL); 71412d1b8c9SSean Bruno gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 71512d1b8c9SSean Bruno mtx_unlock(&qgroup->tqg_lock); 71612d1b8c9SSean Bruno } 71712d1b8c9SSean Bruno 71823ac9029SStephen Hurd int 71923ac9029SStephen Hurd taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask, 72023ac9029SStephen Hurd void *uniq, int cpu, int irq, char *name) 72123ac9029SStephen Hurd { 72223ac9029SStephen Hurd cpuset_t mask; 723326aacb0SStephen Hurd int i, qid, error; 72423ac9029SStephen Hurd 72523ac9029SStephen Hurd qid = -1; 72623ac9029SStephen Hurd gtask->gt_uniq = uniq; 72723ac9029SStephen Hurd gtask->gt_name = name; 72823ac9029SStephen Hurd gtask->gt_irq = irq; 72923ac9029SStephen Hurd gtask->gt_cpu = cpu; 73023ac9029SStephen Hurd mtx_lock(&qgroup->tqg_lock); 731bd84f700SSean Bruno if (tqg_smp_started) { 732ab2e3f79SStephen Hurd for (i = 0; i < qgroup->tqg_cnt; i++) 73323ac9029SStephen Hurd if (qgroup->tqg_queue[i].tgc_cpu == cpu) { 73423ac9029SStephen Hurd qid = i; 73523ac9029SStephen Hurd break; 73623ac9029SStephen Hurd } 73723ac9029SStephen Hurd if (qid == -1) { 73823ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 739326aacb0SStephen Hurd printf("%s: qid not found for %s cpu=%d\n", __func__, name, cpu); 74023ac9029SStephen Hurd return (EINVAL); 74123ac9029SStephen Hurd } 74223ac9029SStephen Hurd } else 74323ac9029SStephen Hurd qid = 0; 74423ac9029SStephen Hurd qgroup->tqg_queue[qid].tgc_cnt++; 74523ac9029SStephen Hurd LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 74623ac9029SStephen Hurd gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 74712d1b8c9SSean Bruno cpu = qgroup->tqg_queue[qid].tgc_cpu; 74812d1b8c9SSean Bruno mtx_unlock(&qgroup->tqg_lock); 74912d1b8c9SSean Bruno 75023ac9029SStephen Hurd CPU_ZERO(&mask); 75112d1b8c9SSean Bruno CPU_SET(cpu, &mask); 752326aacb0SStephen Hurd if (irq != -1 && tqg_smp_started) { 
753326aacb0SStephen Hurd error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask); 754326aacb0SStephen Hurd if (error) 755326aacb0SStephen Hurd printf("%s: setaffinity failed: %d\n", __func__, error); 756326aacb0SStephen Hurd } 75712d1b8c9SSean Bruno return (0); 75812d1b8c9SSean Bruno } 75912d1b8c9SSean Bruno 76012d1b8c9SSean Bruno static int 76112d1b8c9SSean Bruno taskqgroup_attach_cpu_deferred(struct taskqgroup *qgroup, struct grouptask *gtask) 76212d1b8c9SSean Bruno { 76312d1b8c9SSean Bruno cpuset_t mask; 764326aacb0SStephen Hurd int i, qid, irq, cpu, error; 76512d1b8c9SSean Bruno 76612d1b8c9SSean Bruno qid = -1; 76712d1b8c9SSean Bruno irq = gtask->gt_irq; 76812d1b8c9SSean Bruno cpu = gtask->gt_cpu; 769bd84f700SSean Bruno MPASS(tqg_smp_started); 77012d1b8c9SSean Bruno mtx_lock(&qgroup->tqg_lock); 77112d1b8c9SSean Bruno for (i = 0; i < qgroup->tqg_cnt; i++) 77212d1b8c9SSean Bruno if (qgroup->tqg_queue[i].tgc_cpu == cpu) { 77312d1b8c9SSean Bruno qid = i; 77412d1b8c9SSean Bruno break; 77512d1b8c9SSean Bruno } 77612d1b8c9SSean Bruno if (qid == -1) { 77723ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 778326aacb0SStephen Hurd printf("%s: qid not found for %s cpu=%d\n", __func__, name, cpu); 77912d1b8c9SSean Bruno return (EINVAL); 78012d1b8c9SSean Bruno } 78112d1b8c9SSean Bruno qgroup->tqg_queue[qid].tgc_cnt++; 78212d1b8c9SSean Bruno LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 783abf38392SSean Bruno MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL); 78412d1b8c9SSean Bruno gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 78512d1b8c9SSean Bruno mtx_unlock(&qgroup->tqg_lock); 78612d1b8c9SSean Bruno 78712d1b8c9SSean Bruno CPU_ZERO(&mask); 78812d1b8c9SSean Bruno CPU_SET(cpu, &mask); 78912d1b8c9SSean Bruno 790326aacb0SStephen Hurd if (irq != -1) { 791326aacb0SStephen Hurd error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask); 792326aacb0SStephen Hurd if (error) 793326aacb0SStephen Hurd printf("%s: setaffinity failed: %d\n", __func__, error); 
794326aacb0SStephen Hurd } 79523ac9029SStephen Hurd return (0); 79623ac9029SStephen Hurd } 79723ac9029SStephen Hurd 79823ac9029SStephen Hurd void 79923ac9029SStephen Hurd taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask) 80023ac9029SStephen Hurd { 80123ac9029SStephen Hurd int i; 80223ac9029SStephen Hurd 80323ac9029SStephen Hurd mtx_lock(&qgroup->tqg_lock); 80423ac9029SStephen Hurd for (i = 0; i < qgroup->tqg_cnt; i++) 80523ac9029SStephen Hurd if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue) 80623ac9029SStephen Hurd break; 80723ac9029SStephen Hurd if (i == qgroup->tqg_cnt) 80823ac9029SStephen Hurd panic("taskqgroup_detach: task not in group\n"); 80923ac9029SStephen Hurd qgroup->tqg_queue[i].tgc_cnt--; 81023ac9029SStephen Hurd LIST_REMOVE(gtask, gt_list); 81123ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 81223ac9029SStephen Hurd gtask->gt_taskqueue = NULL; 81323ac9029SStephen Hurd } 81423ac9029SStephen Hurd 81523ac9029SStephen Hurd static void 81623ac9029SStephen Hurd taskqgroup_binder(void *ctx) 81723ac9029SStephen Hurd { 81823ac9029SStephen Hurd struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx; 81923ac9029SStephen Hurd cpuset_t mask; 82023ac9029SStephen Hurd int error; 82123ac9029SStephen Hurd 82223ac9029SStephen Hurd CPU_ZERO(&mask); 82323ac9029SStephen Hurd CPU_SET(gtask->bt_cpuid, &mask); 82423ac9029SStephen Hurd error = cpuset_setthread(curthread->td_tid, &mask); 82523ac9029SStephen Hurd thread_lock(curthread); 82623ac9029SStephen Hurd sched_bind(curthread, gtask->bt_cpuid); 82723ac9029SStephen Hurd thread_unlock(curthread); 82823ac9029SStephen Hurd 82923ac9029SStephen Hurd if (error) 830326aacb0SStephen Hurd printf("%s: setaffinity failed: %d\n", __func__, 83123ac9029SStephen Hurd error); 83223ac9029SStephen Hurd free(gtask, M_DEVBUF); 833d300df01SStephen Hurd } 834d300df01SStephen Hurd 83523ac9029SStephen Hurd static void 83623ac9029SStephen Hurd taskqgroup_bind(struct taskqgroup *qgroup) 83723ac9029SStephen 
Hurd { 83823ac9029SStephen Hurd struct taskq_bind_task *gtask; 83923ac9029SStephen Hurd int i; 84023ac9029SStephen Hurd 84123ac9029SStephen Hurd /* 84223ac9029SStephen Hurd * Bind taskqueue threads to specific CPUs, if they have been assigned 84323ac9029SStephen Hurd * one. 84423ac9029SStephen Hurd */ 845026204b4SSean Bruno if (qgroup->tqg_cnt == 1) 846026204b4SSean Bruno return; 847026204b4SSean Bruno 84823ac9029SStephen Hurd for (i = 0; i < qgroup->tqg_cnt; i++) { 8491ee17b07SSean Bruno gtask = malloc(sizeof (*gtask), M_DEVBUF, M_WAITOK); 85023ac9029SStephen Hurd GTASK_INIT(>ask->bt_task, 0, 0, taskqgroup_binder, gtask); 85123ac9029SStephen Hurd gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu; 85223ac9029SStephen Hurd grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq, 85323ac9029SStephen Hurd >ask->bt_task); 85423ac9029SStephen Hurd } 85523ac9029SStephen Hurd } 85623ac9029SStephen Hurd 85723ac9029SStephen Hurd static int 858ab2e3f79SStephen Hurd _taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride) 85923ac9029SStephen Hurd { 86023ac9029SStephen Hurd LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL); 86123ac9029SStephen Hurd struct grouptask *gtask; 86212d1b8c9SSean Bruno int i, k, old_cnt, old_cpu, cpu; 86323ac9029SStephen Hurd 86423ac9029SStephen Hurd mtx_assert(&qgroup->tqg_lock, MA_OWNED); 86523ac9029SStephen Hurd 866bd84f700SSean Bruno if (cnt < 1 || cnt * stride > mp_ncpus || !tqg_smp_started) { 867bd84f700SSean Bruno printf("%s: failed cnt: %d stride: %d " 86806bb7c50SSean Bruno "mp_ncpus: %d tqg_smp_started: %d\n", 86906bb7c50SSean Bruno __func__, cnt, stride, mp_ncpus, tqg_smp_started); 87023ac9029SStephen Hurd return (EINVAL); 87123ac9029SStephen Hurd } 87223ac9029SStephen Hurd if (qgroup->tqg_adjusting) { 873326aacb0SStephen Hurd printf("%s failed: adjusting\n", __func__); 87423ac9029SStephen Hurd return (EBUSY); 87523ac9029SStephen Hurd } 87623ac9029SStephen Hurd qgroup->tqg_adjusting = 1; 87723ac9029SStephen Hurd old_cnt 
= qgroup->tqg_cnt; 87812d1b8c9SSean Bruno old_cpu = 0; 879ab2e3f79SStephen Hurd if (old_cnt < cnt) 880ab2e3f79SStephen Hurd old_cpu = qgroup->tqg_queue[old_cnt].tgc_cpu; 88123ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 88223ac9029SStephen Hurd /* 88323ac9029SStephen Hurd * Set up queue for tasks added before boot. 88423ac9029SStephen Hurd */ 88523ac9029SStephen Hurd if (old_cnt == 0) { 88623ac9029SStephen Hurd LIST_SWAP(>ask_head, &qgroup->tqg_queue[0].tgc_tasks, 88723ac9029SStephen Hurd grouptask, gt_list); 88823ac9029SStephen Hurd qgroup->tqg_queue[0].tgc_cnt = 0; 88923ac9029SStephen Hurd } 89023ac9029SStephen Hurd 89123ac9029SStephen Hurd /* 89223ac9029SStephen Hurd * If new taskq threads have been added. 89323ac9029SStephen Hurd */ 89412d1b8c9SSean Bruno cpu = old_cpu; 89512d1b8c9SSean Bruno for (i = old_cnt; i < cnt; i++) { 896ab2e3f79SStephen Hurd taskqgroup_cpu_create(qgroup, i, cpu); 897abf38392SSean Bruno 898abf38392SSean Bruno for (k = 0; k < stride; k++) 899abf38392SSean Bruno cpu = CPU_NEXT(cpu); 90012d1b8c9SSean Bruno } 90123ac9029SStephen Hurd mtx_lock(&qgroup->tqg_lock); 90223ac9029SStephen Hurd qgroup->tqg_cnt = cnt; 90323ac9029SStephen Hurd qgroup->tqg_stride = stride; 90423ac9029SStephen Hurd 90523ac9029SStephen Hurd /* 90623ac9029SStephen Hurd * Adjust drivers to use new taskqs. 
90723ac9029SStephen Hurd */ 90823ac9029SStephen Hurd for (i = 0; i < old_cnt; i++) { 90923ac9029SStephen Hurd while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) { 91023ac9029SStephen Hurd LIST_REMOVE(gtask, gt_list); 91123ac9029SStephen Hurd qgroup->tqg_queue[i].tgc_cnt--; 91223ac9029SStephen Hurd LIST_INSERT_HEAD(>ask_head, gtask, gt_list); 91323ac9029SStephen Hurd } 91423ac9029SStephen Hurd } 91512d1b8c9SSean Bruno mtx_unlock(&qgroup->tqg_lock); 91612d1b8c9SSean Bruno 91723ac9029SStephen Hurd while ((gtask = LIST_FIRST(>ask_head))) { 91823ac9029SStephen Hurd LIST_REMOVE(gtask, gt_list); 91923ac9029SStephen Hurd if (gtask->gt_cpu == -1) 92012d1b8c9SSean Bruno taskqgroup_attach_deferred(qgroup, gtask); 92112d1b8c9SSean Bruno else if (taskqgroup_attach_cpu_deferred(qgroup, gtask)) 92212d1b8c9SSean Bruno taskqgroup_attach_deferred(qgroup, gtask); 92323ac9029SStephen Hurd } 92423ac9029SStephen Hurd 925abf38392SSean Bruno #ifdef INVARIANTS 926abf38392SSean Bruno mtx_lock(&qgroup->tqg_lock); 927abf38392SSean Bruno for (i = 0; i < qgroup->tqg_cnt; i++) { 928abf38392SSean Bruno MPASS(qgroup->tqg_queue[i].tgc_taskq != NULL); 929abf38392SSean Bruno LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) 930abf38392SSean Bruno MPASS(gtask->gt_taskqueue != NULL); 931abf38392SSean Bruno } 932abf38392SSean Bruno mtx_unlock(&qgroup->tqg_lock); 933abf38392SSean Bruno #endif 93423ac9029SStephen Hurd /* 93523ac9029SStephen Hurd * If taskq thread count has been reduced. 
93623ac9029SStephen Hurd */ 93723ac9029SStephen Hurd for (i = cnt; i < old_cnt; i++) 93823ac9029SStephen Hurd taskqgroup_cpu_remove(qgroup, i); 93923ac9029SStephen Hurd 9401ee17b07SSean Bruno taskqgroup_bind(qgroup); 9411ee17b07SSean Bruno 94223ac9029SStephen Hurd mtx_lock(&qgroup->tqg_lock); 94323ac9029SStephen Hurd qgroup->tqg_adjusting = 0; 94423ac9029SStephen Hurd 94523ac9029SStephen Hurd return (0); 94623ac9029SStephen Hurd } 94723ac9029SStephen Hurd 94823ac9029SStephen Hurd int 949ab2e3f79SStephen Hurd taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride) 95023ac9029SStephen Hurd { 95123ac9029SStephen Hurd int error; 95223ac9029SStephen Hurd 95323ac9029SStephen Hurd mtx_lock(&qgroup->tqg_lock); 954ab2e3f79SStephen Hurd error = _taskqgroup_adjust(qgroup, cnt, stride); 95523ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 95623ac9029SStephen Hurd 95723ac9029SStephen Hurd return (error); 95823ac9029SStephen Hurd } 95923ac9029SStephen Hurd 96023ac9029SStephen Hurd struct taskqgroup * 96123ac9029SStephen Hurd taskqgroup_create(char *name) 96223ac9029SStephen Hurd { 96323ac9029SStephen Hurd struct taskqgroup *qgroup; 96423ac9029SStephen Hurd 96523ac9029SStephen Hurd qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO); 96623ac9029SStephen Hurd mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF); 96723ac9029SStephen Hurd qgroup->tqg_name = name; 96823ac9029SStephen Hurd LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks); 969ab2e3f79SStephen Hurd 97023ac9029SStephen Hurd return (qgroup); 97123ac9029SStephen Hurd } 97223ac9029SStephen Hurd 97323ac9029SStephen Hurd void 97423ac9029SStephen Hurd taskqgroup_destroy(struct taskqgroup *qgroup) 97523ac9029SStephen Hurd { 97623ac9029SStephen Hurd 97723ac9029SStephen Hurd } 978