xref: /dragonfly/sys/kern/subr_gtaskqueue.c (revision 8f2ce533)
1 /*-
2  * Copyright (c) 2000 Doug Rabson
3  * Copyright (c) 2014 Jeff Roberson
4  * Copyright (c) 2016 Matthew Macy
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26  * SUCH DAMAGE.
27  */
28 
29 #include <sys/cdefs.h>
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/bus.h>
33 #include <sys/cpumask.h>
34 #include <sys/kernel.h>
35 #include <sys/libkern.h>
36 #include <sys/limits.h>
37 #include <sys/lock.h>
38 #include <sys/malloc.h>
39 #include <sys/proc.h>
40 #include <sys/sched.h>
41 #include <sys/gtaskqueue.h>
42 #include <sys/unistd.h>
43 #include <machine/stdarg.h>
44 
/* Malloc type used for the task queue structures allocated in this file. */
static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues");
/* Forward declarations for helpers that are used before they are defined. */
static void	gtaskqueue_thread_enqueue(void *);
static void	gtaskqueue_thread_loop(void *arg);
static int	task_is_running(struct gtaskqueue *queue, struct gtask *gtask);
static void	gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask);

/*
 * Instantiate the "softirq" group.
 * NOTE(review): the arguments presumably map to the (name, cnt, stride)
 * parameters of taskqgroup_create(); confirm against the definition of
 * TASKQGROUP_DEFINE.
 */
TASKQGROUP_DEFINE(softirq, ncpus, 1);
52 
/*
 * Per-worker record linked on gtaskqueue.tq_active while a thread is inside
 * gtaskqueue_run_locked().  The record lives on that function's stack.
 */
struct gtaskqueue_busy {
	struct gtask		*tb_running;	/* task being executed, or NULL */
	u_int			 tb_seq;	/* tq_seq value given to that task */
	LIST_ENTRY(gtaskqueue_busy) tb_link;	/* linkage on tq_active */
};
58 
/* Callback invoked with tq_context after a task has been queued. */
typedef void (*gtaskqueue_enqueue_fn)(void *context);

/*
 * A single group task queue: a list of pending tasks plus the worker
 * threads that execute them.  The fields are manipulated under tq_lock.
 */
struct gtaskqueue {
	STAILQ_HEAD(, gtask)	tq_queue;	/* pending tasks, run head first */
	LIST_HEAD(, gtaskqueue_busy) tq_active;	/* workers currently running tasks */
	u_int			tq_seq;		/* incremented for every task started */
	int			tq_callouts;	/* non-zero holds off gtaskqueue_terminate() */
	struct lock		tq_lock;	/* lockmgr lock protecting the queue */
	gtaskqueue_enqueue_fn	tq_enqueue;	/* notification hook called on enqueue */
	void			*tq_context;	/* argument handed to tq_enqueue */
	const char		*tq_name;	/* queue name, also used to name tq_lock */
	struct thread		**tq_threads;	/* worker array; also a sleep channel */
	int			tq_tcount;	/* workers created and not yet exited */
	int			tq_flags;	/* TQ_FLAGS_* bits */
#if 0
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
#endif
};
78 
/*
 * Bits kept in gtaskqueue.tq_flags.
 *
 * TQ_FLAGS_ACTIVE		worker threads keep running while this is set
 * TQ_FLAGS_BLOCKED		the tq_enqueue notification is suppressed
 * TQ_FLAGS_UNLOCKED_ENQUEUE	set when tq_enqueue is gtaskqueue_thread_enqueue
 */
#define	TQ_FLAGS_ACTIVE		(1 << 0)
#define	TQ_FLAGS_BLOCKED	(1 << 1)
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)

/* Not referenced by the code in this file. */
#define	DT_CALLOUT_ARMED	(1 << 0)

/*
 * Queue lock helpers.  Each macro expands to a single expression without a
 * trailing semicolon so that it can be used as an ordinary statement,
 * including as the unbraced body of an if/else.
 */
