123ac9029SStephen Hurd /*- 223ac9029SStephen Hurd * Copyright (c) 2000 Doug Rabson 323ac9029SStephen Hurd * Copyright (c) 2014 Jeff Roberson 423ac9029SStephen Hurd * Copyright (c) 2016 Matthew Macy 523ac9029SStephen Hurd * All rights reserved. 623ac9029SStephen Hurd * 723ac9029SStephen Hurd * Redistribution and use in source and binary forms, with or without 823ac9029SStephen Hurd * modification, are permitted provided that the following conditions 923ac9029SStephen Hurd * are met: 1023ac9029SStephen Hurd * 1. Redistributions of source code must retain the above copyright 1123ac9029SStephen Hurd * notice, this list of conditions and the following disclaimer. 1223ac9029SStephen Hurd * 2. Redistributions in binary form must reproduce the above copyright 1323ac9029SStephen Hurd * notice, this list of conditions and the following disclaimer in the 1423ac9029SStephen Hurd * documentation and/or other materials provided with the distribution. 1523ac9029SStephen Hurd * 1623ac9029SStephen Hurd * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 1723ac9029SStephen Hurd * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 1823ac9029SStephen Hurd * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 1923ac9029SStephen Hurd * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 2023ac9029SStephen Hurd * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 2123ac9029SStephen Hurd * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 2223ac9029SStephen Hurd * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 2323ac9029SStephen Hurd * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 2423ac9029SStephen Hurd * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 2523ac9029SStephen Hurd * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 2623ac9029SStephen Hurd * SUCH DAMAGE. 2723ac9029SStephen Hurd */ 2823ac9029SStephen Hurd 2923ac9029SStephen Hurd #include <sys/cdefs.h> 3023ac9029SStephen Hurd __FBSDID("$FreeBSD$"); 3123ac9029SStephen Hurd 3223ac9029SStephen Hurd #include <sys/param.h> 3323ac9029SStephen Hurd #include <sys/systm.h> 3423ac9029SStephen Hurd #include <sys/bus.h> 3523ac9029SStephen Hurd #include <sys/cpuset.h> 3623ac9029SStephen Hurd #include <sys/interrupt.h> 3723ac9029SStephen Hurd #include <sys/kernel.h> 3823ac9029SStephen Hurd #include <sys/kthread.h> 3923ac9029SStephen Hurd #include <sys/libkern.h> 4023ac9029SStephen Hurd #include <sys/limits.h> 4123ac9029SStephen Hurd #include <sys/lock.h> 4223ac9029SStephen Hurd #include <sys/malloc.h> 4323ac9029SStephen Hurd #include <sys/mutex.h> 4423ac9029SStephen Hurd #include <sys/proc.h> 4523ac9029SStephen Hurd #include <sys/sched.h> 4623ac9029SStephen Hurd #include <sys/smp.h> 4723ac9029SStephen Hurd #include <sys/gtaskqueue.h> 4823ac9029SStephen Hurd #include <sys/unistd.h> 4923ac9029SStephen Hurd #include <machine/stdarg.h> 5023ac9029SStephen Hurd 5123ac9029SStephen Hurd static MALLOC_DEFINE(M_GTASKQUEUE, "taskqueue", "Task Queues"); 5223ac9029SStephen Hurd static void gtaskqueue_thread_enqueue(void *); 5323ac9029SStephen Hurd static void gtaskqueue_thread_loop(void *arg); 5423ac9029SStephen Hurd 5523ac9029SStephen Hurd 5623ac9029SStephen Hurd struct gtaskqueue_busy { 5723ac9029SStephen Hurd struct gtask *tb_running; 5823ac9029SStephen Hurd TAILQ_ENTRY(gtaskqueue_busy) tb_link; 5923ac9029SStephen Hurd }; 6023ac9029SStephen Hurd 6123ac9029SStephen Hurd static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1; 6223ac9029SStephen Hurd 6323ac9029SStephen Hurd struct gtaskqueue { 6423ac9029SStephen Hurd STAILQ_HEAD(, gtask) tq_queue; 6523ac9029SStephen Hurd gtaskqueue_enqueue_fn tq_enqueue; 6623ac9029SStephen Hurd void *tq_context; 6723ac9029SStephen Hurd char *tq_name; 6823ac9029SStephen Hurd TAILQ_HEAD(, gtaskqueue_busy) tq_active; 6923ac9029SStephen Hurd struct mtx tq_mutex; 7023ac9029SStephen Hurd struct thread **tq_threads; 7123ac9029SStephen Hurd int tq_tcount; 7223ac9029SStephen Hurd int tq_spin; 7323ac9029SStephen Hurd int tq_flags; 7423ac9029SStephen Hurd int tq_callouts; 7523ac9029SStephen Hurd taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; 7623ac9029SStephen Hurd void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; 7723ac9029SStephen Hurd }; 7823ac9029SStephen Hurd 7923ac9029SStephen Hurd #define TQ_FLAGS_ACTIVE (1 << 0) 8023ac9029SStephen Hurd #define TQ_FLAGS_BLOCKED (1 << 1) 8123ac9029SStephen Hurd #define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) 8223ac9029SStephen Hurd 8323ac9029SStephen Hurd #define DT_CALLOUT_ARMED (1 << 0) 8423ac9029SStephen Hurd 8523ac9029SStephen Hurd #define TQ_LOCK(tq) \ 8623ac9029SStephen Hurd do { \ 8723ac9029SStephen Hurd if ((tq)->tq_spin) \ 8823ac9029SStephen Hurd mtx_lock_spin(&(tq)->tq_mutex); \ 8923ac9029SStephen Hurd else \ 9023ac9029SStephen Hurd mtx_lock(&(tq)->tq_mutex); \ 9123ac9029SStephen Hurd } while (0) 9223ac9029SStephen Hurd #define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) 9323ac9029SStephen Hurd 9423ac9029SStephen Hurd #define TQ_UNLOCK(tq) \ 9523ac9029SStephen Hurd do { \ 9623ac9029SStephen Hurd if ((tq)->tq_spin) \ 9723ac9029SStephen Hurd mtx_unlock_spin(&(tq)->tq_mutex); \ 9823ac9029SStephen Hurd else \ 9923ac9029SStephen Hurd mtx_unlock(&(tq)->tq_mutex); \ 10023ac9029SStephen Hurd } while (0) 10123ac9029SStephen Hurd #define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) 10223ac9029SStephen Hurd 10323ac9029SStephen Hurd static __inline int 10423ac9029SStephen Hurd TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm, 10523ac9029SStephen Hurd int t) 10623ac9029SStephen Hurd { 10723ac9029SStephen Hurd if (tq->tq_spin) 10823ac9029SStephen Hurd return (msleep_spin(p, m, wm, t)); 10923ac9029SStephen Hurd return (msleep(p, m, pri, wm, t)); 11023ac9029SStephen Hurd } 11123ac9029SStephen Hurd 11223ac9029SStephen Hurd static struct gtaskqueue * 11323ac9029SStephen Hurd _gtaskqueue_create(const char *name, int mflags, 11423ac9029SStephen Hurd taskqueue_enqueue_fn enqueue, void *context, 11523ac9029SStephen Hurd int mtxflags, const char *mtxname __unused) 11623ac9029SStephen Hurd { 11723ac9029SStephen Hurd struct gtaskqueue *queue; 11823ac9029SStephen Hurd char *tq_name; 11923ac9029SStephen Hurd 12023ac9029SStephen Hurd tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO); 12123ac9029SStephen Hurd if (!tq_name) 12223ac9029SStephen Hurd return (NULL); 12323ac9029SStephen Hurd 12423ac9029SStephen Hurd snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue"); 12523ac9029SStephen Hurd 12623ac9029SStephen Hurd queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO); 12723ac9029SStephen Hurd if (!queue) 12823ac9029SStephen Hurd return (NULL); 12923ac9029SStephen Hurd 13023ac9029SStephen Hurd STAILQ_INIT(&queue->tq_queue); 13123ac9029SStephen Hurd TAILQ_INIT(&queue->tq_active); 13223ac9029SStephen Hurd queue->tq_enqueue = enqueue; 13323ac9029SStephen Hurd queue->tq_context = context; 13423ac9029SStephen Hurd queue->tq_name = tq_name; 13523ac9029SStephen Hurd queue->tq_spin = (mtxflags & MTX_SPIN) != 0; 13623ac9029SStephen Hurd queue->tq_flags |= TQ_FLAGS_ACTIVE; 13723ac9029SStephen Hurd if (enqueue == gtaskqueue_thread_enqueue) 13823ac9029SStephen Hurd queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; 13923ac9029SStephen Hurd mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); 14023ac9029SStephen Hurd 14123ac9029SStephen Hurd return (queue); 14223ac9029SStephen Hurd } 14323ac9029SStephen Hurd 14423ac9029SStephen Hurd 14523ac9029SStephen Hurd /* 14623ac9029SStephen Hurd * Signal a taskqueue thread to terminate. 14723ac9029SStephen Hurd */ 14823ac9029SStephen Hurd static void 14923ac9029SStephen Hurd gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq) 15023ac9029SStephen Hurd { 15123ac9029SStephen Hurd 15223ac9029SStephen Hurd while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { 15323ac9029SStephen Hurd wakeup(tq); 15423ac9029SStephen Hurd TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0); 15523ac9029SStephen Hurd } 15623ac9029SStephen Hurd } 15723ac9029SStephen Hurd 15823ac9029SStephen Hurd static void 15923ac9029SStephen Hurd gtaskqueue_free(struct gtaskqueue *queue) 16023ac9029SStephen Hurd { 16123ac9029SStephen Hurd 16223ac9029SStephen Hurd TQ_LOCK(queue); 16323ac9029SStephen Hurd queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 16423ac9029SStephen Hurd gtaskqueue_terminate(queue->tq_threads, queue); 16523ac9029SStephen Hurd KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?")); 16623ac9029SStephen Hurd KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); 16723ac9029SStephen Hurd mtx_destroy(&queue->tq_mutex); 16823ac9029SStephen Hurd free(queue->tq_threads, M_GTASKQUEUE); 16923ac9029SStephen Hurd free(queue->tq_name, M_GTASKQUEUE); 17023ac9029SStephen Hurd free(queue, M_GTASKQUEUE); 17123ac9029SStephen Hurd } 17223ac9029SStephen Hurd 17323ac9029SStephen Hurd int 17423ac9029SStephen Hurd grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask) 17523ac9029SStephen Hurd { 17623ac9029SStephen Hurd TQ_LOCK(queue); 17723ac9029SStephen Hurd if (gtask->ta_flags & TASK_ENQUEUED) { 17823ac9029SStephen Hurd TQ_UNLOCK(queue); 17923ac9029SStephen Hurd return (0); 18023ac9029SStephen Hurd } 18123ac9029SStephen Hurd STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link); 18223ac9029SStephen Hurd gtask->ta_flags |= TASK_ENQUEUED; 18323ac9029SStephen Hurd TQ_UNLOCK(queue); 18423ac9029SStephen Hurd if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) 18523ac9029SStephen Hurd queue->tq_enqueue(queue->tq_context); 18623ac9029SStephen Hurd return (0); 18723ac9029SStephen Hurd } 18823ac9029SStephen Hurd 18923ac9029SStephen Hurd static void 19023ac9029SStephen Hurd gtaskqueue_task_nop_fn(void *context) 19123ac9029SStephen Hurd { 19223ac9029SStephen Hurd } 19323ac9029SStephen Hurd 19423ac9029SStephen Hurd /* 19523ac9029SStephen Hurd * Block until all currently queued tasks in this taskqueue 19623ac9029SStephen Hurd * have begun execution. Tasks queued during execution of 19723ac9029SStephen Hurd * this function are ignored. 19823ac9029SStephen Hurd */ 19923ac9029SStephen Hurd static void 20023ac9029SStephen Hurd gtaskqueue_drain_tq_queue(struct gtaskqueue *queue) 20123ac9029SStephen Hurd { 20223ac9029SStephen Hurd struct gtask t_barrier; 20323ac9029SStephen Hurd 20423ac9029SStephen Hurd if (STAILQ_EMPTY(&queue->tq_queue)) 20523ac9029SStephen Hurd return; 20623ac9029SStephen Hurd 20723ac9029SStephen Hurd /* 20823ac9029SStephen Hurd * Enqueue our barrier after all current tasks, but with 20923ac9029SStephen Hurd * the highest priority so that newly queued tasks cannot 21023ac9029SStephen Hurd * pass it. Because of the high priority, we can not use 21123ac9029SStephen Hurd * taskqueue_enqueue_locked directly (which drops the lock 21223ac9029SStephen Hurd * anyway) so just insert it at tail while we have the 21323ac9029SStephen Hurd * queue lock. 21423ac9029SStephen Hurd */ 21523ac9029SStephen Hurd GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier); 21623ac9029SStephen Hurd STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); 21723ac9029SStephen Hurd t_barrier.ta_flags |= TASK_ENQUEUED; 21823ac9029SStephen Hurd 21923ac9029SStephen Hurd /* 22023ac9029SStephen Hurd * Once the barrier has executed, all previously queued tasks 22123ac9029SStephen Hurd * have completed or are currently executing. 22223ac9029SStephen Hurd */ 22323ac9029SStephen Hurd while (t_barrier.ta_flags & TASK_ENQUEUED) 22423ac9029SStephen Hurd TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0); 22523ac9029SStephen Hurd } 22623ac9029SStephen Hurd 22723ac9029SStephen Hurd /* 22823ac9029SStephen Hurd * Block until all currently executing tasks for this taskqueue 22923ac9029SStephen Hurd * complete. Tasks that begin execution during the execution 23023ac9029SStephen Hurd * of this function are ignored. 23123ac9029SStephen Hurd */ 23223ac9029SStephen Hurd static void 23323ac9029SStephen Hurd gtaskqueue_drain_tq_active(struct gtaskqueue *queue) 23423ac9029SStephen Hurd { 23523ac9029SStephen Hurd struct gtaskqueue_busy tb_marker, *tb_first; 23623ac9029SStephen Hurd 23723ac9029SStephen Hurd if (TAILQ_EMPTY(&queue->tq_active)) 23823ac9029SStephen Hurd return; 23923ac9029SStephen Hurd 24023ac9029SStephen Hurd /* Block taskq_terminate().*/ 24123ac9029SStephen Hurd queue->tq_callouts++; 24223ac9029SStephen Hurd 24323ac9029SStephen Hurd /* 24423ac9029SStephen Hurd * Wait for all currently executing taskqueue threads 24523ac9029SStephen Hurd * to go idle. 24623ac9029SStephen Hurd */ 24723ac9029SStephen Hurd tb_marker.tb_running = TB_DRAIN_WAITER; 24823ac9029SStephen Hurd TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link); 24923ac9029SStephen Hurd while (TAILQ_FIRST(&queue->tq_active) != &tb_marker) 25023ac9029SStephen Hurd TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0); 25123ac9029SStephen Hurd TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link); 25223ac9029SStephen Hurd 25323ac9029SStephen Hurd /* 25423ac9029SStephen Hurd * Wakeup any other drain waiter that happened to queue up 25523ac9029SStephen Hurd * without any intervening active thread. 25623ac9029SStephen Hurd */ 25723ac9029SStephen Hurd tb_first = TAILQ_FIRST(&queue->tq_active); 25823ac9029SStephen Hurd if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER) 25923ac9029SStephen Hurd wakeup(tb_first); 26023ac9029SStephen Hurd 26123ac9029SStephen Hurd /* Release taskqueue_terminate(). */ 26223ac9029SStephen Hurd queue->tq_callouts--; 26323ac9029SStephen Hurd if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) 26423ac9029SStephen Hurd wakeup_one(queue->tq_threads); 26523ac9029SStephen Hurd } 26623ac9029SStephen Hurd 26723ac9029SStephen Hurd void 26823ac9029SStephen Hurd gtaskqueue_block(struct gtaskqueue *queue) 26923ac9029SStephen Hurd { 27023ac9029SStephen Hurd 27123ac9029SStephen Hurd TQ_LOCK(queue); 27223ac9029SStephen Hurd queue->tq_flags |= TQ_FLAGS_BLOCKED; 27323ac9029SStephen Hurd TQ_UNLOCK(queue); 27423ac9029SStephen Hurd } 27523ac9029SStephen Hurd 27623ac9029SStephen Hurd void 27723ac9029SStephen Hurd gtaskqueue_unblock(struct gtaskqueue *queue) 27823ac9029SStephen Hurd { 27923ac9029SStephen Hurd 28023ac9029SStephen Hurd TQ_LOCK(queue); 28123ac9029SStephen Hurd queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 28223ac9029SStephen Hurd if (!STAILQ_EMPTY(&queue->tq_queue)) 28323ac9029SStephen Hurd queue->tq_enqueue(queue->tq_context); 28423ac9029SStephen Hurd TQ_UNLOCK(queue); 28523ac9029SStephen Hurd } 28623ac9029SStephen Hurd 28723ac9029SStephen Hurd static void 28823ac9029SStephen Hurd gtaskqueue_run_locked(struct gtaskqueue *queue) 28923ac9029SStephen Hurd { 29023ac9029SStephen Hurd struct gtaskqueue_busy tb; 29123ac9029SStephen Hurd struct gtaskqueue_busy *tb_first; 29223ac9029SStephen Hurd struct gtask *gtask; 29323ac9029SStephen Hurd 29423ac9029SStephen Hurd KASSERT(queue != NULL, ("tq is NULL")); 29523ac9029SStephen Hurd TQ_ASSERT_LOCKED(queue); 29623ac9029SStephen Hurd tb.tb_running = NULL; 29723ac9029SStephen Hurd 29823ac9029SStephen Hurd while (STAILQ_FIRST(&queue->tq_queue)) { 29923ac9029SStephen Hurd TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); 30023ac9029SStephen Hurd 30123ac9029SStephen Hurd /* 30223ac9029SStephen Hurd * Carefully remove the first task from the queue and 30323ac9029SStephen Hurd * clear its TASK_ENQUEUED flag 30423ac9029SStephen Hurd */ 30523ac9029SStephen Hurd gtask = STAILQ_FIRST(&queue->tq_queue); 30623ac9029SStephen Hurd KASSERT(gtask != NULL, ("task is NULL")); 30723ac9029SStephen Hurd STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 30823ac9029SStephen Hurd gtask->ta_flags &= ~TASK_ENQUEUED; 30923ac9029SStephen Hurd tb.tb_running = gtask; 31023ac9029SStephen Hurd TQ_UNLOCK(queue); 31123ac9029SStephen Hurd 31223ac9029SStephen Hurd KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL")); 31323ac9029SStephen Hurd gtask->ta_func(gtask->ta_context); 31423ac9029SStephen Hurd 31523ac9029SStephen Hurd TQ_LOCK(queue); 31623ac9029SStephen Hurd tb.tb_running = NULL; 31723ac9029SStephen Hurd wakeup(gtask); 31823ac9029SStephen Hurd 31923ac9029SStephen Hurd TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); 32023ac9029SStephen Hurd tb_first = TAILQ_FIRST(&queue->tq_active); 32123ac9029SStephen Hurd if (tb_first != NULL && 32223ac9029SStephen Hurd tb_first->tb_running == TB_DRAIN_WAITER) 32323ac9029SStephen Hurd wakeup(tb_first); 32423ac9029SStephen Hurd } 32523ac9029SStephen Hurd } 32623ac9029SStephen Hurd 32723ac9029SStephen Hurd static int 32823ac9029SStephen Hurd task_is_running(struct gtaskqueue *queue, struct gtask *gtask) 32923ac9029SStephen Hurd { 33023ac9029SStephen Hurd struct gtaskqueue_busy *tb; 33123ac9029SStephen Hurd 33223ac9029SStephen Hurd TQ_ASSERT_LOCKED(queue); 33323ac9029SStephen Hurd TAILQ_FOREACH(tb, &queue->tq_active, tb_link) { 33423ac9029SStephen Hurd if (tb->tb_running == gtask) 33523ac9029SStephen Hurd return (1); 33623ac9029SStephen Hurd } 33723ac9029SStephen Hurd return (0); 33823ac9029SStephen Hurd } 33923ac9029SStephen Hurd 34023ac9029SStephen Hurd static int 34123ac9029SStephen Hurd gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask) 34223ac9029SStephen Hurd { 34323ac9029SStephen Hurd 34423ac9029SStephen Hurd if (gtask->ta_flags & TASK_ENQUEUED) 34523ac9029SStephen Hurd STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link); 34623ac9029SStephen Hurd gtask->ta_flags &= ~TASK_ENQUEUED; 34723ac9029SStephen Hurd return (task_is_running(queue, gtask) ? EBUSY : 0); 34823ac9029SStephen Hurd } 34923ac9029SStephen Hurd 35023ac9029SStephen Hurd int 35123ac9029SStephen Hurd gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask) 35223ac9029SStephen Hurd { 35323ac9029SStephen Hurd int error; 35423ac9029SStephen Hurd 35523ac9029SStephen Hurd TQ_LOCK(queue); 35623ac9029SStephen Hurd error = gtaskqueue_cancel_locked(queue, gtask); 35723ac9029SStephen Hurd TQ_UNLOCK(queue); 35823ac9029SStephen Hurd 35923ac9029SStephen Hurd return (error); 36023ac9029SStephen Hurd } 36123ac9029SStephen Hurd 36223ac9029SStephen Hurd void 36323ac9029SStephen Hurd gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask) 36423ac9029SStephen Hurd { 36523ac9029SStephen Hurd 36623ac9029SStephen Hurd if (!queue->tq_spin) 36723ac9029SStephen Hurd WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 36823ac9029SStephen Hurd 36923ac9029SStephen Hurd TQ_LOCK(queue); 37023ac9029SStephen Hurd while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask)) 37123ac9029SStephen Hurd TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0); 37223ac9029SStephen Hurd TQ_UNLOCK(queue); 37323ac9029SStephen Hurd } 37423ac9029SStephen Hurd 37523ac9029SStephen Hurd void 37623ac9029SStephen Hurd gtaskqueue_drain_all(struct gtaskqueue *queue) 37723ac9029SStephen Hurd { 37823ac9029SStephen Hurd 37923ac9029SStephen Hurd if (!queue->tq_spin) 38023ac9029SStephen Hurd WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); 38123ac9029SStephen Hurd 38223ac9029SStephen Hurd TQ_LOCK(queue); 38323ac9029SStephen Hurd gtaskqueue_drain_tq_queue(queue); 38423ac9029SStephen Hurd gtaskqueue_drain_tq_active(queue); 38523ac9029SStephen Hurd TQ_UNLOCK(queue); 38623ac9029SStephen Hurd } 38723ac9029SStephen Hurd 38823ac9029SStephen Hurd static int 38923ac9029SStephen Hurd _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 39023ac9029SStephen Hurd cpuset_t *mask, const char *name, va_list ap) 39123ac9029SStephen Hurd { 39223ac9029SStephen Hurd char ktname[MAXCOMLEN + 1]; 39323ac9029SStephen Hurd struct thread *td; 39423ac9029SStephen Hurd struct gtaskqueue *tq; 39523ac9029SStephen Hurd int i, error; 39623ac9029SStephen Hurd 39723ac9029SStephen Hurd if (count <= 0) 39823ac9029SStephen Hurd return (EINVAL); 39923ac9029SStephen Hurd 40023ac9029SStephen Hurd vsnprintf(ktname, sizeof(ktname), name, ap); 40123ac9029SStephen Hurd tq = *tqp; 40223ac9029SStephen Hurd 40323ac9029SStephen Hurd tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE, 40423ac9029SStephen Hurd M_NOWAIT | M_ZERO); 40523ac9029SStephen Hurd if (tq->tq_threads == NULL) { 40623ac9029SStephen Hurd printf("%s: no memory for %s threads\n", __func__, ktname); 40723ac9029SStephen Hurd return (ENOMEM); 40823ac9029SStephen Hurd } 40923ac9029SStephen Hurd 41023ac9029SStephen Hurd for (i = 0; i < count; i++) { 41123ac9029SStephen Hurd if (count == 1) 41223ac9029SStephen Hurd error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, 41323ac9029SStephen Hurd &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname); 41423ac9029SStephen Hurd else 41523ac9029SStephen Hurd error = kthread_add(gtaskqueue_thread_loop, tqp, NULL, 41623ac9029SStephen Hurd &tq->tq_threads[i], RFSTOPPED, 0, 41723ac9029SStephen Hurd "%s_%d", ktname, i); 41823ac9029SStephen Hurd if (error) { 41923ac9029SStephen Hurd /* should be ok to continue, taskqueue_free will dtrt */ 42023ac9029SStephen Hurd printf("%s: kthread_add(%s): error %d", __func__, 42123ac9029SStephen Hurd ktname, error); 42223ac9029SStephen Hurd tq->tq_threads[i] = NULL; /* paranoid */ 42323ac9029SStephen Hurd } else 42423ac9029SStephen Hurd tq->tq_tcount++; 42523ac9029SStephen Hurd } 42623ac9029SStephen Hurd for (i = 0; i < count; i++) { 42723ac9029SStephen Hurd if (tq->tq_threads[i] == NULL) 42823ac9029SStephen Hurd continue; 42923ac9029SStephen Hurd td = tq->tq_threads[i]; 43023ac9029SStephen Hurd if (mask) { 43123ac9029SStephen Hurd error = cpuset_setthread(td->td_tid, mask); 43223ac9029SStephen Hurd /* 43323ac9029SStephen Hurd * Failing to pin is rarely an actual fatal error; 43423ac9029SStephen Hurd * it'll just affect performance. 43523ac9029SStephen Hurd */ 43623ac9029SStephen Hurd if (error) 43723ac9029SStephen Hurd printf("%s: curthread=%llu: can't pin; " 43823ac9029SStephen Hurd "error=%d\n", 43923ac9029SStephen Hurd __func__, 44023ac9029SStephen Hurd (unsigned long long) td->td_tid, 44123ac9029SStephen Hurd error); 44223ac9029SStephen Hurd } 44323ac9029SStephen Hurd thread_lock(td); 44423ac9029SStephen Hurd sched_prio(td, pri); 44523ac9029SStephen Hurd sched_add(td, SRQ_BORING); 44623ac9029SStephen Hurd thread_unlock(td); 44723ac9029SStephen Hurd } 44823ac9029SStephen Hurd 44923ac9029SStephen Hurd return (0); 45023ac9029SStephen Hurd } 45123ac9029SStephen Hurd 45223ac9029SStephen Hurd static int 45323ac9029SStephen Hurd gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, 45423ac9029SStephen Hurd const char *name, ...) 45523ac9029SStephen Hurd { 45623ac9029SStephen Hurd va_list ap; 45723ac9029SStephen Hurd int error; 45823ac9029SStephen Hurd 45923ac9029SStephen Hurd va_start(ap, name); 46023ac9029SStephen Hurd error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap); 46123ac9029SStephen Hurd va_end(ap); 46223ac9029SStephen Hurd return (error); 46323ac9029SStephen Hurd } 46423ac9029SStephen Hurd 46523ac9029SStephen Hurd static inline void 46623ac9029SStephen Hurd gtaskqueue_run_callback(struct gtaskqueue *tq, 46723ac9029SStephen Hurd enum taskqueue_callback_type cb_type) 46823ac9029SStephen Hurd { 46923ac9029SStephen Hurd taskqueue_callback_fn tq_callback; 47023ac9029SStephen Hurd 47123ac9029SStephen Hurd TQ_ASSERT_UNLOCKED(tq); 47223ac9029SStephen Hurd tq_callback = tq->tq_callbacks[cb_type]; 47323ac9029SStephen Hurd if (tq_callback != NULL) 47423ac9029SStephen Hurd tq_callback(tq->tq_cb_contexts[cb_type]); 47523ac9029SStephen Hurd } 47623ac9029SStephen Hurd 47723ac9029SStephen Hurd static void 47823ac9029SStephen Hurd gtaskqueue_thread_loop(void *arg) 47923ac9029SStephen Hurd { 48023ac9029SStephen Hurd struct gtaskqueue **tqp, *tq; 48123ac9029SStephen Hurd 48223ac9029SStephen Hurd tqp = arg; 48323ac9029SStephen Hurd tq = *tqp; 48423ac9029SStephen Hurd gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); 48523ac9029SStephen Hurd TQ_LOCK(tq); 48623ac9029SStephen Hurd while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 48723ac9029SStephen Hurd /* XXX ? */ 48823ac9029SStephen Hurd gtaskqueue_run_locked(tq); 48923ac9029SStephen Hurd /* 49023ac9029SStephen Hurd * Because taskqueue_run() can drop tq_mutex, we need to 49123ac9029SStephen Hurd * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the 49223ac9029SStephen Hurd * meantime, which means we missed a wakeup. 49323ac9029SStephen Hurd */ 49423ac9029SStephen Hurd if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) 49523ac9029SStephen Hurd break; 49623ac9029SStephen Hurd TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0); 49723ac9029SStephen Hurd } 49823ac9029SStephen Hurd gtaskqueue_run_locked(tq); 49923ac9029SStephen Hurd /* 50023ac9029SStephen Hurd * This thread is on its way out, so just drop the lock temporarily 50123ac9029SStephen Hurd * in order to call the shutdown callback. This allows the callback 50223ac9029SStephen Hurd * to look at the taskqueue, even just before it dies. 50323ac9029SStephen Hurd */ 50423ac9029SStephen Hurd TQ_UNLOCK(tq); 50523ac9029SStephen Hurd gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); 50623ac9029SStephen Hurd TQ_LOCK(tq); 50723ac9029SStephen Hurd 50823ac9029SStephen Hurd /* rendezvous with thread that asked us to terminate */ 50923ac9029SStephen Hurd tq->tq_tcount--; 51023ac9029SStephen Hurd wakeup_one(tq->tq_threads); 51123ac9029SStephen Hurd TQ_UNLOCK(tq); 51223ac9029SStephen Hurd kthread_exit(); 51323ac9029SStephen Hurd } 51423ac9029SStephen Hurd 51523ac9029SStephen Hurd static void 51623ac9029SStephen Hurd gtaskqueue_thread_enqueue(void *context) 51723ac9029SStephen Hurd { 51823ac9029SStephen Hurd struct gtaskqueue **tqp, *tq; 51923ac9029SStephen Hurd 52023ac9029SStephen Hurd tqp = context; 52123ac9029SStephen Hurd tq = *tqp; 52223ac9029SStephen Hurd wakeup_one(tq); 52323ac9029SStephen Hurd } 52423ac9029SStephen Hurd 52523ac9029SStephen Hurd 52623ac9029SStephen Hurd static struct gtaskqueue * 52723ac9029SStephen Hurd gtaskqueue_create_fast(const char *name, int mflags, 52823ac9029SStephen Hurd taskqueue_enqueue_fn enqueue, void *context) 52923ac9029SStephen Hurd { 53023ac9029SStephen Hurd return _gtaskqueue_create(name, mflags, enqueue, context, 53123ac9029SStephen Hurd MTX_SPIN, "fast_taskqueue"); 53223ac9029SStephen Hurd } 53323ac9029SStephen Hurd 53423ac9029SStephen Hurd 53523ac9029SStephen Hurd struct taskqgroup_cpu { 53623ac9029SStephen Hurd LIST_HEAD(, grouptask) tgc_tasks; 53723ac9029SStephen Hurd struct gtaskqueue *tgc_taskq; 53823ac9029SStephen Hurd int tgc_cnt; 53923ac9029SStephen Hurd int tgc_cpu; 54023ac9029SStephen Hurd }; 54123ac9029SStephen Hurd 54223ac9029SStephen Hurd struct taskqgroup { 54323ac9029SStephen Hurd struct taskqgroup_cpu tqg_queue[MAXCPU]; 54423ac9029SStephen Hurd struct mtx tqg_lock; 54523ac9029SStephen Hurd char * tqg_name; 54623ac9029SStephen Hurd int tqg_adjusting; 54723ac9029SStephen Hurd int tqg_stride; 54823ac9029SStephen Hurd int tqg_cnt; 54923ac9029SStephen Hurd }; 55023ac9029SStephen Hurd 55123ac9029SStephen Hurd struct taskq_bind_task { 55223ac9029SStephen Hurd struct gtask bt_task; 55323ac9029SStephen Hurd int bt_cpuid; 55423ac9029SStephen Hurd }; 55523ac9029SStephen Hurd 55623ac9029SStephen Hurd static void 55723ac9029SStephen Hurd taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx) 55823ac9029SStephen Hurd { 55923ac9029SStephen Hurd struct taskqgroup_cpu *qcpu; 56023ac9029SStephen Hurd 56123ac9029SStephen Hurd qcpu = &qgroup->tqg_queue[idx]; 56223ac9029SStephen Hurd LIST_INIT(&qcpu->tgc_tasks); 56323ac9029SStephen Hurd qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK, 56423ac9029SStephen Hurd taskqueue_thread_enqueue, &qcpu->tgc_taskq); 56523ac9029SStephen Hurd gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT, 56623ac9029SStephen Hurd "%s_%d", qgroup->tqg_name, idx); 56723ac9029SStephen Hurd qcpu->tgc_cpu = idx * qgroup->tqg_stride; 56823ac9029SStephen Hurd } 56923ac9029SStephen Hurd 57023ac9029SStephen Hurd static void 57123ac9029SStephen Hurd taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx) 57223ac9029SStephen Hurd { 57323ac9029SStephen Hurd 57423ac9029SStephen Hurd gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq); 57523ac9029SStephen Hurd } 57623ac9029SStephen Hurd 57723ac9029SStephen Hurd /* 57823ac9029SStephen Hurd * Find the taskq with least # of tasks that doesn't currently have any 57923ac9029SStephen Hurd * other queues from the uniq identifier. 58023ac9029SStephen Hurd */ 58123ac9029SStephen Hurd static int 58223ac9029SStephen Hurd taskqgroup_find(struct taskqgroup *qgroup, void *uniq) 58323ac9029SStephen Hurd { 58423ac9029SStephen Hurd struct grouptask *n; 58523ac9029SStephen Hurd int i, idx, mincnt; 58623ac9029SStephen Hurd int strict; 58723ac9029SStephen Hurd 58823ac9029SStephen Hurd mtx_assert(&qgroup->tqg_lock, MA_OWNED); 58923ac9029SStephen Hurd if (qgroup->tqg_cnt == 0) 59023ac9029SStephen Hurd return (0); 59123ac9029SStephen Hurd idx = -1; 59223ac9029SStephen Hurd mincnt = INT_MAX; 59323ac9029SStephen Hurd /* 59423ac9029SStephen Hurd * Two passes; First scan for a queue with the least tasks that 59523ac9029SStephen Hurd * does not already service this uniq id. If that fails simply find 59623ac9029SStephen Hurd * the queue with the least total tasks; 59723ac9029SStephen Hurd */ 59823ac9029SStephen Hurd for (strict = 1; mincnt == INT_MAX; strict = 0) { 59923ac9029SStephen Hurd for (i = 0; i < qgroup->tqg_cnt; i++) { 60023ac9029SStephen Hurd if (qgroup->tqg_queue[i].tgc_cnt > mincnt) 60123ac9029SStephen Hurd continue; 60223ac9029SStephen Hurd if (strict) { 60323ac9029SStephen Hurd LIST_FOREACH(n, 60423ac9029SStephen Hurd &qgroup->tqg_queue[i].tgc_tasks, gt_list) 60523ac9029SStephen Hurd if (n->gt_uniq == uniq) 60623ac9029SStephen Hurd break; 60723ac9029SStephen Hurd if (n != NULL) 60823ac9029SStephen Hurd continue; 60923ac9029SStephen Hurd } 61023ac9029SStephen Hurd mincnt = qgroup->tqg_queue[i].tgc_cnt; 61123ac9029SStephen Hurd idx = i; 61223ac9029SStephen Hurd } 61323ac9029SStephen Hurd } 61423ac9029SStephen Hurd if (idx == -1) 61523ac9029SStephen Hurd panic("taskqgroup_find: Failed to pick a qid."); 61623ac9029SStephen Hurd 61723ac9029SStephen Hurd return (idx); 61823ac9029SStephen Hurd } 61923ac9029SStephen Hurd 62023ac9029SStephen Hurd void 62123ac9029SStephen Hurd taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask, 62223ac9029SStephen Hurd void *uniq, int irq, char *name) 62323ac9029SStephen Hurd { 62423ac9029SStephen Hurd cpuset_t mask; 62523ac9029SStephen Hurd int qid; 62623ac9029SStephen Hurd 62723ac9029SStephen Hurd gtask->gt_uniq = uniq; 62823ac9029SStephen Hurd gtask->gt_name = name; 62923ac9029SStephen Hurd gtask->gt_irq = irq; 63023ac9029SStephen Hurd gtask->gt_cpu = -1; 63123ac9029SStephen Hurd mtx_lock(&qgroup->tqg_lock); 63223ac9029SStephen Hurd qid = taskqgroup_find(qgroup, uniq); 63323ac9029SStephen Hurd qgroup->tqg_queue[qid].tgc_cnt++; 63423ac9029SStephen Hurd LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 63523ac9029SStephen Hurd gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 63623ac9029SStephen Hurd if (irq != -1 && smp_started) { 63723ac9029SStephen Hurd CPU_ZERO(&mask); 63823ac9029SStephen Hurd CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask); 63923ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 64023ac9029SStephen Hurd intr_setaffinity(irq, &mask); 64123ac9029SStephen Hurd } else 64223ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 64323ac9029SStephen Hurd } 64423ac9029SStephen Hurd 64523ac9029SStephen Hurd int 64623ac9029SStephen Hurd taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask, 64723ac9029SStephen Hurd void *uniq, int cpu, int irq, char *name) 64823ac9029SStephen Hurd { 64923ac9029SStephen Hurd cpuset_t mask; 65023ac9029SStephen Hurd int i, qid; 65123ac9029SStephen Hurd 65223ac9029SStephen Hurd qid = -1; 65323ac9029SStephen Hurd gtask->gt_uniq = uniq; 65423ac9029SStephen Hurd gtask->gt_name = name; 65523ac9029SStephen Hurd gtask->gt_irq = irq; 65623ac9029SStephen Hurd gtask->gt_cpu = cpu; 65723ac9029SStephen Hurd mtx_lock(&qgroup->tqg_lock); 65823ac9029SStephen Hurd if (smp_started) { 65923ac9029SStephen Hurd for (i = 0; i < qgroup->tqg_cnt; i++) 66023ac9029SStephen Hurd if (qgroup->tqg_queue[i].tgc_cpu == cpu) { 66123ac9029SStephen Hurd qid = i; 66223ac9029SStephen Hurd break; 66323ac9029SStephen Hurd } 66423ac9029SStephen Hurd if (qid == -1) { 66523ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 66623ac9029SStephen Hurd return (EINVAL); 66723ac9029SStephen Hurd } 66823ac9029SStephen Hurd } else 66923ac9029SStephen Hurd qid = 0; 67023ac9029SStephen Hurd qgroup->tqg_queue[qid].tgc_cnt++; 67123ac9029SStephen Hurd LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list); 67223ac9029SStephen Hurd gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 67323ac9029SStephen Hurd if (irq != -1 && smp_started) { 67423ac9029SStephen Hurd CPU_ZERO(&mask); 67523ac9029SStephen Hurd CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask); 67623ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 67723ac9029SStephen Hurd intr_setaffinity(irq, &mask); 67823ac9029SStephen Hurd } else 67923ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 68023ac9029SStephen Hurd return (0); 68123ac9029SStephen Hurd } 68223ac9029SStephen Hurd 68323ac9029SStephen Hurd void 68423ac9029SStephen Hurd taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask) 68523ac9029SStephen Hurd { 68623ac9029SStephen Hurd int i; 68723ac9029SStephen Hurd 68823ac9029SStephen Hurd mtx_lock(&qgroup->tqg_lock); 68923ac9029SStephen Hurd for (i = 0; i < qgroup->tqg_cnt; i++) 69023ac9029SStephen Hurd if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue) 69123ac9029SStephen Hurd break; 69223ac9029SStephen Hurd if (i == qgroup->tqg_cnt) 69323ac9029SStephen Hurd panic("taskqgroup_detach: task not in group\n"); 69423ac9029SStephen Hurd qgroup->tqg_queue[i].tgc_cnt--; 69523ac9029SStephen Hurd LIST_REMOVE(gtask, gt_list); 69623ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 69723ac9029SStephen Hurd gtask->gt_taskqueue = NULL; 69823ac9029SStephen Hurd } 69923ac9029SStephen Hurd 70023ac9029SStephen Hurd static void 70123ac9029SStephen Hurd taskqgroup_binder(void *ctx) 70223ac9029SStephen Hurd { 70323ac9029SStephen Hurd struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx; 70423ac9029SStephen Hurd cpuset_t mask; 70523ac9029SStephen Hurd int error; 70623ac9029SStephen Hurd 70723ac9029SStephen Hurd CPU_ZERO(&mask); 70823ac9029SStephen Hurd CPU_SET(gtask->bt_cpuid, &mask); 70923ac9029SStephen Hurd error = cpuset_setthread(curthread->td_tid, &mask); 71023ac9029SStephen Hurd thread_lock(curthread); 71123ac9029SStephen Hurd sched_bind(curthread, gtask->bt_cpuid); 71223ac9029SStephen Hurd thread_unlock(curthread); 71323ac9029SStephen Hurd 71423ac9029SStephen Hurd if (error) 71523ac9029SStephen Hurd printf("taskqgroup_binder: setaffinity failed: %d\n", 71623ac9029SStephen Hurd error); 71723ac9029SStephen Hurd free(gtask, M_DEVBUF); 71823ac9029SStephen Hurd } 71923ac9029SStephen Hurd 72023ac9029SStephen Hurd static void 72123ac9029SStephen Hurd taskqgroup_bind(struct taskqgroup *qgroup) 72223ac9029SStephen Hurd { 72323ac9029SStephen Hurd struct taskq_bind_task *gtask; 72423ac9029SStephen Hurd int i; 72523ac9029SStephen Hurd 72623ac9029SStephen Hurd /* 72723ac9029SStephen Hurd * Bind taskqueue threads to specific CPUs, if they have been assigned 72823ac9029SStephen Hurd * one. 72923ac9029SStephen Hurd */ 73023ac9029SStephen Hurd for (i = 0; i < qgroup->tqg_cnt; i++) { 73123ac9029SStephen Hurd gtask = malloc(sizeof (*gtask), M_DEVBUF, M_NOWAIT); 73223ac9029SStephen Hurd GTASK_INIT(>ask->bt_task, 0, 0, taskqgroup_binder, gtask); 73323ac9029SStephen Hurd gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu; 73423ac9029SStephen Hurd grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq, 73523ac9029SStephen Hurd >ask->bt_task); 73623ac9029SStephen Hurd } 73723ac9029SStephen Hurd } 73823ac9029SStephen Hurd 73923ac9029SStephen Hurd static int 74023ac9029SStephen Hurd _taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride) 74123ac9029SStephen Hurd { 74223ac9029SStephen Hurd LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL); 74323ac9029SStephen Hurd cpuset_t mask; 74423ac9029SStephen Hurd struct grouptask *gtask; 74523ac9029SStephen Hurd int i, old_cnt, qid; 74623ac9029SStephen Hurd 74723ac9029SStephen Hurd mtx_assert(&qgroup->tqg_lock, MA_OWNED); 74823ac9029SStephen Hurd 74923ac9029SStephen Hurd if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) { 75023ac9029SStephen Hurd printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n", 75123ac9029SStephen Hurd cnt, stride, mp_ncpus, smp_started); 75223ac9029SStephen Hurd return (EINVAL); 75323ac9029SStephen Hurd } 75423ac9029SStephen Hurd if (qgroup->tqg_adjusting) { 75523ac9029SStephen Hurd printf("taskqgroup_adjust failed: adjusting\n"); 75623ac9029SStephen Hurd return (EBUSY); 75723ac9029SStephen Hurd } 75823ac9029SStephen Hurd qgroup->tqg_adjusting = 1; 75923ac9029SStephen Hurd old_cnt = qgroup->tqg_cnt; 76023ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 76123ac9029SStephen Hurd /* 76223ac9029SStephen Hurd * Set up queue for tasks added before boot. 76323ac9029SStephen Hurd */ 76423ac9029SStephen Hurd if (old_cnt == 0) { 76523ac9029SStephen Hurd LIST_SWAP(>ask_head, &qgroup->tqg_queue[0].tgc_tasks, 76623ac9029SStephen Hurd grouptask, gt_list); 76723ac9029SStephen Hurd qgroup->tqg_queue[0].tgc_cnt = 0; 76823ac9029SStephen Hurd } 76923ac9029SStephen Hurd 77023ac9029SStephen Hurd /* 77123ac9029SStephen Hurd * If new taskq threads have been added. 77223ac9029SStephen Hurd */ 77323ac9029SStephen Hurd for (i = old_cnt; i < cnt; i++) 77423ac9029SStephen Hurd taskqgroup_cpu_create(qgroup, i); 77523ac9029SStephen Hurd mtx_lock(&qgroup->tqg_lock); 77623ac9029SStephen Hurd qgroup->tqg_cnt = cnt; 77723ac9029SStephen Hurd qgroup->tqg_stride = stride; 77823ac9029SStephen Hurd 77923ac9029SStephen Hurd /* 78023ac9029SStephen Hurd * Adjust drivers to use new taskqs. 78123ac9029SStephen Hurd */ 78223ac9029SStephen Hurd for (i = 0; i < old_cnt; i++) { 78323ac9029SStephen Hurd while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) { 78423ac9029SStephen Hurd LIST_REMOVE(gtask, gt_list); 78523ac9029SStephen Hurd qgroup->tqg_queue[i].tgc_cnt--; 78623ac9029SStephen Hurd LIST_INSERT_HEAD(>ask_head, gtask, gt_list); 78723ac9029SStephen Hurd } 78823ac9029SStephen Hurd } 78923ac9029SStephen Hurd 79023ac9029SStephen Hurd while ((gtask = LIST_FIRST(>ask_head))) { 79123ac9029SStephen Hurd LIST_REMOVE(gtask, gt_list); 79223ac9029SStephen Hurd if (gtask->gt_cpu == -1) 79323ac9029SStephen Hurd qid = taskqgroup_find(qgroup, gtask->gt_uniq); 79423ac9029SStephen Hurd else { 79523ac9029SStephen Hurd for (i = 0; i < qgroup->tqg_cnt; i++) 79623ac9029SStephen Hurd if (qgroup->tqg_queue[i].tgc_cpu == gtask->gt_cpu) { 79723ac9029SStephen Hurd qid = i; 79823ac9029SStephen Hurd break; 79923ac9029SStephen Hurd } 80023ac9029SStephen Hurd } 80123ac9029SStephen Hurd qgroup->tqg_queue[qid].tgc_cnt++; 80223ac9029SStephen Hurd LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, 80323ac9029SStephen Hurd gt_list); 80423ac9029SStephen Hurd gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq; 80523ac9029SStephen Hurd } 80623ac9029SStephen Hurd /* 80723ac9029SStephen Hurd * Set new CPU and IRQ affinity 80823ac9029SStephen Hurd */ 80923ac9029SStephen Hurd for (i = 0; i < cnt; i++) { 81023ac9029SStephen Hurd qgroup->tqg_queue[i].tgc_cpu = i * qgroup->tqg_stride; 81123ac9029SStephen Hurd CPU_ZERO(&mask); 81223ac9029SStephen Hurd CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask); 81323ac9029SStephen Hurd LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) { 81423ac9029SStephen Hurd if (gtask->gt_irq == -1) 81523ac9029SStephen Hurd continue; 81623ac9029SStephen Hurd intr_setaffinity(gtask->gt_irq, &mask); 81723ac9029SStephen Hurd } 81823ac9029SStephen Hurd } 81923ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 82023ac9029SStephen Hurd 82123ac9029SStephen Hurd /* 82223ac9029SStephen Hurd * If taskq thread count has been reduced. 82323ac9029SStephen Hurd */ 82423ac9029SStephen Hurd for (i = cnt; i < old_cnt; i++) 82523ac9029SStephen Hurd taskqgroup_cpu_remove(qgroup, i); 82623ac9029SStephen Hurd 82723ac9029SStephen Hurd mtx_lock(&qgroup->tqg_lock); 82823ac9029SStephen Hurd qgroup->tqg_adjusting = 0; 82923ac9029SStephen Hurd 83023ac9029SStephen Hurd taskqgroup_bind(qgroup); 83123ac9029SStephen Hurd 83223ac9029SStephen Hurd return (0); 83323ac9029SStephen Hurd } 83423ac9029SStephen Hurd 83523ac9029SStephen Hurd int 83623ac9029SStephen Hurd taskqgroup_adjust(struct taskqgroup *qgroup, int cpu, int stride) 83723ac9029SStephen Hurd { 83823ac9029SStephen Hurd int error; 83923ac9029SStephen Hurd 84023ac9029SStephen Hurd mtx_lock(&qgroup->tqg_lock); 84123ac9029SStephen Hurd error = _taskqgroup_adjust(qgroup, cpu, stride); 84223ac9029SStephen Hurd mtx_unlock(&qgroup->tqg_lock); 84323ac9029SStephen Hurd 84423ac9029SStephen Hurd return (error); 84523ac9029SStephen Hurd } 84623ac9029SStephen Hurd 84723ac9029SStephen Hurd struct taskqgroup * 84823ac9029SStephen Hurd taskqgroup_create(char *name) 84923ac9029SStephen Hurd { 85023ac9029SStephen Hurd struct taskqgroup *qgroup; 85123ac9029SStephen Hurd 85223ac9029SStephen Hurd qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO); 85323ac9029SStephen Hurd mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF); 85423ac9029SStephen Hurd qgroup->tqg_name = name; 85523ac9029SStephen Hurd LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks); 85623ac9029SStephen Hurd 85723ac9029SStephen Hurd return (qgroup); 85823ac9029SStephen Hurd } 85923ac9029SStephen Hurd 86023ac9029SStephen Hurd void 86123ac9029SStephen Hurd taskqgroup_destroy(struct taskqgroup *qgroup) 86223ac9029SStephen Hurd { 86323ac9029SStephen Hurd 86423ac9029SStephen Hurd } 865