xref: /freebsd/sys/kern/subr_gtaskqueue.c (revision 326aacb0)
123ac9029SStephen Hurd /*-
223ac9029SStephen Hurd  * Copyright (c) 2000 Doug Rabson
323ac9029SStephen Hurd  * Copyright (c) 2014 Jeff Roberson
423ac9029SStephen Hurd  * Copyright (c) 2016 Matthew Macy
523ac9029SStephen Hurd  * All rights reserved.
623ac9029SStephen Hurd  *
723ac9029SStephen Hurd  * Redistribution and use in source and binary forms, with or without
823ac9029SStephen Hurd  * modification, are permitted provided that the following conditions
923ac9029SStephen Hurd  * are met:
1023ac9029SStephen Hurd  * 1. Redistributions of source code must retain the above copyright
1123ac9029SStephen Hurd  *    notice, this list of conditions and the following disclaimer.
1223ac9029SStephen Hurd  * 2. Redistributions in binary form must reproduce the above copyright
1323ac9029SStephen Hurd  *    notice, this list of conditions and the following disclaimer in the
1423ac9029SStephen Hurd  *    documentation and/or other materials provided with the distribution.
1523ac9029SStephen Hurd  *
1623ac9029SStephen Hurd  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
1723ac9029SStephen Hurd  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
1823ac9029SStephen Hurd  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
1923ac9029SStephen Hurd  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
2023ac9029SStephen Hurd  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
2123ac9029SStephen Hurd  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
2223ac9029SStephen Hurd  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
2323ac9029SStephen Hurd  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
2423ac9029SStephen Hurd  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
2523ac9029SStephen Hurd  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
2623ac9029SStephen Hurd  * SUCH DAMAGE.
2723ac9029SStephen Hurd  */
2823ac9029SStephen Hurd 
2923ac9029SStephen Hurd #include <sys/cdefs.h>
3023ac9029SStephen Hurd __FBSDID("$FreeBSD$");
3123ac9029SStephen Hurd 
3223ac9029SStephen Hurd #include <sys/param.h>
3323ac9029SStephen Hurd #include <sys/systm.h>
3423ac9029SStephen Hurd #include <sys/bus.h>
3523ac9029SStephen Hurd #include <sys/cpuset.h>
3623ac9029SStephen Hurd #include <sys/interrupt.h>
3723ac9029SStephen Hurd #include <sys/kernel.h>
3823ac9029SStephen Hurd #include <sys/kthread.h>
3923ac9029SStephen Hurd #include <sys/libkern.h>
4023ac9029SStephen Hurd #include <sys/limits.h>
4123ac9029SStephen Hurd #include <sys/lock.h>
4223ac9029SStephen Hurd #include <sys/malloc.h>
4323ac9029SStephen Hurd #include <sys/mutex.h>
4423ac9029SStephen Hurd #include <sys/proc.h>
4523ac9029SStephen Hurd #include <sys/sched.h>
4623ac9029SStephen Hurd #include <sys/smp.h>
4723ac9029SStephen Hurd #include <sys/gtaskqueue.h>
4823ac9029SStephen Hurd #include <sys/unistd.h>
4923ac9029SStephen Hurd #include <machine/stdarg.h>
5023ac9029SStephen Hurd 
51a0fcc371SStephen Hurd static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues");
5223ac9029SStephen Hurd static void	gtaskqueue_thread_enqueue(void *);
5323ac9029SStephen Hurd static void	gtaskqueue_thread_loop(void *arg);
54ab2e3f79SStephen Hurd 
55ab2e3f79SStephen Hurd TASKQGROUP_DEFINE(softirq, mp_ncpus, 1);
56d945ed64SSean Bruno 
/*
 * Record of a task currently being executed by a queue thread.  One of
 * these lives on the thread's stack and is linked onto tq_active for
 * the duration of the task's ta_func() call so that drain/cancel can
 * see what is running.
 */
struct gtaskqueue_busy {
	struct gtask	*tb_running;	/* task in flight, or TB_DRAIN_WAITER */
	TAILQ_ENTRY(gtaskqueue_busy) tb_link;	/* entry on tq_active */
};

/* Sentinel tb_running value marking a drain waiter rather than a real task. */
static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1;
6323ac9029SStephen Hurd 
/*
 * A group task queue: a FIFO of pending gtasks serviced by one or more
 * kernel threads.  All mutable state is protected by tq_mutex, which is
 * either a spin or a sleep mutex depending on creation flags.
 */
struct gtaskqueue {
	STAILQ_HEAD(, gtask)	tq_queue;	/* pending tasks */
	gtaskqueue_enqueue_fn	tq_enqueue;	/* wakeup hook run on enqueue */
	void			*tq_context;	/* argument for tq_enqueue */
	char			*tq_name;	/* malloc'd queue name */
	TAILQ_HEAD(, gtaskqueue_busy) tq_active; /* tasks being executed */
	struct mtx		tq_mutex;	/* protects queue state */
	struct thread		**tq_threads;	/* service thread array */
	int			tq_tcount;	/* live threads in tq_threads */
	int			tq_spin;	/* non-zero: tq_mutex is MTX_SPIN */
	int			tq_flags;	/* TQ_FLAGS_* below */
	int			tq_callouts;	/* armed callouts / drain holds */
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};
7923ac9029SStephen Hurd 
/* tq_flags bits. */
#define	TQ_FLAGS_ACTIVE		(1 << 0)	/* queue accepts and runs tasks */
#define	TQ_FLAGS_BLOCKED	(1 << 1)	/* suppress tq_enqueue hook */
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)	/* hook safe without lock */

#define	DT_CALLOUT_ARMED	(1 << 0)

/*
 * Acquire/release tq_mutex using the flavor (spin vs. sleep) chosen
 * when the queue was created.
 */
#define	TQ_LOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_lock_spin(&(tq)->tq_mutex);			\
		else							\
			mtx_lock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define	TQ_UNLOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_unlock_spin(&(tq)->tq_mutex);		\
		else							\
			mtx_unlock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