#define	TQ_LOCK(tq)		lockmgr(&(tq)->tq_lock, LK_EXCLUSIVE)
#define	TQ_ASSERT_LOCKED(tq)	KKASSERT(lockstatus(&(tq)->tq_lock, NULL) != 0)
#define	TQ_UNLOCK(tq)		lockmgr(&(tq)->tq_lock, LK_RELEASE)
#define	TQ_ASSERT_UNLOCKED(tq)	KKASSERT(lockstatus(&(tq)->tq_lock, NULL) == 0)
89 
#ifdef INVARIANTS
/*
 * Print the address and fields of a gtask on the console.  Used by the
 * INVARIANTS sanity checks right before they panic.
 */
static void
gtask_dump(struct gtask *gtask)
{
	kprintf("gtask: %p ta_flags=%x ta_priority=%d "
		"ta_func=%p ta_context=%p\n",
		gtask,
		gtask->ta_flags,
		gtask->ta_priority,
		gtask->ta_func,
		gtask->ta_context);
}
#endif
100 
101 static __inline int
102 TQ_SLEEP(struct gtaskqueue *tq, void *p, const char *wm)
103 {
104 	return (lksleep(p, &tq->tq_lock, 0, wm, 0));
105 }
106 
107 static struct gtaskqueue *
108 _gtaskqueue_create(const char *name, int mflags,
109 		 taskqueue_enqueue_fn enqueue, void *context,
110 		 int lkflags, const char *mtxname __unused)
111 {
112 	struct gtaskqueue *queue;
113 
114 	queue = kmalloc(sizeof(struct gtaskqueue),
115 			M_GTASKQUEUE, mflags | M_ZERO);
116 	if (!queue) {
117 		kprintf("_gtaskqueue_create: kmalloc failed %08x\n", mflags);
118 		return (NULL);
119 	}
120 
121 	STAILQ_INIT(&queue->tq_queue);
122 	LIST_INIT(&queue->tq_active);
123 	queue->tq_enqueue = enqueue;
124 	queue->tq_context = context;
125 	queue->tq_name = name ? name : "taskqueue";
126 	queue->tq_flags |= TQ_FLAGS_ACTIVE;
127 	if (enqueue == gtaskqueue_thread_enqueue)
128 		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
129 	lockinit(&queue->tq_lock, queue->tq_name, 0, 0);
130 
131 	return (queue);
132 }
133 
134 /*
135  * Signal a taskqueue thread to terminate.
136  */
137 static void
138 gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
139 {
140 
141 	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
142 		wakeup(tq);
143 		TQ_SLEEP(tq, pp, "gtq_destroy");
144 	}
145 }
146 
147 static void __unused
148 gtaskqueue_free(struct gtaskqueue *queue)
149 {
150 
151 	TQ_LOCK(queue);
152 	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
153 	gtaskqueue_terminate(queue->tq_threads, queue);
154 	KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
155 	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
156 	lockuninit(&queue->tq_lock);
157 	kfree(queue->tq_threads, M_GTASKQUEUE);
158 	/*kfree(queue->tq_name, M_GTASKQUEUE);*/
159 	kfree(queue, M_GTASKQUEUE);
160 }
161 
162 /*
163  * Wait for all to complete, then prevent it from being enqueued
164  */
165 void
166 grouptask_block(struct grouptask *grouptask)
167 {
168 	struct gtaskqueue *queue = grouptask->gt_taskqueue;
169 	struct gtask *gtask = &grouptask->gt_task;
170 
171 #ifdef INVARIANTS
172 	if (queue == NULL) {
173 		gtask_dump(gtask);
174 		panic("queue == NULL");
175 	}
176 #endif
177 	TQ_LOCK(queue);
178 	gtask->ta_flags |= TASK_NOENQUEUE;
179 	gtaskqueue_drain_locked(queue, gtask);
180 	TQ_UNLOCK(queue);
181 }
182 
183 void
184 grouptask_unblock(struct grouptask *grouptask)
185 {
186 	struct gtaskqueue *queue = grouptask->gt_taskqueue;
187 	struct gtask *gtask = &grouptask->gt_task;
188 
189 #ifdef INVARIANTS
190 	if (queue == NULL) {
191 		gtask_dump(gtask);
192 		panic("queue == NULL");
193 	}
194 #endif
195 	TQ_LOCK(queue);
196 	gtask->ta_flags &= ~TASK_NOENQUEUE;
197 	TQ_UNLOCK(queue);
198 }
199 
200 int
201 grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
202 {
203 #ifdef INVARIANTS
204 	if (queue == NULL) {
205 		gtask_dump(gtask);
206 		panic("queue == NULL");
207 	}
208 #endif
209 	TQ_LOCK(queue);
210 	if (gtask->ta_flags & TASK_ENQUEUED) {
211 		TQ_UNLOCK(queue);
212 		return (0);
213 	}
214 	if (gtask->ta_flags & TASK_NOENQUEUE) {
215 		TQ_UNLOCK(queue);
216 		return (EAGAIN);
217 	}
218 	STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
219 	gtask->ta_flags |= TASK_ENQUEUED;
220 	TQ_UNLOCK(queue);
221 	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
222 		queue->tq_enqueue(queue->tq_context);
223 	return (0);
224 }
225 
226 static void
227 gtaskqueue_task_nop_fn(void *context)
228 {
229 }
230 
231 /*
232  * Block until all currently queued tasks in this taskqueue
233  * have begun execution.  Tasks queued during execution of
234  * this function are ignored.
235  */
236 static void
237 gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
238 {
239 	struct gtask t_barrier;
240 
241 	if (STAILQ_EMPTY(&queue->tq_queue))
242 		return;
243 
244 	/*
245 	 * Enqueue our barrier after all current tasks, but with
246 	 * the highest priority so that newly queued tasks cannot
247 	 * pass it.  Because of the high priority, we can not use
248 	 * taskqueue_enqueue_locked directly (which drops the lock
249 	 * anyway) so just insert it at tail while we have the
250 	 * queue lock.
251 	 */
252 	GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier);
253 	STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
254 	t_barrier.ta_flags |= TASK_ENQUEUED;
255 
256 	/*
257 	 * Once the barrier has executed, all previously queued tasks
258 	 * have completed or are currently executing.
259 	 */
260 	while (t_barrier.ta_flags & TASK_ENQUEUED)
261 		TQ_SLEEP(queue, &t_barrier, "gtq_qdrain");
262 }
263 
264 /*
265  * Block until all currently executing tasks for this taskqueue
266  * complete.  Tasks that begin execution during the execution
267  * of this function are ignored.
268  */
269 static void
270 gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
271 {
272 	struct gtaskqueue_busy *tb;
273 	u_int seq;
274 
275 	if (LIST_EMPTY(&queue->tq_active))
276 		return;
277 
278 	/* Block taskq_terminate().*/
279 	queue->tq_callouts++;
280 
281 	/* Wait for any active task with sequence from the past. */
282 	seq = queue->tq_seq;
283 restart:
284 	LIST_FOREACH(tb, &queue->tq_active, tb_link) {
285 		if ((int)(tb->tb_seq - seq) <= 0) {
286 			TQ_SLEEP(queue, tb->tb_running, "gtq_adrain");
287 			goto restart;
288 		}
289 	}
290 
291 	/* Release taskqueue_terminate(). */
292 	queue->tq_callouts--;
293 	if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
294 		wakeup_one(queue->tq_threads);
295 }
296 
297 void
298 gtaskqueue_block(struct gtaskqueue *queue)
299 {
300 
301 	TQ_LOCK(queue);
302 	queue->tq_flags |= TQ_FLAGS_BLOCKED;
303 	TQ_UNLOCK(queue);
304 }
305 
306 void
307 gtaskqueue_unblock(struct gtaskqueue *queue)
308 {
309 
310 	TQ_LOCK(queue);
311 	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
312 	if (!STAILQ_EMPTY(&queue->tq_queue))
313 		queue->tq_enqueue(queue->tq_context);
314 	TQ_UNLOCK(queue);
315 }
316 
317 static void
318 gtaskqueue_run_locked(struct gtaskqueue *queue)
319 {
320 	struct gtaskqueue_busy tb;
321 	struct gtask *gtask;
322 #if 0
323 	struct epoch_tracker et;
324 	bool in_net_epoch;
325 #endif
326 
327 	KASSERT(queue != NULL, ("tq is NULL"));
328 	TQ_ASSERT_LOCKED(queue);
329 	tb.tb_running = NULL;
330 	LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
331 #if 0
332 	in_net_epoch = false;
333 #endif
334 
335 	while ((gtask = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
336 		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
337 		gtask->ta_flags &= ~TASK_ENQUEUED;
338 		tb.tb_running = gtask;
339 		tb.tb_seq = ++queue->tq_seq;
340 		TQ_UNLOCK(queue);
341 
342 		KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL"));
343 #if 0
344 		if (!in_net_epoch && TASK_IS_NET(gtask)) {
345 			in_net_epoch = true;
346 			NET_EPOCH_ENTER(et);
347 		} else if (in_net_epoch && !TASK_IS_NET(gtask)) {
348 			NET_EPOCH_EXIT(et);
349 			in_net_epoch = false;
350 		}
351 #endif
352 		gtask->ta_func(gtask->ta_context);
353 
354 		TQ_LOCK(queue);
355 		wakeup(gtask);
356 	}
357 #if 0
358 	if (in_net_epoch)
359 		NET_EPOCH_EXIT(et);
360 #endif
361 	LIST_REMOVE(&tb, tb_link);
362 }
363 
364 static int
365 task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
366 {
367 	struct gtaskqueue_busy *tb;
368 
369 	TQ_ASSERT_LOCKED(queue);
370 	LIST_FOREACH(tb, &queue->tq_active, tb_link) {
371 		if (tb->tb_running == gtask)
372 			return (1);
373 	}
374 	return (0);
375 }
376 
377 static int
378 gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
379 {
380 
381 	if (gtask->ta_flags & TASK_ENQUEUED)
382 		STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
383 	gtask->ta_flags &= ~TASK_ENQUEUED;
384 	return (task_is_running(queue, gtask) ? EBUSY : 0);
385 }
386 
/*
 * Locked wrapper around gtaskqueue_cancel_locked(): dequeue 'gtask' if it
 * is pending and return EBUSY if it is currently executing, else 0.
 */
int
gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
{
	int rc;

	TQ_LOCK(queue);
	rc = gtaskqueue_cancel_locked(queue, gtask);
	TQ_UNLOCK(queue);
	return (rc);
}
398 
399 static void
400 gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask)
401 {
402 	while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask))
403 		TQ_SLEEP(queue, gtask, "gtq_drain");
404 }
405 
/*
 * Wait until 'gtask' is neither pending on 'queue' nor being executed.
 */
