1 /*- 2 * Copyright (c) 2000 Doug Rabson 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 * $FreeBSD: src/sys/kern/subr_taskqueue.c,v 1.1.2.3 2003/09/10 00:40:39 ken Exp $ 27 * $DragonFly: src/sys/kern/subr_taskqueue.c,v 1.13 2008/06/07 11:44:04 mneumann Exp $ 28 */ 29 30 #include <sys/param.h> 31 #include <sys/queue.h> 32 #include <sys/systm.h> 33 #include <sys/kernel.h> 34 #include <sys/taskqueue.h> 35 #include <sys/interrupt.h> 36 #include <sys/lock.h> 37 #include <sys/malloc.h> 38 #include <sys/kthread.h> 39 #include <sys/thread2.h> 40 #include <sys/spinlock.h> 41 #include <sys/spinlock2.h> 42 #include <sys/serialize.h> 43 #include <sys/proc.h> 44 #include <machine/varargs.h> 45 46 MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 47 48 static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues; 49 static struct lock taskqueue_queues_lock; 50 51 struct taskqueue { 52 STAILQ_ENTRY(taskqueue) tq_link; 53 STAILQ_HEAD(, task) tq_queue; 54 const char *tq_name; 55 taskqueue_enqueue_fn tq_enqueue; 56 void *tq_context; 57 58 struct task *tq_running; 59 struct spinlock tq_lock; 60 struct thread **tq_threads; 61 int tq_tcount; 62 int tq_flags; 63 }; 64 65 #define TQ_FLAGS_ACTIVE (1 << 0) 66 #define TQ_FLAGS_BLOCKED (1 << 1) 67 #define TQ_FLAGS_PENDING (1 << 2) 68 69 static void taskqueue_run(struct taskqueue *queue, int lock_held); 70 71 static __inline void 72 TQ_LOCK_INIT(struct taskqueue *tq) 73 { 74 spin_init(&tq->tq_lock); 75 } 76 77 static __inline void 78 TQ_LOCK_UNINIT(struct taskqueue *tq) 79 { 80 spin_uninit(&tq->tq_lock); 81 } 82 83 static __inline void 84 TQ_LOCK(struct taskqueue *tq) 85 { 86 spin_lock(&tq->tq_lock); 87 } 88 89 static __inline void 90 TQ_UNLOCK(struct taskqueue *tq) 91 { 92 spin_unlock(&tq->tq_lock); 93 } 94 95 static __inline void 96 TQ_SLEEP(struct taskqueue *tq, void *ident, const char *wmesg) 97 { 98 ssleep(ident, &tq->tq_lock, 0, wmesg, 0); 99 } 100 101 struct taskqueue * 102 taskqueue_create(const char *name, int mflags, 103 taskqueue_enqueue_fn enqueue, void *context) 104 { 105 struct taskqueue *queue; 106 107 queue = kmalloc(sizeof(*queue), M_TASKQUEUE, mflags | M_ZERO); 108 if (!queue) 109 return NULL; 110 STAILQ_INIT(&queue->tq_queue); 111 queue->tq_name = name; 112 queue->tq_enqueue = enqueue; 113 queue->tq_context = context; 114 queue->tq_flags |= TQ_FLAGS_ACTIVE; 115 TQ_LOCK_INIT(queue); 116 117 lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE); 118 STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link); 119 lockmgr(&taskqueue_queues_lock, LK_RELEASE); 120 121 return queue; 122 } 123 124 static void 125 taskqueue_terminate(struct thread **pp, struct taskqueue *tq) 126 { 127 while(tq->tq_tcount > 0) { 128 wakeup(tq); 129 TQ_SLEEP(tq, pp, "taskqueue_terminate"); 130 } 131 } 132 133 void 134 taskqueue_free(struct taskqueue *queue) 135 { 136 TQ_LOCK(queue); 137 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 138 taskqueue_run(queue, 1); 139 taskqueue_terminate(queue->tq_threads, queue); 140 TQ_UNLOCK(queue); 141 142 lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE); 143 STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link); 144 lockmgr(&taskqueue_queues_lock, LK_RELEASE); 145 146 TQ_LOCK_UNINIT(queue); 147 148 kfree(queue, M_TASKQUEUE); 149 } 150 151 struct taskqueue * 152 taskqueue_find(const char *name) 153 { 154 struct taskqueue *queue; 155 156 lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE); 157 STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) { 158 if (!strcmp(queue->tq_name, name)) { 159 lockmgr(&taskqueue_queues_lock, LK_RELEASE); 160 return queue; 161 } 162 } 163 lockmgr(&taskqueue_queues_lock, LK_RELEASE); 164 return NULL; 165 } 166 167 /* 168 * NOTE! If using the per-cpu taskqueues ``taskqueue_thread[mycpuid]'', 169 * be sure NOT TO SHARE the ``task'' between CPUs. TASKS ARE NOT LOCKED. 170 * So either use a throwaway task which will only be enqueued once, or 171 * use one task per CPU! 172 */ 173 int 174 taskqueue_enqueue(struct taskqueue *queue, struct task *task) 175 { 176 struct task *ins; 177 struct task *prev; 178 179 TQ_LOCK(queue); 180 181 /* 182 * Don't allow new tasks on a queue which is being freed. 183 */ 184 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) { 185 TQ_UNLOCK(queue); 186 return EPIPE; 187 } 188 189 /* 190 * Count multiple enqueues. 191 */ 192 if (task->ta_pending) { 193 task->ta_pending++; 194 TQ_UNLOCK(queue); 195 return 0; 196 } 197 198 /* 199 * Optimise the case when all tasks have the same priority. 200 */ 201 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 202 if (!prev || prev->ta_priority >= task->ta_priority) { 203 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 204 } else { 205 prev = NULL; 206 for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 207 prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 208 if (ins->ta_priority < task->ta_priority) 209 break; 210 211 if (prev) 212 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 213 else 214 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 215 } 216 217 task->ta_pending = 1; 218 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) { 219 if (queue->tq_enqueue) 220 queue->tq_enqueue(queue->tq_context); 221 } else { 222 queue->tq_flags |= TQ_FLAGS_PENDING; 223 } 224 225 TQ_UNLOCK(queue); 226 227 return 0; 228 } 229 230 void 231 taskqueue_block(struct taskqueue *queue) 232 { 233 TQ_LOCK(queue); 234 queue->tq_flags |= TQ_FLAGS_BLOCKED; 235 TQ_UNLOCK(queue); 236 } 237 238 void 239 taskqueue_unblock(struct taskqueue *queue) 240 { 241 TQ_LOCK(queue); 242 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 243 if (queue->tq_flags & TQ_FLAGS_PENDING) { 244 queue->tq_flags &= ~TQ_FLAGS_PENDING; 245 if (queue->tq_enqueue) 246 queue->tq_enqueue(queue->tq_context); 247 } 248 TQ_UNLOCK(queue); 249 } 250 251 void 252 taskqueue_run(struct taskqueue *queue, int lock_held) 253 { 254 struct task *task; 255 int pending; 256 257 if (lock_held == 0) 258 TQ_LOCK(queue); 259 while (STAILQ_FIRST(&queue->tq_queue)) { 260 /* 261 * Carefully remove the first task from the queue and 262 * zero its pending count. 263 */ 264 task = STAILQ_FIRST(&queue->tq_queue); 265 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 266 pending = task->ta_pending; 267 task->ta_pending = 0; 268 queue->tq_running = task; 269 TQ_UNLOCK(queue); 270 271 task->ta_func(task->ta_context, pending); 272 273 TQ_LOCK(queue); 274 queue->tq_running = NULL; 275 wakeup(task); 276 } 277 if (lock_held == 0) 278 TQ_UNLOCK(queue); 279 } 280 281 void 282 taskqueue_drain(struct taskqueue *queue, struct task *task) 283 { 284 TQ_LOCK(queue); 285 while (task->ta_pending != 0 || task == queue->tq_running) 286 TQ_SLEEP(queue, task, "-"); 287 TQ_UNLOCK(queue); 288 } 289 290 static void 291 taskqueue_swi_enqueue(void *context) 292 { 293 setsofttq(); 294 } 295 296 static void 297 taskqueue_swi_run(void *arg, void *frame) 298 { 299 taskqueue_run(taskqueue_swi, 0); 300 } 301 302 static void 303 taskqueue_swi_mp_run(void *arg, void *frame) 304 { 305 taskqueue_run(taskqueue_swi_mp, 0); 306 } 307 308 int 309 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, int ncpu, 310 const char *fmt, ...) 311 { 312 __va_list ap; 313 struct thread *td; 314 struct taskqueue *tq; 315 int i, error, cpu; 316 char ktname[MAXCOMLEN]; 317 318 if (count <= 0) 319 return EINVAL; 320 321 tq = *tqp; 322 cpu = ncpu; 323 324 __va_start(ap, fmt); 325 kvsnprintf(ktname, MAXCOMLEN, fmt, ap); 326 __va_end(ap); 327 328 tq->tq_threads = kmalloc(sizeof(struct thread *) * count, M_TASKQUEUE, 329 M_WAITOK | M_ZERO); 330 331 for (i = 0; i < count; i++) { 332 /* 333 * If no specific cpu was specified and more than one thread 334 * is to be created, we distribute the threads amongst all 335 * cpus. 336 */ 337 if ((ncpu <= -1) && (count > 1)) 338 cpu = i%ncpus; 339 340 if (count == 1) { 341 error = lwkt_create(taskqueue_thread_loop, tqp, 342 &tq->tq_threads[i], NULL, 343 TDF_STOPREQ, cpu, 344 "%s", ktname); 345 } else { 346 error = lwkt_create(taskqueue_thread_loop, tqp, 347 &tq->tq_threads[i], NULL, 348 TDF_STOPREQ, cpu, 349 "%s_%d", ktname, i); 350 } 351 if (error) { 352 kprintf("%s: kthread_add(%s): error %d", __func__, 353 ktname, error); 354 tq->tq_threads[i] = NULL; 355 } else { 356 td = tq->tq_threads[i]; 357 lwkt_setpri_initial(td, pri); 358 lwkt_schedule(td); 359 tq->tq_tcount++; 360 } 361 } 362 363 return 0; 364 } 365 366 void 367 taskqueue_thread_loop(void *arg) 368 { 369 struct taskqueue **tqp, *tq; 370 371 tqp = arg; 372 tq = *tqp; 373 TQ_LOCK(tq); 374 while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 375 taskqueue_run(tq, 1); 376 TQ_SLEEP(tq, tq, "tqthr"); 377 } 378 379 /* rendezvous with thread that asked us to terminate */ 380 tq->tq_tcount--; 381 wakeup_one(tq->tq_threads); 382 TQ_UNLOCK(tq); 383 lwkt_exit(); 384 } 385 386 void 387 taskqueue_thread_enqueue(void *context) 388 { 389 struct taskqueue **tqp, *tq; 390 391 tqp = context; 392 tq = *tqp; 393 394 wakeup_one(tq); 395 } 396 397 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0, 398 register_swi(SWI_TQ, taskqueue_swi_run, NULL, "swi_taskq", NULL)); 399 /* 400 * XXX: possibly use a different SWI_TQ_MP or so. 401 * related: sys/interrupt.h 402 * related: platform/XXX/isa/ipl_funcs.c 403 */ 404 TASKQUEUE_DEFINE(swi_mp, taskqueue_swi_enqueue, 0, 405 register_swi(SWI_TQ, taskqueue_swi_mp_run, NULL, "swi_mp_taskq", NULL)); 406 407 struct taskqueue *taskqueue_thread[MAXCPU]; 408 409 static void 410 taskqueue_init(void) 411 { 412 int cpu; 413 414 lockinit(&taskqueue_queues_lock, "tqqueues", 0, 0); 415 STAILQ_INIT(&taskqueue_queues); 416 417 for (cpu = 0; cpu < ncpus; cpu++) { 418 taskqueue_thread[cpu] = taskqueue_create("thread", M_INTWAIT, 419 taskqueue_thread_enqueue, &taskqueue_thread[cpu]); 420 taskqueue_start_threads(&taskqueue_thread[cpu], 1, 421 TDPRI_KERN_DAEMON, cpu, "taskq_cpu %d", cpu); 422 } 423 } 424 425 SYSINIT(taskqueueinit, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, taskqueue_init, NULL); 426