10323ac9029SStephen Hurd 
#ifdef INVARIANTS
/* Print a gtask's fields to the console; debugging aid for panics below. */
static void
gtask_dump(struct gtask *gtask)
{
	printf("gtask: %p ta_flags=%x ta_priority=%d ta_func=%p ta_context=%p\n",
	       gtask, gtask->ta_flags, gtask->ta_priority, gtask->ta_func, gtask->ta_context);
}
#endif
1121248952aSSean Bruno 
/*
 * Sleep on wait channel 'p', dropping the queue mutex, using the sleep
 * primitive that matches the mutex flavor.  'pri' is only meaningful
 * for sleep-mutex queues; msleep_spin() takes no priority argument.
 */
static __inline int
TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
    int t)
{
	if (tq->tq_spin)
		return (msleep_spin(p, m, wm, t));
	return (msleep(p, m, pri, wm, t));
}
12123ac9029SStephen Hurd 
12223ac9029SStephen Hurd static struct gtaskqueue *
12323ac9029SStephen Hurd _gtaskqueue_create(const char *name, int mflags,
12423ac9029SStephen Hurd 		 taskqueue_enqueue_fn enqueue, void *context,
12523ac9029SStephen Hurd 		 int mtxflags, const char *mtxname __unused)
12623ac9029SStephen Hurd {
12723ac9029SStephen Hurd 	struct gtaskqueue *queue;
12823ac9029SStephen Hurd 	char *tq_name;
12923ac9029SStephen Hurd 
13023ac9029SStephen Hurd 	tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO);
13123ac9029SStephen Hurd 	if (!tq_name)
13223ac9029SStephen Hurd 		return (NULL);
13323ac9029SStephen Hurd 
13423ac9029SStephen Hurd 	snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");
13523ac9029SStephen Hurd 
13623ac9029SStephen Hurd 	queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO);
13723ac9029SStephen Hurd 	if (!queue)
13823ac9029SStephen Hurd 		return (NULL);
13923ac9029SStephen Hurd 
14023ac9029SStephen Hurd 	STAILQ_INIT(&queue->tq_queue);
14123ac9029SStephen Hurd 	TAILQ_INIT(&queue->tq_active);
14223ac9029SStephen Hurd 	queue->tq_enqueue = enqueue;
14323ac9029SStephen Hurd 	queue->tq_context = context;
14423ac9029SStephen Hurd 	queue->tq_name = tq_name;
14523ac9029SStephen Hurd 	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
14623ac9029SStephen Hurd 	queue->tq_flags |= TQ_FLAGS_ACTIVE;
14723ac9029SStephen Hurd 	if (enqueue == gtaskqueue_thread_enqueue)
14823ac9029SStephen Hurd 		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
14923ac9029SStephen Hurd 	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
15023ac9029SStephen Hurd 
15123ac9029SStephen Hurd 	return (queue);
15223ac9029SStephen Hurd }
15323ac9029SStephen Hurd 
15423ac9029SStephen Hurd 
/*
 * Signal a taskqueue thread to terminate.
 *
 * Wakes the service threads and sleeps on 'pp' until every thread has
 * exited (tq_tcount == 0) and no armed callouts/drain holds remain.
 * Called with the queue lock held and TQ_FLAGS_ACTIVE already cleared
 * by gtaskqueue_free().
 */
static void
gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
{

	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
	}
}
16723ac9029SStephen Hurd 
/*
 * Tear down a queue: mark it inactive, wait for its threads to exit,
 * then destroy the mutex and release all memory.  The lock is taken
 * and never explicitly dropped; mtx_destroy() is called on the held
 * mutex after the last thread has rendezvoused.
 */
static void
gtaskqueue_free(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	gtaskqueue_terminate(queue->tq_threads, queue);
	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_GTASKQUEUE);
	free(queue->tq_name, M_GTASKQUEUE);
	free(queue, M_GTASKQUEUE);
}
18223ac9029SStephen Hurd 
/*
 * Queue 'gtask' for execution.  Idempotent: a task already marked
 * TASK_ENQUEUED is left where it is.  Always returns 0.
 *
 * NOTE(review): tq_flags is read after TQ_UNLOCK, so the
 * TQ_FLAGS_BLOCKED test races with gtaskqueue_block()/unblock().
 * Presumably benign because gtaskqueue_unblock() re-runs the hook for
 * any pending tasks -- confirm.
 */
int
grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
{
#ifdef INVARIANTS
	if (queue == NULL) {
		gtask_dump(gtask);
		panic("queue == NULL");
	}
#endif
	TQ_LOCK(queue);
	if (gtask->ta_flags & TASK_ENQUEUED) {
		TQ_UNLOCK(queue);
		return (0);
	}
	STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
	gtask->ta_flags |= TASK_ENQUEUED;
	TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	return (0);
}
20423ac9029SStephen Hurd 
/* Do-nothing task body; used as the drain barrier in
 * gtaskqueue_drain_tq_queue(). */
static void
gtaskqueue_task_nop_fn(void *context)
{
}
20923ac9029SStephen Hurd 
/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.  Called with the queue lock held.
 */
static void
gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
{
	struct gtask t_barrier;	/* stack-allocated barrier task */

	if (STAILQ_EMPTY(&queue->tq_queue))
		return;

	/*
	 * Enqueue our barrier after all current tasks, but with
	 * the highest priority so that newly queued tasks cannot
	 * pass it.  Because of the high priority, we can not use
	 * taskqueue_enqueue_locked directly (which drops the lock
	 * anyway) so just insert it at tail while we have the
	 * queue lock.
	 */
	GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier);
	STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
	t_barrier.ta_flags |= TASK_ENQUEUED;

	/*
	 * Once the barrier has executed, all previously queued tasks
	 * have completed or are currently executing.  The service thread
	 * clears TASK_ENQUEUED and wakeup()s on the task when it runs it,
	 * so sleep on &t_barrier until the flag is gone.
	 */
	while (t_barrier.ta_flags & TASK_ENQUEUED)
		TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
}
24223ac9029SStephen Hurd 
/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.  Called with the queue lock held.
 */