void
gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
{

	TQ_LOCK(queue);
	gtaskqueue_drain_locked(queue, gtask);
	TQ_UNLOCK(queue);
}
413 
/*
 * Wait for the tasks that are pending at the time of the call to be
 * dequeued, then for the tasks executing at that point to finish.
 */
void
gtaskqueue_drain_all(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	/* First the pending list, then whatever is still executing. */
	gtaskqueue_drain_tq_queue(queue);
	gtaskqueue_drain_tq_active(queue);
	TQ_UNLOCK(queue);
}
423 
424 static int __printflike(4, 0)
425 _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
426 			  const char *name, __va_list ap)
427 {
428 	char ktname[MAXCOMLEN + 1];
429 	struct thread *td;
430 	struct gtaskqueue *tq;
431 	int i, error;
432 
433 	if (count <= 0)
434 		return (EINVAL);
435 
436 	kvsnprintf(ktname, sizeof(ktname), name, ap);
437 	tq = *tqp;
438 
439 	tq->tq_threads = kmalloc(sizeof(struct thread *) * count,
440 				 M_GTASKQUEUE, M_WAITOK | M_ZERO);
441 
442 	for (i = 0; i < count; i++) {
443 		int cpu = i % ncpus;
444 		if (count == 1) {
445 			error = lwkt_create(gtaskqueue_thread_loop, tqp,
446 					    &tq->tq_threads[i], NULL,
447 					    TDF_NOSTART, cpu,
448 					    "%s", ktname);
449 		} else {
450 			error = lwkt_create(gtaskqueue_thread_loop, tqp,
451 					    &tq->tq_threads[i], NULL,
452 					    TDF_NOSTART, cpu,
453 					    "%s_%d", ktname, i);
454 		}
455 		if (error) {
456 			/* should be ok to continue, taskqueue_free will dtrt */
457 			kprintf("%s: lwkt_create(%s): error %d",
458 				__func__, ktname, error);
459 			tq->tq_threads[i] = NULL;		/* paranoid */
460 		} else
461 			tq->tq_tcount++;
462 	}
463 	for (i = 0; i < count; i++) {
464 		if (tq->tq_threads[i] == NULL)
465 			continue;
466 		td = tq->tq_threads[i];
467 		lwkt_setpri_initial(td, pri);
468 		lwkt_schedule(td);
469 	}
470 
471 	return (0);
472 }
473 
474 static int __printflike(4, 5)
475 gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
476 			 const char *name, ...)
477 {
478 	__va_list ap;
479 	int error;
480 
481 	__va_start(ap, name);
482 	error = _gtaskqueue_start_threads(tqp, count, pri, name, ap);
483 	__va_end(ap);
484 	return (error);
485 }
486 
/*
 * Disabled: per-queue init/shutdown callback dispatch.  It relies on the
 * tq_callbacks / tq_cb_contexts members, which are likewise compiled out
 * of struct gtaskqueue.
 */
