1 /*- 2 * Copyright (c) 2000 Doug Rabson 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 * $FreeBSD: src/sys/kern/subr_taskqueue.c,v 1.69 2012/08/28 13:35:37 jhb Exp $" 27 */ 28 29 #include <sys/param.h> 30 #include <sys/queue.h> 31 #include <sys/systm.h> 32 #include <sys/kernel.h> 33 #include <sys/taskqueue.h> 34 #include <sys/interrupt.h> 35 #include <sys/lock.h> 36 #include <sys/malloc.h> 37 #include <sys/kthread.h> 38 #include <sys/spinlock.h> 39 #include <sys/spinlock2.h> 40 #include <sys/serialize.h> 41 #include <sys/proc.h> 42 43 MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); 44 45 static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues; 46 static struct lock taskqueue_queues_lock; 47 48 struct taskqueue { 49 STAILQ_ENTRY(taskqueue) tq_link; 50 STAILQ_HEAD(, task) tq_queue; 51 const char *tq_name; 52 /* NOTE: tq must be locked before calling tq_enqueue */ 53 taskqueue_enqueue_fn tq_enqueue; 54 void *tq_context; 55 56 struct task *tq_running; 57 struct spinlock tq_lock; 58 struct thread **tq_threads; 59 int tq_tcount; 60 int tq_flags; 61 int tq_callouts; 62 }; 63 64 #define TQ_FLAGS_ACTIVE (1 << 0) 65 #define TQ_FLAGS_BLOCKED (1 << 1) 66 #define TQ_FLAGS_PENDING (1 << 2) 67 68 #define DT_CALLOUT_ARMED (1 << 0) 69 70 void 71 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, 72 int priority, task_fn_t func, void *context) 73 { 74 75 TASK_INIT(&timeout_task->t, priority, func, context); 76 callout_init(&timeout_task->c); /* XXX use callout_init_mp() */ 77 timeout_task->q = queue; 78 timeout_task->f = 0; 79 } 80 81 static void taskqueue_run(struct taskqueue *queue, int lock_held); 82 83 static __inline void 84 TQ_LOCK_INIT(struct taskqueue *tq) 85 { 86 spin_init(&tq->tq_lock, "tqlock"); 87 } 88 89 static __inline void 90 TQ_LOCK_UNINIT(struct taskqueue *tq) 91 { 92 spin_uninit(&tq->tq_lock); 93 } 94 95 static __inline void 96 TQ_LOCK(struct taskqueue *tq) 97 { 98 spin_lock(&tq->tq_lock); 99 } 100 101 static __inline void 102 TQ_UNLOCK(struct taskqueue *tq) 103 { 104 spin_unlock(&tq->tq_lock); 105 } 106 107 static __inline void 108 TQ_SLEEP(struct taskqueue *tq, void *ident, const char *wmesg) 109 { 110 ssleep(ident, &tq->tq_lock, 0, wmesg, 0); 111 } 112 113 struct taskqueue * 114 taskqueue_create(const char *name, int mflags, 115 taskqueue_enqueue_fn enqueue, void *context) 116 { 117 struct taskqueue *queue; 118 119 queue = kmalloc(sizeof(*queue), M_TASKQUEUE, mflags | M_ZERO); 120 if (!queue) 121 return NULL; 122 STAILQ_INIT(&queue->tq_queue); 123 queue->tq_name = name; 124 queue->tq_enqueue = enqueue; 125 queue->tq_context = context; 126 queue->tq_flags |= TQ_FLAGS_ACTIVE; 127 TQ_LOCK_INIT(queue); 128 129 lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE); 130 STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link); 131 lockmgr(&taskqueue_queues_lock, LK_RELEASE); 132 133 return queue; 134 } 135 136 /* NOTE: tq must be locked */ 137 static void 138 taskqueue_terminate(struct thread **pp, struct taskqueue *tq) 139 { 140 while(tq->tq_tcount > 0) { 141 /* Unlock spinlock before wakeup() */ 142 TQ_UNLOCK(tq); 143 wakeup(tq); 144 TQ_LOCK(tq); 145 TQ_SLEEP(tq, pp, "taskqueue_terminate"); 146 } 147 } 148 149 void 150 taskqueue_free(struct taskqueue *queue) 151 { 152 TQ_LOCK(queue); 153 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; 154 taskqueue_run(queue, 1); 155 taskqueue_terminate(queue->tq_threads, queue); 156 TQ_UNLOCK(queue); 157 158 lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE); 159 STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link); 160 lockmgr(&taskqueue_queues_lock, LK_RELEASE); 161 162 TQ_LOCK_UNINIT(queue); 163 164 kfree(queue, M_TASKQUEUE); 165 } 166 167 struct taskqueue * 168 taskqueue_find(const char *name) 169 { 170 struct taskqueue *queue; 171 172 lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE); 173 STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) { 174 if (!strcmp(queue->tq_name, name)) { 175 lockmgr(&taskqueue_queues_lock, LK_RELEASE); 176 return queue; 177 } 178 } 179 lockmgr(&taskqueue_queues_lock, LK_RELEASE); 180 return NULL; 181 } 182 183 /* 184 * NOTE! If using the per-cpu taskqueues ``taskqueue_thread[mycpuid]'', 185 * be sure NOT TO SHARE the ``task'' between CPUs. TASKS ARE NOT LOCKED. 186 * So either use a throwaway task which will only be enqueued once, or 187 * use one task per CPU! 188 */ 189 static int 190 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) 191 { 192 struct task *ins; 193 struct task *prev; 194 195 /* 196 * Don't allow new tasks on a queue which is being freed. 197 */ 198 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) 199 return EPIPE; 200 201 /* 202 * Count multiple enqueues. 203 */ 204 if (task->ta_pending) { 205 task->ta_pending++; 206 return 0; 207 } 208 209 /* 210 * Optimise the case when all tasks have the same priority. 211 */ 212 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); 213 if (!prev || prev->ta_priority >= task->ta_priority) { 214 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); 215 } else { 216 prev = NULL; 217 for (ins = STAILQ_FIRST(&queue->tq_queue); ins; 218 prev = ins, ins = STAILQ_NEXT(ins, ta_link)) 219 if (ins->ta_priority < task->ta_priority) 220 break; 221 222 if (prev) 223 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); 224 else 225 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); 226 } 227 228 task->ta_pending = 1; 229 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) { 230 if (queue->tq_enqueue) 231 queue->tq_enqueue(queue->tq_context); 232 } else { 233 queue->tq_flags |= TQ_FLAGS_PENDING; 234 } 235 236 return 0; 237 } 238 239 int 240 taskqueue_enqueue(struct taskqueue *queue, struct task *task) 241 { 242 int res; 243 244 TQ_LOCK(queue); 245 res = taskqueue_enqueue_locked(queue, task); 246 TQ_UNLOCK(queue); 247 248 return (res); 249 } 250 251 static void 252 taskqueue_timeout_func(void *arg) 253 { 254 struct taskqueue *queue; 255 struct timeout_task *timeout_task; 256 257 timeout_task = arg; 258 queue = timeout_task->q; 259 260 TQ_LOCK(queue); 261 KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout")); 262 timeout_task->f &= ~DT_CALLOUT_ARMED; 263 queue->tq_callouts--; 264 taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t); 265 TQ_UNLOCK(queue); 266 } 267 268 int 269 taskqueue_enqueue_timeout(struct taskqueue *queue, 270 struct timeout_task *timeout_task, int ticks) 271 { 272 int res; 273 274 TQ_LOCK(queue); 275 KASSERT(timeout_task->q == NULL || timeout_task->q == queue, 276 ("Migrated queue")); 277 timeout_task->q = queue; 278 res = timeout_task->t.ta_pending; 279 if (ticks == 0) { 280 taskqueue_enqueue_locked(queue, &timeout_task->t); 281 TQ_UNLOCK(queue); 282 } else { 283 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 284 res++; 285 } else { 286 queue->tq_callouts++; 287 timeout_task->f |= DT_CALLOUT_ARMED; 288 } 289 TQ_UNLOCK(queue); 290 callout_reset(&timeout_task->c, ticks, taskqueue_timeout_func, 291 timeout_task); 292 } 293 return (res); 294 } 295 296 void 297 taskqueue_block(struct taskqueue *queue) 298 { 299 TQ_LOCK(queue); 300 queue->tq_flags |= TQ_FLAGS_BLOCKED; 301 TQ_UNLOCK(queue); 302 } 303 304 void 305 taskqueue_unblock(struct taskqueue *queue) 306 { 307 TQ_LOCK(queue); 308 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; 309 if (queue->tq_flags & TQ_FLAGS_PENDING) { 310 queue->tq_flags &= ~TQ_FLAGS_PENDING; 311 if (queue->tq_enqueue) 312 queue->tq_enqueue(queue->tq_context); 313 } 314 TQ_UNLOCK(queue); 315 } 316 317 static void 318 taskqueue_run(struct taskqueue *queue, int lock_held) 319 { 320 struct task *task; 321 int pending; 322 323 if (lock_held == 0) 324 TQ_LOCK(queue); 325 while (STAILQ_FIRST(&queue->tq_queue)) { 326 /* 327 * Carefully remove the first task from the queue and 328 * zero its pending count. 329 */ 330 task = STAILQ_FIRST(&queue->tq_queue); 331 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); 332 pending = task->ta_pending; 333 task->ta_pending = 0; 334 queue->tq_running = task; 335 336 TQ_UNLOCK(queue); 337 task->ta_func(task->ta_context, pending); 338 queue->tq_running = NULL; 339 wakeup(task); 340 TQ_LOCK(queue); 341 } 342 if (lock_held == 0) 343 TQ_UNLOCK(queue); 344 } 345 346 static int 347 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, 348 u_int *pendp) 349 { 350 351 if (task->ta_pending > 0) 352 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); 353 if (pendp != NULL) 354 *pendp = task->ta_pending; 355 task->ta_pending = 0; 356 return (task == queue->tq_running ? EBUSY : 0); 357 } 358 359 int 360 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) 361 { 362 int error; 363 364 TQ_LOCK(queue); 365 error = taskqueue_cancel_locked(queue, task, pendp); 366 TQ_UNLOCK(queue); 367 368 return (error); 369 } 370 371 int 372 taskqueue_cancel_timeout(struct taskqueue *queue, 373 struct timeout_task *timeout_task, u_int *pendp) 374 { 375 u_int pending, pending1; 376 int error; 377 378 pending = !!callout_stop(&timeout_task->c); 379 TQ_LOCK(queue); 380 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); 381 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { 382 timeout_task->f &= ~DT_CALLOUT_ARMED; 383 queue->tq_callouts--; 384 } 385 TQ_UNLOCK(queue); 386 387 if (pendp != NULL) 388 *pendp = pending + pending1; 389 return (error); 390 } 391 392 void 393 taskqueue_drain(struct taskqueue *queue, struct task *task) 394 { 395 TQ_LOCK(queue); 396 while (task->ta_pending != 0 || task == queue->tq_running) 397 TQ_SLEEP(queue, task, "-"); 398 TQ_UNLOCK(queue); 399 } 400 401 void 402 taskqueue_drain_timeout(struct taskqueue *queue, 403 struct timeout_task *timeout_task) 404 { 405 callout_cancel(&timeout_task->c); 406 taskqueue_drain(queue, &timeout_task->t); 407 } 408 409 static void 410 taskqueue_swi_enqueue(void *context) 411 { 412 setsofttq(); 413 } 414 415 static void 416 taskqueue_swi_run(void *arg, void *frame) 417 { 418 taskqueue_run(taskqueue_swi, 0); 419 } 420 421 static void 422 taskqueue_swi_mp_run(void *arg, void *frame) 423 { 424 taskqueue_run(taskqueue_swi_mp, 0); 425 } 426 427 int 428 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, int ncpu, 429 const char *fmt, ...) 430 { 431 __va_list ap; 432 struct thread *td; 433 struct taskqueue *tq; 434 int i, error, cpu; 435 char ktname[MAXCOMLEN]; 436 437 if (count <= 0) 438 return EINVAL; 439 /* catch call argument mistakes */ 440 KKASSERT(pri > 0 && pri < TDPRI_MAX); 441 442 tq = *tqp; 443 cpu = ncpu; 444 445 __va_start(ap, fmt); 446 kvsnprintf(ktname, MAXCOMLEN, fmt, ap); 447 __va_end(ap); 448 449 tq->tq_threads = kmalloc(sizeof(struct thread *) * count, M_TASKQUEUE, 450 M_WAITOK | M_ZERO); 451 452 for (i = 0; i < count; i++) { 453 /* 454 * If no specific cpu was specified and more than one thread 455 * is to be created, we distribute the threads amongst all 456 * cpus. 457 */ 458 if ((ncpu <= -1) && (count > 1)) 459 cpu = i%ncpus; 460 461 if (count == 1) { 462 error = lwkt_create(taskqueue_thread_loop, tqp, 463 &tq->tq_threads[i], NULL, 464 TDF_NOSTART, cpu, 465 "%s", ktname); 466 } else { 467 error = lwkt_create(taskqueue_thread_loop, tqp, 468 &tq->tq_threads[i], NULL, 469 TDF_NOSTART, cpu, 470 "%s_%d", ktname, i); 471 } 472 if (error) { 473 kprintf("%s: lwkt_create(%s): error %d", __func__, 474 ktname, error); 475 tq->tq_threads[i] = NULL; 476 } else { 477 td = tq->tq_threads[i]; 478 lwkt_setpri_initial(td, pri); 479 lwkt_schedule(td); 480 tq->tq_tcount++; 481 } 482 } 483 484 return 0; 485 } 486 487 void 488 taskqueue_thread_loop(void *arg) 489 { 490 struct taskqueue **tqp, *tq; 491 492 tqp = arg; 493 tq = *tqp; 494 TQ_LOCK(tq); 495 while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { 496 taskqueue_run(tq, 1); 497 TQ_SLEEP(tq, tq, "tqthr"); 498 } 499 500 /* rendezvous with thread that asked us to terminate */ 501 tq->tq_tcount--; 502 TQ_UNLOCK(tq); 503 wakeup_one(tq->tq_threads); 504 lwkt_exit(); 505 } 506 507 /* NOTE: tq must be locked */ 508 void 509 taskqueue_thread_enqueue(void *context) 510 { 511 struct taskqueue **tqp, *tq; 512 513 tqp = context; 514 tq = *tqp; 515 516 /* Unlock spinlock before wakeup_one() */ 517 TQ_UNLOCK(tq); 518 wakeup_one(tq); 519 TQ_LOCK(tq); 520 } 521 522 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0, 523 register_swi(SWI_TQ, taskqueue_swi_run, NULL, "swi_taskq", NULL, -1)); 524 /* 525 * XXX: possibly use a different SWI_TQ_MP or so. 526 * related: sys/interrupt.h 527 * related: platform/XXX/isa/ipl_funcs.c 528 */ 529 TASKQUEUE_DEFINE(swi_mp, taskqueue_swi_enqueue, 0, 530 register_swi_mp(SWI_TQ, taskqueue_swi_mp_run, NULL, "swi_mp_taskq", NULL, 531 -1)); 532 533 struct taskqueue *taskqueue_thread[MAXCPU]; 534 535 static void 536 taskqueue_init(void) 537 { 538 int cpu; 539 540 lockinit(&taskqueue_queues_lock, "tqqueues", 0, 0); 541 STAILQ_INIT(&taskqueue_queues); 542 543 for (cpu = 0; cpu < ncpus; cpu++) { 544 taskqueue_thread[cpu] = taskqueue_create("thread", M_INTWAIT, 545 taskqueue_thread_enqueue, &taskqueue_thread[cpu]); 546 taskqueue_start_threads(&taskqueue_thread[cpu], 1, 547 TDPRI_KERN_DAEMON, cpu, "taskq_cpu %d", cpu); 548 } 549 } 550 551 SYSINIT(taskqueueinit, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, taskqueue_init, NULL); 552