static void
gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
{
	struct gtaskqueue_busy tb_marker, *tb_first;

	if (TAILQ_EMPTY(&queue->tq_active))
		return;

	/* Block taskq_terminate().*/
	queue->tq_callouts++;

	/*
	 * Wait for all currently executing taskqueue threads
	 * to go idle.  Our marker reaches the head of tq_active only
	 * once every busy entry queued before it has been removed.
	 */
	tb_marker.tb_running = TB_DRAIN_WAITER;
	TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
	while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
		TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
	TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);

	/*
	 * Wakeup any other drain waiter that happened to queue up
	 * without any intervening active thread.
	 */
	tb_first = TAILQ_FIRST(&queue->tq_active);
	if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
		wakeup(tb_first);

	/* Release taskqueue_terminate(). */
	queue->tq_callouts--;
	if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
		wakeup_one(queue->tq_threads);
}
28223ac9029SStephen Hurd 
/*
 * Suppress the enqueue hook for subsequently queued tasks.  Tasks are
 * still linked onto tq_queue; gtaskqueue_unblock() kicks them off.
 */
void
gtaskqueue_block(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}
29123ac9029SStephen Hurd 
/*
 * Re-enable the enqueue hook and run it once if work accumulated while
 * the queue was blocked.
 *
 * NOTE(review): unlike grouptaskqueue_enqueue(), the hook is invoked
 * here with the queue lock still held -- verify the hook tolerates
 * being called both locked and unlocked.
 */
void
gtaskqueue_unblock(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}
30223ac9029SStephen Hurd 
/*
 * Run every pending task on the queue.  Entered and exited with the
 * queue lock held; the lock is dropped around each ta_func() call so
 * tasks may sleep and new tasks may be queued meanwhile.
 */
static void
gtaskqueue_run_locked(struct gtaskqueue *queue)
{
	struct gtaskqueue_busy tb;	/* this thread's tq_active record */
	struct gtaskqueue_busy *tb_first;
	struct gtask *gtask;

	KASSERT(queue != NULL, ("tq is NULL"));
	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;

	while (STAILQ_FIRST(&queue->tq_queue)) {
		TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

		/*
		 * Carefully remove the first task from the queue and
		 * clear its TASK_ENQUEUED flag
		 */
		gtask = STAILQ_FIRST(&queue->tq_queue);
		KASSERT(gtask != NULL, ("task is NULL"));
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		gtask->ta_flags &= ~TASK_ENQUEUED;
		tb.tb_running = gtask;
		TQ_UNLOCK(queue);

		/* Execute the task without the lock. */
		KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL"));
		gtask->ta_func(gtask->ta_context);

		TQ_LOCK(queue);
		tb.tb_running = NULL;
		/* Notify gtaskqueue_drain() sleepers waiting on this task. */
		wakeup(gtask);

		/* If a drain waiter's marker is now first, wake it up. */
		TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
		tb_first = TAILQ_FIRST(&queue->tq_active);
		if (tb_first != NULL &&
		    tb_first->tb_running == TB_DRAIN_WAITER)
			wakeup(tb_first);
	}
}
34223ac9029SStephen Hurd 
34323ac9029SStephen Hurd static int
34423ac9029SStephen Hurd task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
34523ac9029SStephen Hurd {
34623ac9029SStephen Hurd 	struct gtaskqueue_busy *tb;
34723ac9029SStephen Hurd 
34823ac9029SStephen Hurd 	TQ_ASSERT_LOCKED(queue);
34923ac9029SStephen Hurd 	TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
35023ac9029SStephen Hurd 		if (tb->tb_running == gtask)
35123ac9029SStephen Hurd 			return (1);
35223ac9029SStephen Hurd 	}
35323ac9029SStephen Hurd 	return (0);
35423ac9029SStephen Hurd }
35523ac9029SStephen Hurd 
/*
 * Unlink 'gtask' from the pending queue (if enqueued) and clear its
 * TASK_ENQUEUED flag.  Returns EBUSY when the task is currently
 * executing -- the caller must drain if it needs completion -- and 0
 * otherwise.  Called with the queue lock held.
 */
static int
gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
{

	if (gtask->ta_flags & TASK_ENQUEUED)
		STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
	gtask->ta_flags &= ~TASK_ENQUEUED;
	return (task_is_running(queue, gtask) ? EBUSY : 0);
}
36523ac9029SStephen Hurd 
/*
 * Locked wrapper around gtaskqueue_cancel_locked(): remove 'gtask'
 * from the pending queue.  Returns 0, or EBUSY if the task is
 * currently running.
 */
int
gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
{
	int rv;

	TQ_LOCK(queue);
	rv = gtaskqueue_cancel_locked(queue, gtask);
	TQ_UNLOCK(queue);
	return (rv);
}
37723ac9029SStephen Hurd 
/*
 * Sleep until 'gtask' is neither pending nor executing.  May sleep, so
 * WITNESS warns if called from a no-sleep context (non-spin queues).
 * The service thread wakeup()s on the task after each execution.
 */
void
gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask))
		TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0);
	TQ_UNLOCK(queue);
}
39023ac9029SStephen Hurd 
/*
 * Wait for every task that was pending or running at the time of the
 * call: first drain the pending queue (barrier task), then wait for
 * active executions to finish.  Tasks queued afterwards are ignored.
 */