#if 0
static inline void
gtaskqueue_run_callback(struct gtaskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}
#endif
500 
501 static void
502 gtaskqueue_thread_loop(void *arg)
503 {
504 	struct gtaskqueue **tqp, *tq;
505 
506 	tqp = arg;
507 	tq = *tqp;
508 #if 0
509 	gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
510 #endif
511 	TQ_LOCK(tq);
512 	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
513 		/* XXX ? */
514 		gtaskqueue_run_locked(tq);
515 		/*
516 		 * Because taskqueue_run() can drop tq_mutex, we need to
517 		 * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
518 		 * meantime, which means we missed a wakeup.
519 		 */
520 		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
521 			break;
522 		TQ_SLEEP(tq, tq, "-");
523 	}
524 	gtaskqueue_run_locked(tq);
525 	/*
526 	 * This thread is on its way out, so just drop the lock temporarily
527 	 * in order to call the shutdown callback.  This allows the callback
528 	 * to look at the taskqueue, even just before it dies.
529 	 */
530 #if 0
531 	TQ_UNLOCK(tq);
532 	gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
533 	TQ_LOCK(tq);
534 #endif
535 
536 	/* rendezvous with thread that asked us to terminate */
537 	tq->tq_tcount--;
538 	wakeup_one(tq->tq_threads);
539 	TQ_UNLOCK(tq);
540 	lwkt_exit();
541 }
542 
/*
 * Enqueue hook for thread-backed queues.  'context' is the address of the
 * queue pointer; one thread sleeping on the queue is woken up.
 */
static void
gtaskqueue_thread_enqueue(void *context)
{
	struct gtaskqueue **tqp = context;

	wakeup_one(*tqp);
}
552 
553 /*
554  * NOTE: FreeBSD uses MTX_SPIN locks, which doesn't make a whole lot
555  *	 of sense (over-use of spin-locks in general). In DFly we
556  *	 want to use blockable locks for almost everything.
557  */
558 static struct gtaskqueue *
559 gtaskqueue_create_fast(const char *name, int mflags,
560 		       taskqueue_enqueue_fn enqueue, void *context)
561 {
562 	return _gtaskqueue_create(name, mflags, enqueue, context,
563 				  0, "fast_taskqueue");
564 }
565 
/*
 * One slot of a task queue group: a queue plus the grouptasks attached
 * to it.
 */
struct taskqgroup_cpu {
	LIST_HEAD(, grouptask) tgc_tasks;	/* grouptasks attached to this slot */
	struct gtaskqueue *tgc_taskq;		/* queue servicing the slot */
	int		tgc_cnt;		/* number of attached grouptasks */
	int		tgc_cpu;		/* cpu assigned to the slot */
};
572 
/*
 * A group of task queues.  Only the first tqg_cnt entries of tqg_queue are
 * set up by taskqgroup_create().
 */
struct taskqgroup {
	struct taskqgroup_cpu tqg_queue[MAXCPU];	/* per-slot queues */
	struct lock	tqg_lock;	/* protects slot membership and counts */
	const char *	tqg_name;	/* group name (stored by reference) */
	int		tqg_cnt;	/* number of slots in use */
};
579 
/*
 * One-shot task used by taskqgroup_bind() to move a worker thread to its
 * cpu; allocated there and freed by taskqgroup_binder().
 */