void
gtaskqueue_drain_all(struct gtaskqueue *queue)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	gtaskqueue_drain_tq_queue(queue);
	gtaskqueue_drain_tq_active(queue);
	TQ_UNLOCK(queue);
}
40323ac9029SStephen Hurd 
40423ac9029SStephen Hurd static int
40523ac9029SStephen Hurd _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
406ab2e3f79SStephen Hurd     cpuset_t *mask, const char *name, va_list ap)
40723ac9029SStephen Hurd {
40823ac9029SStephen Hurd 	char ktname[MAXCOMLEN + 1];
40923ac9029SStephen Hurd 	struct thread *td;
41023ac9029SStephen Hurd 	struct gtaskqueue *tq;
41123ac9029SStephen Hurd 	int i, error;
41223ac9029SStephen Hurd 
41323ac9029SStephen Hurd 	if (count <= 0)
41423ac9029SStephen Hurd 		return (EINVAL);
41523ac9029SStephen Hurd 
41623ac9029SStephen Hurd 	vsnprintf(ktname, sizeof(ktname), name, ap);
41723ac9029SStephen Hurd 	tq = *tqp;
41823ac9029SStephen Hurd 
41923ac9029SStephen Hurd 	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE,
42023ac9029SStephen Hurd 	    M_NOWAIT | M_ZERO);
42123ac9029SStephen Hurd 	if (tq->tq_threads == NULL) {
42223ac9029SStephen Hurd 		printf("%s: no memory for %s threads\n", __func__, ktname);
42323ac9029SStephen Hurd 		return (ENOMEM);
42423ac9029SStephen Hurd 	}
42523ac9029SStephen Hurd 
42623ac9029SStephen Hurd 	for (i = 0; i < count; i++) {
42723ac9029SStephen Hurd 		if (count == 1)
42823ac9029SStephen Hurd 			error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
42923ac9029SStephen Hurd 			    &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
43023ac9029SStephen Hurd 		else
43123ac9029SStephen Hurd 			error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
43223ac9029SStephen Hurd 			    &tq->tq_threads[i], RFSTOPPED, 0,
43323ac9029SStephen Hurd 			    "%s_%d", ktname, i);
43423ac9029SStephen Hurd 		if (error) {
43523ac9029SStephen Hurd 			/* should be ok to continue, taskqueue_free will dtrt */
43623ac9029SStephen Hurd 			printf("%s: kthread_add(%s): error %d", __func__,
43723ac9029SStephen Hurd 			    ktname, error);
43823ac9029SStephen Hurd 			tq->tq_threads[i] = NULL;		/* paranoid */
43923ac9029SStephen Hurd 		} else
44023ac9029SStephen Hurd 			tq->tq_tcount++;
44123ac9029SStephen Hurd 	}
44223ac9029SStephen Hurd 	for (i = 0; i < count; i++) {
44323ac9029SStephen Hurd 		if (tq->tq_threads[i] == NULL)
44423ac9029SStephen Hurd 			continue;
44523ac9029SStephen Hurd 		td = tq->tq_threads[i];
44623ac9029SStephen Hurd 		if (mask) {
44723ac9029SStephen Hurd 			error = cpuset_setthread(td->td_tid, mask);
44823ac9029SStephen Hurd 			/*
44923ac9029SStephen Hurd 			 * Failing to pin is rarely an actual fatal error;
45023ac9029SStephen Hurd 			 * it'll just affect performance.
45123ac9029SStephen Hurd 			 */
45223ac9029SStephen Hurd 			if (error)
45323ac9029SStephen Hurd 				printf("%s: curthread=%llu: can't pin; "
45423ac9029SStephen Hurd 				    "error=%d\n",
45523ac9029SStephen Hurd 				    __func__,
45623ac9029SStephen Hurd 				    (unsigned long long) td->td_tid,
45723ac9029SStephen Hurd 				    error);
45823ac9029SStephen Hurd 		}
45923ac9029SStephen Hurd 		thread_lock(td);
46023ac9029SStephen Hurd 		sched_prio(td, pri);
46123ac9029SStephen Hurd 		sched_add(td, SRQ_BORING);
46223ac9029SStephen Hurd 		thread_unlock(td);
46323ac9029SStephen Hurd 	}
46423ac9029SStephen Hurd 
46523ac9029SStephen Hurd 	return (0);
46623ac9029SStephen Hurd }
46723ac9029SStephen Hurd 
46823ac9029SStephen Hurd static int
46923ac9029SStephen Hurd gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
470ab2e3f79SStephen Hurd     const char *name, ...)
47123ac9029SStephen Hurd {
47223ac9029SStephen Hurd 	va_list ap;
47323ac9029SStephen Hurd 	int error;
47423ac9029SStephen Hurd 
47523ac9029SStephen Hurd 	va_start(ap, name);
476ab2e3f79SStephen Hurd 	error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap);
47723ac9029SStephen Hurd 	va_end(ap);
47823ac9029SStephen Hurd 	return (error);
47923ac9029SStephen Hurd }
48023ac9029SStephen Hurd 
/*
 * Invoke the queue's registered callback of type 'cb_type' (init or
 * shutdown), if any.  Must be called without the queue lock since the
 * callback may sleep.
 */
static inline void
gtaskqueue_run_callback(struct gtaskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}
49223ac9029SStephen Hurd 
/*
 * Main loop of a queue service thread: run tasks, sleep on the queue
 * until woken, repeat until TQ_FLAGS_ACTIVE is cleared, then drain one
 * last time, run the shutdown callback, and rendezvous with
 * gtaskqueue_terminate().  'arg' is a struct gtaskqueue **.
 */
static void
gtaskqueue_thread_loop(void *arg)
{
	struct gtaskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
	TQ_LOCK(tq);
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		/* XXX ? */
		gtaskqueue_run_locked(tq);
		/*
		 * Because taskqueue_run() can drop tq_mutex, we need to
		 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
		 * meantime, which means we missed a wakeup.
		 */
		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
			break;
		TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
	}
	/* Final drain of anything queued after ACTIVE was cleared. */
	gtaskqueue_run_locked(tq);
	/*
	 * This thread is on its way out, so just drop the lock temporarily
	 * in order to call the shutdown callback.  This allows the callback
	 * to look at the taskqueue, even just before it dies.
	 */
	TQ_UNLOCK(tq);
	gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
	TQ_LOCK(tq);

	/* rendezvous with thread that asked us to terminate */
	tq->tq_tcount--;
	wakeup_one(tq->tq_threads);
	TQ_UNLOCK(tq);
	kthread_exit();
}
53023ac9029SStephen Hurd 
/*
 * Enqueue hook for thread-backed queues: wake one sleeping service
 * thread.  'context' is a struct gtaskqueue **.
 */