struct taskq_bind_task {
	struct gtask bt_task;	/* task queued on the target queue */
	int	bt_cpuid;	/* cpu the worker should migrate to */
};
584 
585 static void
586 taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu)
587 {
588 	struct taskqgroup_cpu *qcpu;
589 
590 	qcpu = &qgroup->tqg_queue[idx];
591 	LIST_INIT(&qcpu->tgc_tasks);
592 	qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK,
593 						 gtaskqueue_thread_enqueue,
594 						 &qcpu->tgc_taskq);
595 	gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, TDPRI_KERN_DAEMON,
596 				 "%s_%d", qgroup->tqg_name, idx);
597 	qcpu->tgc_cpu = cpu;
598 }
599 
600 /*
601  * Find the taskq with least # of tasks that doesn't currently have any
602  * other queues from the uniq identifier.
603  */
604 static int
605 taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
606 {
607 	struct grouptask *n;
608 	int i, idx, mincnt;
609 	int strict;
610 
611 	KKASSERT(lockstatus(&qgroup->tqg_lock, NULL) != 0);
612 	KASSERT(qgroup->tqg_cnt != 0,
613 	    ("qgroup %s has no queues", qgroup->tqg_name));
614 
615 	/*
616 	 * Two passes: first scan for a queue with the least tasks that
617 	 * does not already service this uniq id.  If that fails simply find
618 	 * the queue with the least total tasks.
619 	 */
620 	for (idx = -1, mincnt = INT_MAX, strict = 1; mincnt == INT_MAX;
621 	    strict = 0) {
622 		for (i = 0; i < qgroup->tqg_cnt; i++) {
623 			if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
624 				continue;
625 			if (strict) {
626 				LIST_FOREACH(n, &qgroup->tqg_queue[i].tgc_tasks,
627 				    gt_list)
628 					if (n->gt_uniq == uniq)
629 						break;
630 				if (n != NULL)
631 					continue;
632 			}
633 			mincnt = qgroup->tqg_queue[i].tgc_cnt;
634 			idx = i;
635 		}
636 	}
637 	if (idx == -1)
638 		panic("%s: failed to pick a qid.", __func__);
639 
640 	return (idx);
641 }
642 
643 void
644 taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
645     void *uniq, device_t dev, struct resource *irq, const char *name)
646 {
647 	int cpu, qid, error;
648 
649 	KASSERT(qgroup->tqg_cnt > 0,
650 	    ("qgroup %s has no queues", qgroup->tqg_name));
651 
652 	gtask->gt_uniq = uniq;
653 	ksnprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
654 	gtask->gt_dev = dev;
655 	gtask->gt_irq = irq;
656 	gtask->gt_cpu = -1;
657 	lockmgr(&qgroup->tqg_lock, LK_EXCLUSIVE);
658 	qid = taskqgroup_find(qgroup, uniq);
659 	qgroup->tqg_queue[qid].tgc_cnt++;
660 	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
661 	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
662 	if (dev != NULL && irq != NULL) {
663 		cpu = qgroup->tqg_queue[qid].tgc_cpu;
664 		gtask->gt_cpu = cpu;
665 		lockmgr(&qgroup->tqg_lock, LK_RELEASE);
666 #if 0
667 		/*
668 		 * XXX FreeBSD created a mess by separating out the cpu
669 		 * binding from bus_setup_intr().  Punt for now.
670 		 */
671 		error = bus_bind_intr(dev, irq, cpu);
672 #endif
673 		error = 0;
674 
675 		if (error)
676 			kprintf("%s: binding interrupt failed for %s: %d\n",
677 			    __func__, gtask->gt_name, error);
678 	} else {
679 		lockmgr(&qgroup->tqg_lock, LK_RELEASE);
680 	}
681 }
682 
683 int
684 taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
685     void *uniq, int cpu, device_t dev, struct resource *irq, const char *name)
686 {
687 	int i, qid, error;
688 
689 	gtask->gt_uniq = uniq;
690 	ksnprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
691 	gtask->gt_dev = dev;
692 	gtask->gt_irq = irq;
693 	gtask->gt_cpu = cpu;
694 	lockmgr(&qgroup->tqg_lock, LK_EXCLUSIVE);
695 	for (i = 0, qid = -1; i < qgroup->tqg_cnt; i++) {
696 		if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
697 			qid = i;
698 			break;
699 		}
700 	}
701 	if (qid == -1) {
702 		lockmgr(&qgroup->tqg_lock, LK_RELEASE);
703 		kprintf("%s: qid not found for %s cpu=%d\n",
704 			__func__, gtask->gt_name, cpu);
705 		return (EINVAL);
706 	}
707 	qgroup->tqg_queue[qid].tgc_cnt++;
708 	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
709 	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
710 	cpu = qgroup->tqg_queue[qid].tgc_cpu;
711 	lockmgr(&qgroup->tqg_lock, LK_RELEASE);
712 
713 	if (dev != NULL && irq != NULL) {
714 #if 0
715 		/*
716 		 * XXX FreeBSD created a mess by separating out the cpu
717 		 * binding from bus_setup_intr().  Punt for now.
718 		 */
719 		error = bus_bind_intr(dev, irq, cpu);
720 #endif
721 		error = 0;
722 
723 		if (error) {
724 			kprintf("%s: binding interrupt failed for %s: %d\n",
725 			    __func__, gtask->gt_name, error);
726 		}
727 	}
728 	return (0);
729 }
730 
731 void
732 taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
733 {
734 	int i;
735 
736 	grouptask_block(gtask);
737 	lockmgr(&qgroup->tqg_lock, LK_EXCLUSIVE);
738 	for (i = 0; i < qgroup->tqg_cnt; i++)
739 		if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
740 			break;
741 	if (i == qgroup->tqg_cnt)
742 		panic("%s: task %s not in group", __func__, gtask->gt_name);
743 	qgroup->tqg_queue[i].tgc_cnt--;
744 	LIST_REMOVE(gtask, gt_list);
745 	lockmgr(&qgroup->tqg_lock, LK_RELEASE);
746 	gtask->gt_taskqueue = NULL;
747 	gtask->gt_task.ta_flags &= ~TASK_NOENQUEUE;
748 }
749 
750 static void
751 taskqgroup_binder(void *ctx)
752 {
753 	struct taskq_bind_task *gtask;
754 
755 	gtask = ctx;
756 	lwkt_migratecpu(gtask->bt_cpuid);
757 	kfree(gtask, M_DEVBUF);
758 }
759 
760 void
761 taskqgroup_bind(struct taskqgroup *qgroup)
762 {
763 	struct taskq_bind_task *gtask;
764 	int i;
765 
766 	/*
767 	 * Bind taskqueue threads to specific CPUs, if they have been assigned
768 	 * one.
769 	 */
770 	if (qgroup->tqg_cnt == 1)
771 		return;
772 
773 	for (i = 0; i < qgroup->tqg_cnt; i++) {
774 		gtask = kmalloc(sizeof(*gtask), M_DEVBUF, M_WAITOK);
775 		GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask);
776 		gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
777 		grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
778 				       &gtask->bt_task);
779 	}
780 }
781 
782 struct taskqgroup *
783 taskqgroup_create(const char *name, int cnt, int stride)
784 {
785 	struct taskqgroup *qgroup;
786 	int cpu, i, j;
787 
788 	qgroup = kmalloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
789 	lockinit(&qgroup->tqg_lock, "taskqgroup", 0, 0);
790 	qgroup->tqg_name = name;
791 	qgroup->tqg_cnt = cnt;
792 
793 	for (cpu = i = 0; i < cnt; i++) {
794 		taskqgroup_cpu_create(qgroup, i, cpu);
795 		for (j = 0; j < stride; j++)
796 			cpu = (cpu + 1) % ncpus;
797 	}
798 	return (qgroup);
799 }
800 
/*
 * Placeholder: currently does nothing.  The queues, threads and memory
 * set up by taskqgroup_create() are not released here.
 */
void
taskqgroup_destroy(struct taskqgroup *qgroup)
{
}
805 
806 void
807 taskqgroup_drain_all(struct taskqgroup *tqg)
808 {
809 	struct gtaskqueue *q;
810 
811 	for (int i = 0; i < ncpus; i++) {
812 		q = tqg->tqg_queue[i].tgc_taskq;
813 		if (q == NULL)
814 			continue;
815 		gtaskqueue_drain_all(q);
816 	}
817 }
818