static void
gtaskqueue_thread_enqueue(void *context)
{
	struct gtaskqueue *tq;

	tq = *(struct gtaskqueue **)context;
	wakeup_one(tq);
}
54023ac9029SStephen Hurd 
54123ac9029SStephen Hurd 
54223ac9029SStephen Hurd static struct gtaskqueue *
54323ac9029SStephen Hurd gtaskqueue_create_fast(const char *name, int mflags,
54423ac9029SStephen Hurd 		 taskqueue_enqueue_fn enqueue, void *context)
54523ac9029SStephen Hurd {
54623ac9029SStephen Hurd 	return _gtaskqueue_create(name, mflags, enqueue, context,
54723ac9029SStephen Hurd 			MTX_SPIN, "fast_taskqueue");
54823ac9029SStephen Hurd }
54923ac9029SStephen Hurd 
55023ac9029SStephen Hurd 
/* Per-CPU slot of a task group: its queue, attached tasks, and count. */
struct taskqgroup_cpu {
	LIST_HEAD(, grouptask)	tgc_tasks;	/* grouptasks bound here */
	struct gtaskqueue	*tgc_taskq;	/* backing fast taskqueue */
	int	tgc_cnt;	/* number of attached tasks */
	int	tgc_cpu;	/* CPU id this slot serves */
};
55723ac9029SStephen Hurd 
/* A named group of per-CPU taskqueues, protected by tqg_lock. */
struct taskqgroup {
	struct taskqgroup_cpu tqg_queue[MAXCPU];	/* per-CPU slots */
	struct mtx	tqg_lock;	/* protects group membership */
	char *		tqg_name;	/* group name (thread name prefix) */
	int		tqg_adjusting;	/* resize in progress */
	int		tqg_stride;	/* CPU stride between slots */
	int		tqg_cnt;	/* active slots in tqg_queue */
};
56623ac9029SStephen Hurd 
/* A gtask plus the CPU id it should bind to; used for thread binding. */
struct taskq_bind_task {
	struct gtask bt_task;
	int	bt_cpuid;	/* target CPU for the bind */
};
57123ac9029SStephen Hurd 
/*
 * Initialize per-CPU slot 'idx' of the group: create its fast (spin)
 * taskqueue and one PI_SOFT service thread named "<group>_<idx>".
 *
 * NOTE(review): the enqueue hook passed here is
 * taskqueue_thread_enqueue, not this file's gtaskqueue_thread_enqueue,
 * so _gtaskqueue_create() never sets TQ_FLAGS_UNLOCKED_ENQUEUE for
 * these queues -- confirm this is intentional.
 */
static void
taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu)
{
	struct taskqgroup_cpu *qcpu;

	qcpu = &qgroup->tqg_queue[idx];
	LIST_INIT(&qcpu->tgc_tasks);
	qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK,
	    taskqueue_thread_enqueue, &qcpu->tgc_taskq);
	gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
	    "%s_%d", qgroup->tqg_name, idx);
	qcpu->tgc_cpu = cpu;
}
58523ac9029SStephen Hurd 
/* Tear down per-CPU slot 'idx': free its queue (waits for threads). */
static void
taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
{

	gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
}
59223ac9029SStephen Hurd 
/*
 * Find the taskq with least # of tasks that doesn't currently have any
 * other queues from the uniq identifier.  Returns the slot index;
 * called with tqg_lock held.  Returns 0 for an empty group.
 */
static int
taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
{
	struct grouptask *n;
	int i, idx, mincnt;
	int strict;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);
	if (qgroup->tqg_cnt == 0)
		return (0);
	idx = -1;
	mincnt = INT_MAX;
	/*
	 * Two passes;  First scan for a queue with the least tasks that
	 * does not already service this uniq id.  If that fails simply find
	 * the queue with the least total tasks;
	 */
	for (strict = 1; mincnt == INT_MAX; strict = 0) {
		for (i = 0; i < qgroup->tqg_cnt; i++) {
			if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
				continue;
			if (strict) {
				/* Skip slots already serving this uniq id. */
				LIST_FOREACH(n,
				    &qgroup->tqg_queue[i].tgc_tasks, gt_list)
					if (n->gt_uniq == uniq)
						break;
				if (n != NULL)
					continue;
			}
			mincnt = qgroup->tqg_queue[i].tgc_cnt;
			idx = i;
		}
	}
	/* The relaxed pass always picks a slot when tqg_cnt > 0. */
	if (idx == -1)
		panic("taskqgroup_find: Failed to pick a qid.");

	return (idx);
}
635de414cfeSSean Bruno 
636bd84f700SSean Bruno /*
637bd84f700SSean Bruno  * smp_started is unusable since it is not set for UP kernels or even for
638bd84f700SSean Bruno  * SMP kernels when there is 1 CPU.  This is usually handled by adding a
 * (mp_ncpus == 1) test, but that would be broken here since we need
 * to synchronize with the SI_SUB_SMP ordering.  Even in the pure SMP case
641bd84f700SSean Bruno  * smp_started only gives a fuzzy ordering relative to SI_SUB_SMP.
642bd84f700SSean Bruno  *
643bd84f700SSean Bruno  * So maintain our own flag.  It must be set after all CPUs are started
644bd84f700SSean Bruno  * and before SI_SUB_SMP:SI_ORDER_ANY so that the SYSINIT for delayed
645bd84f700SSean Bruno  * adjustment is properly delayed.  SI_ORDER_FOURTH is clearly before
646bd84f700SSean Bruno  * SI_ORDER_ANY and unclearly after the CPUs are started.  It would be
647bd84f700SSean Bruno  * simpler for adjustment to pass a flag indicating if it is delayed.
648bd84f700SSean Bruno  */
649de414cfeSSean Bruno 
650bd84f700SSean Bruno static int tqg_smp_started;
651bd84f700SSean Bruno 
/*
 * SYSINIT callback (SI_SUB_SMP, SI_ORDER_FOURTH): record that all CPUs
 * have been started so attach/adjust paths can bind to real CPUs.
 */
static void
tqg_record_smp_started(void *arg)
{
	tqg_smp_started = 1;
}
657bd84f700SSean Bruno 
658bd84f700SSean Bruno SYSINIT(tqg_record_smp_started, SI_SUB_SMP, SI_ORDER_FOURTH,
659bd84f700SSean Bruno 	tqg_record_smp_started, NULL);
66023ac9029SStephen Hurd 
66123ac9029SStephen Hurd void
66223ac9029SStephen Hurd taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
66323ac9029SStephen Hurd     void *uniq, int irq, char *name)
66423ac9029SStephen Hurd {
66523ac9029SStephen Hurd 	cpuset_t mask;
666326aacb0SStephen Hurd 	int qid, error;
66723ac9029SStephen Hurd 
66823ac9029SStephen Hurd 	gtask->gt_uniq = uniq;
66923ac9029SStephen Hurd 	gtask->gt_name = name;
67023ac9029SStephen Hurd 	gtask->gt_irq = irq;
67123ac9029SStephen Hurd 	gtask->gt_cpu = -1;
67223ac9029SStephen Hurd 	mtx_lock(&qgroup->tqg_lock);
67323ac9029SStephen Hurd 	qid = taskqgroup_find(qgroup, uniq);
67423ac9029SStephen Hurd 	qgroup->tqg_queue[qid].tgc_cnt++;
67523ac9029SStephen Hurd 	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
67623ac9029SStephen Hurd 	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
677bd84f700SSean Bruno 	if (irq != -1 && tqg_smp_started) {
67812d1b8c9SSean Bruno 		gtask->gt_cpu = qgroup->tqg_queue[qid].tgc_cpu;
67923ac9029SStephen Hurd 		CPU_ZERO(&mask);
68023ac9029SStephen Hurd 		CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
68123ac9029SStephen Hurd 		mtx_unlock(&qgroup->tqg_lock);
682326aacb0SStephen Hurd 		error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
683326aacb0SStephen Hurd 		if (error)
684326aacb0SStephen Hurd 			printf("%s: setaffinity failed: %d\n", __func__, error);
68523ac9029SStephen Hurd 	} else
68623ac9029SStephen Hurd 		mtx_unlock(&qgroup->tqg_lock);
68723ac9029SStephen Hurd }
68823ac9029SStephen Hurd 
/*
 * Re-attach a task that has no CPU preference after the group has been
 * resized: pick the least-loaded queue and rebind the task's interrupt,
 * if any, to that queue's CPU.
 */
static void
taskqgroup_attach_deferred(struct taskqgroup *qgroup, struct grouptask *gtask)
{
	cpuset_t mask;
	int qid, cpu, error;

	mtx_lock(&qgroup->tqg_lock);
	qid = taskqgroup_find(qgroup, gtask->gt_uniq);
	cpu = qgroup->tqg_queue[qid].tgc_cpu;
	if (gtask->gt_irq != -1) {
		/* Drop the lock around the interrupt-affinity call. */
		mtx_unlock(&qgroup->tqg_lock);

		CPU_ZERO(&mask);
		CPU_SET(cpu, &mask);
		error = intr_setaffinity(gtask->gt_irq, CPU_WHICH_IRQ, &mask);
		mtx_lock(&qgroup->tqg_lock);
		if (error)
			printf("%s: setaffinity failed: %d\n", __func__, error);

	}
	/*
	 * NOTE(review): 'qid' was chosen before the lock was dropped above;
	 * presumably safe because tqg_adjusting serializes resizes — confirm.
	 */
	qgroup->tqg_queue[qid].tgc_cnt++;

	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
			 gt_list);
	MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	mtx_unlock(&qgroup->tqg_lock);
}
71712d1b8c9SSean Bruno 
/*
 * Attach 'gtask' to the queue in 'qgroup' that services the requested
 * 'cpu', optionally binding interrupt 'irq' to that CPU.  Before SMP
 * start the task is parked on queue 0 and migrated later by the deferred
 * adjustment code.  Returns 0 on success or EINVAL if no queue services
 * the requested CPU.
 */
int
taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
	void *uniq, int cpu, int irq, char *name)
{
	cpuset_t mask;
	int i, qid, error;

	qid = -1;
	gtask->gt_uniq = uniq;
	gtask->gt_name = name;
	gtask->gt_irq = irq;
	gtask->gt_cpu = cpu;
	mtx_lock(&qgroup->tqg_lock);
	if (tqg_smp_started) {
		/* Find the queue bound to the requested CPU. */
		for (i = 0; i < qgroup->tqg_cnt; i++)
			if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
				qid = i;
				break;
			}
		if (qid == -1) {
			mtx_unlock(&qgroup->tqg_lock);
			printf("%s: qid not found for %s cpu=%d\n", __func__, name, cpu);
			return (EINVAL);
		}
	} else
		/* CPUs not up yet; queue 0 holds tasks until adjustment. */
		qid = 0;
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	/* Use the queue's actual CPU (may differ pre-SMP) for the mask. */
	cpu = qgroup->tqg_queue[qid].tgc_cpu;
	mtx_unlock(&qgroup->tqg_lock);

	CPU_ZERO(&mask);
	CPU_SET(cpu, &mask);
	/* tqg_smp_started is re-read here without the lock. */
	if (irq != -1 && tqg_smp_started) {
		error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
		if (error)
			printf("%s: setaffinity failed: %d\n", __func__, error);
	}
	return (0);
}
75912d1b8c9SSean Bruno 
76012d1b8c9SSean Bruno static int
76112d1b8c9SSean Bruno taskqgroup_attach_cpu_deferred(struct taskqgroup *qgroup, struct grouptask *gtask)
76212d1b8c9SSean Bruno {
76312d1b8c9SSean Bruno 	cpuset_t mask;
764326aacb0SStephen Hurd 	int i, qid, irq, cpu, error;
76512d1b8c9SSean Bruno 
76612d1b8c9SSean Bruno 	qid = -1;
76712d1b8c9SSean Bruno 	irq = gtask->gt_irq;
76812d1b8c9SSean Bruno 	cpu = gtask->gt_cpu;
769bd84f700SSean Bruno 	MPASS(tqg_smp_started);
77012d1b8c9SSean Bruno 	mtx_lock(&qgroup->tqg_lock);
77112d1b8c9SSean Bruno 	for (i = 0; i < qgroup->tqg_cnt; i++)
77212d1b8c9SSean Bruno 		if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
77312d1b8c9SSean Bruno 			qid = i;
77412d1b8c9SSean Bruno 			break;
77512d1b8c9SSean Bruno 		}
77612d1b8c9SSean Bruno 	if (qid == -1) {
77723ac9029SStephen Hurd 		mtx_unlock(&qgroup->tqg_lock);
778326aacb0SStephen Hurd 		printf("%s: qid not found for %s cpu=%d\n", __func__, name, cpu);
77912d1b8c9SSean Bruno 		return (EINVAL);
78012d1b8c9SSean Bruno 	}
78112d1b8c9SSean Bruno 	qgroup->tqg_queue[qid].tgc_cnt++;
78212d1b8c9SSean Bruno 	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
783abf38392SSean Bruno 	MPASS(qgroup->tqg_queue[qid].tgc_taskq != NULL);
78412d1b8c9SSean Bruno 	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
78512d1b8c9SSean Bruno 	mtx_unlock(&qgroup->tqg_lock);
78612d1b8c9SSean Bruno 
78712d1b8c9SSean Bruno 	CPU_ZERO(&mask);
78812d1b8c9SSean Bruno 	CPU_SET(cpu, &mask);
78912d1b8c9SSean Bruno 
790326aacb0SStephen Hurd 	if (irq != -1) {
791326aacb0SStephen Hurd 		error = intr_setaffinity(irq, CPU_WHICH_IRQ, &mask);
792326aacb0SStephen Hurd 		if (error)
793326aacb0SStephen Hurd 			printf("%s: setaffinity failed: %d\n", __func__, error);
794326aacb0SStephen Hurd 	}
79523ac9029SStephen Hurd 	return (0);
79623ac9029SStephen Hurd }
79723ac9029SStephen Hurd 
79823ac9029SStephen Hurd void
79923ac9029SStephen Hurd taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
80023ac9029SStephen Hurd {
80123ac9029SStephen Hurd 	int i;
80223ac9029SStephen Hurd 
80323ac9029SStephen Hurd 	mtx_lock(&qgroup->tqg_lock);
80423ac9029SStephen Hurd 	for (i = 0; i < qgroup->tqg_cnt; i++)
80523ac9029SStephen Hurd 		if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
80623ac9029SStephen Hurd 			break;
80723ac9029SStephen Hurd 	if (i == qgroup->tqg_cnt)
80823ac9029SStephen Hurd 		panic("taskqgroup_detach: task not in group\n");
80923ac9029SStephen Hurd 	qgroup->tqg_queue[i].tgc_cnt--;
81023ac9029SStephen Hurd 	LIST_REMOVE(gtask, gt_list);
81123ac9029SStephen Hurd 	mtx_unlock(&qgroup->tqg_lock);
81223ac9029SStephen Hurd 	gtask->gt_taskqueue = NULL;
81323ac9029SStephen Hurd }
81423ac9029SStephen Hurd 
81523ac9029SStephen Hurd static void
81623ac9029SStephen Hurd taskqgroup_binder(void *ctx)
81723ac9029SStephen Hurd {
81823ac9029SStephen Hurd 	struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx;
81923ac9029SStephen Hurd 	cpuset_t mask;
82023ac9029SStephen Hurd 	int error;
82123ac9029SStephen Hurd 
82223ac9029SStephen Hurd 	CPU_ZERO(&mask);
82323ac9029SStephen Hurd 	CPU_SET(gtask->bt_cpuid, &mask);
82423ac9029SStephen Hurd 	error = cpuset_setthread(curthread->td_tid, &mask);
82523ac9029SStephen Hurd 	thread_lock(curthread);
82623ac9029SStephen Hurd 	sched_bind(curthread, gtask->bt_cpuid);
82723ac9029SStephen Hurd 	thread_unlock(curthread);
82823ac9029SStephen Hurd 
82923ac9029SStephen Hurd 	if (error)
830326aacb0SStephen Hurd 		printf("%s: setaffinity failed: %d\n", __func__,
83123ac9029SStephen Hurd 		    error);
83223ac9029SStephen Hurd 	free(gtask, M_DEVBUF);
833d300df01SStephen Hurd }
834d300df01SStephen Hurd 
83523ac9029SStephen Hurd static void
83623ac9029SStephen Hurd taskqgroup_bind(struct taskqgroup *qgroup)
83723ac9029SStephen Hurd {
83823ac9029SStephen Hurd 	struct taskq_bind_task *gtask;
83923ac9029SStephen Hurd 	int i;
84023ac9029SStephen Hurd 
84123ac9029SStephen Hurd 	/*
84223ac9029SStephen Hurd 	 * Bind taskqueue threads to specific CPUs, if they have been assigned
84323ac9029SStephen Hurd 	 * one.
84423ac9029SStephen Hurd 	 */
845026204b4SSean Bruno 	if (qgroup->tqg_cnt == 1)
846026204b4SSean Bruno 		return;
847026204b4SSean Bruno 
84823ac9029SStephen Hurd 	for (i = 0; i < qgroup->tqg_cnt; i++) {
8491ee17b07SSean Bruno 		gtask = malloc(sizeof (*gtask), M_DEVBUF, M_WAITOK);
85023ac9029SStephen Hurd 		GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask);
85123ac9029SStephen Hurd 		gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
85223ac9029SStephen Hurd 		grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
85323ac9029SStephen Hurd 		    &gtask->bt_task);
85423ac9029SStephen Hurd 	}
85523ac9029SStephen Hurd }
85623ac9029SStephen Hurd 
/*
 * Resize 'qgroup' to 'cnt' queues whose CPUs are 'stride' apart, then
 * redistribute all previously attached tasks across the new queue set.
 * Called with tqg_lock held; the lock is dropped and reacquired while
 * taskqueues are created and tasks re-attached (tqg_adjusting guards
 * against concurrent resizes during that window), and is held again on
 * return.  Returns 0, EINVAL (bad layout / pre-SMP call), or EBUSY.
 */
static int
_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
{
	LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
	struct grouptask *gtask;
	int i, k, old_cnt, old_cpu, cpu;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);

	/* Reject impossible layouts and calls made before SMP startup. */
	if (cnt < 1 || cnt * stride > mp_ncpus || !tqg_smp_started) {
		printf("%s: failed cnt: %d stride: %d "
		    "mp_ncpus: %d tqg_smp_started: %d\n",
		    __func__, cnt, stride, mp_ncpus, tqg_smp_started);
		return (EINVAL);
	}
	if (qgroup->tqg_adjusting) {
		printf("%s failed: adjusting\n", __func__);
		return (EBUSY);
	}
	qgroup->tqg_adjusting = 1;
	old_cnt = qgroup->tqg_cnt;
	old_cpu = 0;
	/*
	 * NOTE(review): when growing, this reads tgc_cpu of the first
	 * not-yet-created queue slot; presumably the array is zeroed at
	 * group creation so this is 0/stale-but-safe — confirm.
	 */
	if (old_cnt < cnt)
		old_cpu = qgroup->tqg_queue[old_cnt].tgc_cpu;
	mtx_unlock(&qgroup->tqg_lock);
	/*
	 * Set up queue for tasks added before boot: collect them onto the
	 * local list so they can be re-attached below.
	 */
	if (old_cnt == 0) {
		LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
		    grouptask, gt_list);
		qgroup->tqg_queue[0].tgc_cnt = 0;
	}

	/*
	 * If new taskq threads have been added, create them, spacing their
	 * CPUs 'stride' apart.
	 */
	cpu = old_cpu;
	for (i = old_cnt; i < cnt; i++) {
		taskqgroup_cpu_create(qgroup, i, cpu);

		for (k = 0; k < stride; k++)
			cpu = CPU_NEXT(cpu);
	}
	mtx_lock(&qgroup->tqg_lock);
	qgroup->tqg_cnt = cnt;
	qgroup->tqg_stride = stride;

	/*
	 * Adjust drivers to use new taskqs: pull every task off the old
	 * queues onto the local list.
	 */
	for (i = 0; i < old_cnt; i++) {
		while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
			LIST_REMOVE(gtask, gt_list);
			qgroup->tqg_queue[i].tgc_cnt--;
			LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
		}
	}
	mtx_unlock(&qgroup->tqg_lock);

	/* Re-attach each task, honoring its CPU preference when possible. */
	while ((gtask = LIST_FIRST(&gtask_head))) {
		LIST_REMOVE(gtask, gt_list);
		if (gtask->gt_cpu == -1)
			taskqgroup_attach_deferred(qgroup, gtask);
		else if (taskqgroup_attach_cpu_deferred(qgroup, gtask))
			taskqgroup_attach_deferred(qgroup, gtask);
	}

#ifdef INVARIANTS
	/* Every queue must have a taskqueue and every task a home. */
	mtx_lock(&qgroup->tqg_lock);
	for (i = 0; i < qgroup->tqg_cnt; i++) {
		MPASS(qgroup->tqg_queue[i].tgc_taskq != NULL);
		LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list)
			MPASS(gtask->gt_taskqueue != NULL);
	}
	mtx_unlock(&qgroup->tqg_lock);
#endif
	/*
	 * If taskq thread count has been reduced, tear down the excess.
	 */
	for (i = cnt; i < old_cnt; i++)
		taskqgroup_cpu_remove(qgroup, i);

	taskqgroup_bind(qgroup);

	/* Return with the lock held, matching the caller's expectation. */
	mtx_lock(&qgroup->tqg_lock);
	qgroup->tqg_adjusting = 0;

	return (0);
}
94723ac9029SStephen Hurd 
94823ac9029SStephen Hurd int
949ab2e3f79SStephen Hurd taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
95023ac9029SStephen Hurd {
95123ac9029SStephen Hurd 	int error;
95223ac9029SStephen Hurd 
95323ac9029SStephen Hurd 	mtx_lock(&qgroup->tqg_lock);
954ab2e3f79SStephen Hurd 	error = _taskqgroup_adjust(qgroup, cnt, stride);
95523ac9029SStephen Hurd 	mtx_unlock(&qgroup->tqg_lock);
95623ac9029SStephen Hurd 
95723ac9029SStephen Hurd 	return (error);
95823ac9029SStephen Hurd }
95923ac9029SStephen Hurd 
96023ac9029SStephen Hurd struct taskqgroup *
96123ac9029SStephen Hurd taskqgroup_create(char *name)
96223ac9029SStephen Hurd {
96323ac9029SStephen Hurd 	struct taskqgroup *qgroup;
96423ac9029SStephen Hurd 
96523ac9029SStephen Hurd 	qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
96623ac9029SStephen Hurd 	mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
96723ac9029SStephen Hurd 	qgroup->tqg_name = name;
96823ac9029SStephen Hurd 	LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);
969ab2e3f79SStephen Hurd 
97023ac9029SStephen Hurd 	return (qgroup);
97123ac9029SStephen Hurd }
97223ac9029SStephen Hurd 
/*
 * Tear down a taskqgroup.  Currently a no-op stub: the group's mutex and
 * memory allocated by taskqgroup_create() are leaked.
 * TODO(review): destroy the mutex and free the group.
 */
void
taskqgroup_destroy(struct taskqgroup *qgroup)
{

}
978