1 /* $NetBSD: taskq.c,v 1.2 2010/02/28 14:45:47 haad Exp $ */ 2 3 /* 4 * CDDL HEADER START 5 * 6 * The contents of this file are subject to the terms of the 7 * Common Development and Distribution License, Version 1.0 only 8 * (the "License"). You may not use this file except in compliance 9 * with the License. 10 * 11 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 12 * or http://www.opensolaris.org/os/licensing. 13 * See the License for the specific language governing permissions 14 * and limitations under the License. 15 * 16 * When distributing Covered Code, include this CDDL HEADER in each 17 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 18 * If applicable, add the following below this CDDL HEADER, with the 19 * fields enclosed by brackets "[]" replaced with your own identifying 20 * information: Portions Copyright [yyyy] [name of copyright owner] 21 * 22 * CDDL HEADER END 23 */ 24 /* 25 * Copyright 2005 Sun Microsystems, Inc. All rights reserved. 26 * Use is subject to license terms. 27 */ 28 29 #pragma ident "%Z%%M% %I% %E% SMI" 30 31 /* 32 * Kernel task queues: general-purpose asynchronous task scheduling. 33 * 34 * A common problem in kernel programming is the need to schedule tasks 35 * to be performed later, by another thread. There are several reasons 36 * you may want or need to do this: 37 * 38 * (1) The task isn't time-critical, but your current code path is. 39 * 40 * (2) The task may require grabbing locks that you already hold. 41 * 42 * (3) The task may need to block (e.g. to wait for memory), but you 43 * cannot block in your current context. 44 * 45 * (4) Your code path can't complete because of some condition, but you can't 46 * sleep or fail, so you queue the task for later execution when condition 47 * disappears. 48 * 49 * (5) You just want a simple way to launch multiple tasks in parallel. 50 * 51 * Task queues provide such a facility. 
In its simplest form (used when 52 * performance is not a critical consideration) a task queue consists of a 53 * single list of tasks, together with one or more threads to service the 54 * list. There are some cases when this simple queue is not sufficient: 55 * 56 * (1) The task queues are very hot and there is a need to avoid data and lock 57 * contention over global resources. 58 * 59 * (2) Some tasks may depend on other tasks to complete, so they can't be put in 60 * the same list managed by the same thread. 61 * 62 * (3) Some tasks may block for a long time, and this should not block other 63 * tasks in the queue. 64 * 65 * To provide useful service in such cases we define a "dynamic task queue" 66 * which has an individual thread for each of the tasks. These threads are 67 * dynamically created as they are needed and destroyed when they are not in 68 * use. The API for managing task pools is the same as for managing task queues 69 * with the exception of a taskq creation flag TASKQ_DYNAMIC which tells that 70 * dynamic task pool behavior is desired. 71 * 72 * Dynamic task queues may also place tasks in the normal queue (called "backing 73 * queue") when the task pool runs out of resources. Users of task queues may 74 * disallow such queued scheduling by specifying TQ_NOQUEUE in the dispatch 75 * flags. 76 * 77 * The backing task queue is also used for scheduling internal tasks needed for 78 * dynamic task queue maintenance. 79 * 80 * INTERFACES: 81 * 82 * taskq_t *taskq_create(name, nthreads, pri_t pri, minalloc, maxalloc, flags); 83 * 84 * Create a taskq with specified properties. 85 * Possible 'flags': 86 * 87 * TASKQ_DYNAMIC: Create task pool for task management. If this flag is 88 * specified, 'nthreads' specifies the maximum number of threads in 89 * the task queue. Task execution order for dynamic task queues is 90 * not predictable.
91 * 92 * If this flag is not specified (default case) a 93 * single-list task queue is created with 'nthreads' threads 94 * servicing it. Entries in this queue are managed by 95 * taskq_ent_alloc() and taskq_ent_free() which try to keep the 96 * task population between 'minalloc' and 'maxalloc', but the 97 * latter limit is only advisory for TQ_SLEEP dispatches and the 98 * former limit is only advisory for TQ_NOALLOC dispatches. If 99 * TASKQ_PREPOPULATE is set in 'flags', the taskq will be 100 * prepopulated with 'minalloc' task structures. 101 * 102 * Since non-DYNAMIC taskqs are queues, tasks are guaranteed to be 103 * executed in the order they are scheduled if nthreads == 1. 104 * If nthreads > 1, task execution order is not predictable. 105 * 106 * TASKQ_PREPOPULATE: Prepopulate task queue with threads. 107 * Also prepopulate the task queue with 'minalloc' task structures. 108 * 109 * TASKQ_CPR_SAFE: This flag specifies that users of the task queue will 110 * use their own protocol for handling CPR issues. This flag is not 111 * supported for DYNAMIC task queues. 112 * 113 * The 'pri' field specifies the default priority for the threads that 114 * service all scheduled tasks. 115 * 116 * void taskq_destroy(tq): 117 * 118 * Waits for any scheduled tasks to complete, then destroys the taskq. 119 * Caller should guarantee that no new tasks are scheduled in the closing 120 * taskq. 121 * 122 * taskqid_t taskq_dispatch(tq, func, arg, flags): 123 * 124 * Dispatches the task "func(arg)" to taskq. The 'flags' argument indicates whether 125 * the caller is willing to block for memory. The function returns an 126 * opaque value which is zero iff dispatch fails. If flags is TQ_NOSLEEP 127 * or TQ_NOALLOC and the task can't be dispatched, taskq_dispatch() fails 128 * and returns (taskqid_t)0. 129 * 130 * ASSUMES: func != NULL. 131 * 132 * Possible flags: 133 * TQ_NOSLEEP: Do not wait for resources; may fail. 134 * 135 * TQ_NOALLOC: Do not allocate memory; may fail.
May only be used with 136 * non-dynamic task queues. 137 * 138 * TQ_NOQUEUE: Do not enqueue a task if it can't dispatch it due to 139 * lack of available resources and fail. If this flag is not 140 * set, and the task pool is exhausted, the task may be scheduled 141 * in the backing queue. This flag may ONLY be used with dynamic 142 * task queues. 143 * 144 * NOTE: This flag should always be used when a task queue is used 145 * for tasks that may depend on each other for completion. 146 * Enqueueing dependent tasks may create deadlocks. 147 * 148 * TQ_SLEEP: May block waiting for resources. May still fail for 149 * dynamic task queues if TQ_NOQUEUE is also specified, otherwise 150 * always succeed. 151 * 152 * NOTE: Dynamic task queues are much more likely to fail in 153 * taskq_dispatch() (especially if TQ_NOQUEUE was specified), so it 154 * is important to have backup strategies handling such failures. 155 * 156 * void taskq_wait(tq): 157 * 158 * Waits for all previously scheduled tasks to complete. 159 * 160 * NOTE: It does not stop any new task dispatches. 161 * Do NOT call taskq_wait() from a task: it will cause deadlock. 162 * 163 * void taskq_suspend(tq) 164 * 165 * Suspend all task execution. Tasks already scheduled for a dynamic task 166 * queue will still be executed, but all new scheduled tasks will be 167 * suspended until taskq_resume() is called. 168 * 169 * int taskq_suspended(tq) 170 * 171 * Returns 1 if taskq is suspended and 0 otherwise. It is intended to 172 * ASSERT that the task queue is suspended. 173 * 174 * void taskq_resume(tq) 175 * 176 * Resume task queue execution. 177 * 178 * int taskq_member(tq, thread) 179 * 180 * Returns 1 if 'thread' belongs to taskq 'tq' and 0 otherwise. The 181 * intended use is to ASSERT that a given function is called in taskq 182 * context only. 183 * 184 * system_taskq 185 * 186 * Global system-wide dynamic task queue for common uses. 
It may be used by 187 * any subsystem that needs to schedule tasks and does not need to manage 188 * its own task queues. It is initialized quite early during system boot. 189 * 190 * IMPLEMENTATION. 191 * 192 * This is schematic representation of the task queue structures. 193 * 194 * taskq: 195 * +-------------+ 196 * |tq_lock | +---< taskq_ent_free() 197 * +-------------+ | 198 * |... | | tqent: tqent: 199 * +-------------+ | +------------+ +------------+ 200 * | tq_freelist |-->| tqent_next |--> ... ->| tqent_next | 201 * +-------------+ +------------+ +------------+ 202 * |... | | ... | | ... | 203 * +-------------+ +------------+ +------------+ 204 * | tq_task | | 205 * | | +-------------->taskq_ent_alloc() 206 * +--------------------------------------------------------------------------+ 207 * | | | tqent tqent | 208 * | +---------------------+ +--> +------------+ +--> +------------+ | 209 * | | ... | | | func, arg | | | func, arg | | 210 * +>+---------------------+ <---|-+ +------------+ <---|-+ +------------+ | 211 * | tq_taskq.tqent_next | ----+ | | tqent_next | --->+ | | tqent_next |--+ 212 * +---------------------+ | +------------+ ^ | +------------+ 213 * +-| tq_task.tqent_prev | +--| tqent_prev | | +--| tqent_prev | ^ 214 * | +---------------------+ +------------+ | +------------+ | 215 * | |... | | ... | | | ... | | 216 * | +---------------------+ +------------+ | +------------+ | 217 * | ^ | | 218 * | | | | 219 * +--------------------------------------+--------------+ TQ_APPEND() -+ 220 * | | | 221 * |... 
| taskq_thread()-----+ 222 * +-------------+ 223 * | tq_buckets |--+-------> [ NULL ] (for regular task queues) 224 * +-------------+ | 225 * | DYNAMIC TASK QUEUES: 226 * | 227 * +-> taskq_bucket[nCPU] taskq_bucket_dispatch() 228 * +-------------------+ ^ 229 * +--->| tqbucket_lock | | 230 * | +-------------------+ +--------+ +--------+ 231 * | | tqbucket_freelist |-->| tqent |-->...| tqent | ^ 232 * | +-------------------+<--+--------+<--...+--------+ | 233 * | | ... | | thread | | thread | | 234 * | +-------------------+ +--------+ +--------+ | 235 * | +-------------------+ | 236 * taskq_dispatch()--+--->| tqbucket_lock | TQ_APPEND()------+ 237 * TQ_HASH() | +-------------------+ +--------+ +--------+ 238 * | | tqbucket_freelist |-->| tqent |-->...| tqent | 239 * | +-------------------+<--+--------+<--...+--------+ 240 * | | ... | | thread | | thread | 241 * | +-------------------+ +--------+ +--------+ 242 * +---> ... 243 * 244 * 245 * Task queues use tq_task field to link new entry in the queue. The queue is a 246 * circular doubly-linked list. Entries are put in the end of the list with 247 * TQ_APPEND() and processed from the front of the list by taskq_thread() in 248 * FIFO order. Task queue entries are cached in the free list managed by 249 * taskq_ent_alloc() and taskq_ent_free() functions. 250 * 251 * All threads used by task queues mark t_taskq field of the thread to 252 * point to the task queue. 253 * 254 * Dynamic Task Queues Implementation. 255 * 256 * For a dynamic task queues there is a 1-to-1 mapping between a thread and 257 * taskq_ent_structure. Each entry is serviced by its own thread and each thread 258 * is controlled by a single entry. 259 * 260 * Entries are distributed over a set of buckets. To avoid using modulo 261 * arithmetics the number of buckets is 2^n and is determined as the nearest 262 * power of two roundown of the number of CPUs in the system. Tunable 263 * variable 'taskq_maxbuckets' limits the maximum number of buckets. 
Each entry 264 * is attached to a bucket for its lifetime and can't migrate to other buckets. 265 * 266 * Entries that have scheduled tasks are not placed in any list. The dispatch 267 * function sets their "func" and "arg" fields and signals the corresponding 268 * thread to execute the task. Once the thread executes the task it clears the 269 * "func" field and places an entry on the bucket cache of free entries pointed 270 * by "tqbucket_freelist" field. ALL entries on the free list should have "func" 271 * field equal to NULL. The free list is a circular doubly-linked list identical 272 * in structure to the tq_task list above, but entries are taken from it in LIFO 273 * order - the last freed entry is the first to be allocated. The 274 * taskq_bucket_dispatch() function gets the most recently used entry from the 275 * free list, sets its "func" and "arg" fields and signals a worker thread. 276 * 277 * After executing each task a per-entry thread taskq_d_thread() places its 278 * entry on the bucket free list and goes to a timed sleep. If it wakes up 279 * without getting new task it removes the entry from the free list and destroys 280 * itself. The thread sleep time is controlled by a tunable variable 281 * `taskq_thread_timeout'. 282 * 283 * There is various statistics kept in the bucket which allows for later 284 * analysis of taskq usage patterns. Also, a global copy of taskq creation and 285 * death statistics is kept in the global taskq data structure. Since thread 286 * creation and death happen rarely, updating such global data does not present 287 * a performance problem. 288 * 289 * NOTE: Threads are not bound to any CPU and there is absolutely no association 290 * between the bucket and actual thread CPU, so buckets are used only to 291 * split resources and reduce resource contention. Having threads attached 292 * to the CPU denoted by a bucket may reduce number of times the job 293 * switches between CPUs. 
294 * 295 * Current algorithm creates a thread whenever a bucket has no free 296 * entries. It would be nice to know how many threads are in the running 297 * state and don't create threads if all CPUs are busy with existing 298 * tasks, but it is unclear how such strategy can be implemented. 299 * 300 * Currently buckets are created statically as an array attached to task 301 * queue. On some system with nCPUs < max_ncpus it may waste system 302 * memory. One solution may be allocation of buckets when they are first 303 * touched, but it is not clear how useful it is. 304 * 305 * SUSPEND/RESUME implementation. 306 * 307 * Before executing a task taskq_thread() (executing non-dynamic task 308 * queues) obtains taskq's thread lock as a reader. The taskq_suspend() 309 * function gets the same lock as a writer blocking all non-dynamic task 310 * execution. The taskq_resume() function releases the lock allowing 311 * taskq_thread to continue execution. 312 * 313 * For dynamic task queues, each bucket is marked as TQBUCKET_SUSPEND by 314 * taskq_suspend() function. After that taskq_bucket_dispatch() always 315 * fails, so that taskq_dispatch() will either enqueue tasks for a 316 * suspended backing queue or fail if TQ_NOQUEUE is specified in dispatch 317 * flags. 318 * 319 * NOTE: taskq_suspend() does not immediately block any tasks already 320 * scheduled for dynamic task queues. It only suspends new tasks 321 * scheduled after taskq_suspend() was called. 322 * 323 * taskq_member() function works by comparing a thread t_taskq pointer with 324 * the passed thread pointer. 325 * 326 * LOCKS and LOCK Hierarchy: 327 * 328 * There are two locks used in task queues. 329 * 330 * 1) Task queue structure has a lock, protecting global task queue state. 331 * 332 * 2) Each per-CPU bucket has a lock for bucket management. 333 * 334 * If both locks are needed, task queue lock should be taken only after bucket 335 * lock. 336 * 337 * DEBUG FACILITIES. 
338 * 339 * For DEBUG kernels it is possible to induce random failures to 340 * taskq_dispatch() function when it is given TQ_NOSLEEP argument. The value of 341 * taskq_dmtbf and taskq_smtbf tunables control the mean time between induced 342 * failures for dynamic and static task queues respectively. 343 * 344 * Setting TASKQ_STATISTIC to 0 will disable per-bucket statistics. 345 * 346 * TUNABLES 347 * 348 * system_taskq_size - Size of the global system_taskq. 349 * This value is multiplied by nCPUs to determine 350 * actual size. 351 * Default value: 64 352 * 353 * taskq_thread_timeout - Maximum idle time for taskq_d_thread() 354 * Default value: 5 minutes 355 * 356 * taskq_maxbuckets - Maximum number of buckets in any task queue 357 * Default value: 128 358 * 359 * taskq_search_depth - Maximum # of buckets searched for a free entry 360 * Default value: 4 361 * 362 * taskq_dmtbf - Mean time between induced dispatch failures 363 * for dynamic task queues. 364 * Default value: UINT_MAX (no induced failures) 365 * 366 * taskq_smtbf - Mean time between induced dispatch failures 367 * for static task queues. 368 * Default value: UINT_MAX (no induced failures) 369 * 370 * CONDITIONAL compilation. 371 * 372 * TASKQ_STATISTIC - If set will enable bucket statistic (default). 
373 * 374 */ 375 376 #include <sys/kthread.h> 377 #include <sys/taskq_impl.h> 378 #include <sys/proc.h> 379 #include <sys/kmem.h> 380 #include <sys/callb.h> 381 #include <sys/systm.h> 382 #include <sys/cmn_err.h> 383 #include <sys/debug.h> 384 #include <sys/sysmacros.h> 385 #include <sys/sdt.h> 386 #include <sys/mutex.h> 387 #include <sys/kernel.h> 388 #include <sys/limits.h> 389 390 static kmem_cache_t *taskq_ent_cache, *taskq_cache; 391 392 /* Global system task queue for common use */ 393 taskq_t *system_taskq; 394 395 /* 396 * Maxmimum number of entries in global system taskq is 397 * system_taskq_size * max_ncpus 398 */ 399 #define SYSTEM_TASKQ_SIZE 1 400 int system_taskq_size = SYSTEM_TASKQ_SIZE; 401 402 #define TASKQ_ACTIVE 0x00010000 403 404 /* 405 * Dynamic task queue threads that don't get any work within 406 * taskq_thread_timeout destroy themselves 407 */ 408 #define TASKQ_THREAD_TIMEOUT (60 * 5) 409 int taskq_thread_timeout = TASKQ_THREAD_TIMEOUT; 410 411 #define TASKQ_MAXBUCKETS 128 412 int taskq_maxbuckets = TASKQ_MAXBUCKETS; 413 414 /* 415 * When a bucket has no available entries another buckets are tried. 416 * taskq_search_depth parameter limits the amount of buckets that we search 417 * before failing. This is mostly useful in systems with many CPUs where we may 418 * spend too much time scanning busy buckets. 419 */ 420 #define TASKQ_SEARCH_DEPTH 4 421 int taskq_search_depth = TASKQ_SEARCH_DEPTH; 422 423 /* 424 * Hashing function: mix various bits of x. May be pretty much anything. 425 */ 426 #define TQ_HASH(x) ((x) ^ ((x) >> 11) ^ ((x) >> 17) ^ ((x) ^ 27)) 427 428 /* 429 * We do not create any new threads when the system is low on memory and start 430 * throttling memory allocations. The following macro tries to estimate such 431 * condition. 432 */ 433 #define ENOUGH_MEMORY() (freemem > throttlefree) 434 435 /* 436 * Static functions. 
437 */ 438 static taskq_t *taskq_create_common(const char *, int, int, pri_t, int, 439 int, uint_t); 440 static void taskq_thread(void *); 441 static int taskq_constructor(void *, void *, int); 442 static void taskq_destructor(void *, void *); 443 static int taskq_ent_constructor(void *, void *, int); 444 static void taskq_ent_destructor(void *, void *); 445 static taskq_ent_t *taskq_ent_alloc(taskq_t *, int); 446 static void taskq_ent_free(taskq_t *, taskq_ent_t *); 447 448 /* 449 * Collect per-bucket statistic when TASKQ_STATISTIC is defined. 450 */ 451 #define TASKQ_STATISTIC 1 452 453 #if TASKQ_STATISTIC 454 #define TQ_STAT(b, x) b->tqbucket_stat.x++ 455 #else 456 #define TQ_STAT(b, x) 457 #endif 458 459 /* 460 * Random fault injection. 461 */ 462 uint_t taskq_random; 463 uint_t taskq_dmtbf = UINT_MAX; /* mean time between injected failures */ 464 uint_t taskq_smtbf = UINT_MAX; /* mean time between injected failures */ 465 466 /* 467 * TQ_NOSLEEP dispatches on dynamic task queues are always allowed to fail. 468 * 469 * TQ_NOSLEEP dispatches on static task queues can't arbitrarily fail because 470 * they could prepopulate the cache and make sure that they do not use more 471 * then minalloc entries. So, fault injection in this case insures that 472 * either TASKQ_PREPOPULATE is not set or there are more entries allocated 473 * than is specified by minalloc. TQ_NOALLOC dispatches are always allowed 474 * to fail, but for simplicity we treat them identically to TQ_NOSLEEP 475 * dispatches. 
476 */ 477 #ifdef DEBUG 478 #define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag) \ 479 taskq_random = (taskq_random * 2416 + 374441) % 1771875;\ 480 if ((flag & TQ_NOSLEEP) && \ 481 taskq_random < 1771875 / taskq_dmtbf) { \ 482 return (NULL); \ 483 } 484 485 #define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag) \ 486 taskq_random = (taskq_random * 2416 + 374441) % 1771875;\ 487 if ((flag & (TQ_NOSLEEP | TQ_NOALLOC)) && \ 488 (!(tq->tq_flags & TASKQ_PREPOPULATE) || \ 489 (tq->tq_nalloc > tq->tq_minalloc)) && \ 490 (taskq_random < (1771875 / taskq_smtbf))) { \ 491 mutex_exit(&tq->tq_lock); \ 492 return ((taskqid_t)0); \ 493 } 494 #else 495 #define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag) 496 #define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag) 497 #endif 498 499 #define IS_EMPTY(l) (((l).tqent_prev == (l).tqent_next) && \ 500 ((l).tqent_prev == &(l))) 501 502 /* 503 * Append `tqe' in the end of the doubly-linked list denoted by l. 504 */ 505 #define TQ_APPEND(l, tqe) { \ 506 tqe->tqent_next = &l; \ 507 tqe->tqent_prev = l.tqent_prev; \ 508 tqe->tqent_next->tqent_prev = tqe; \ 509 tqe->tqent_prev->tqent_next = tqe; \ 510 } 511 512 /* 513 * Schedule a task specified by func and arg into the task queue entry tqe. 514 */ 515 #define TQ_ENQUEUE(tq, tqe, func, arg) { \ 516 ASSERT(MUTEX_HELD(&tq->tq_lock)); \ 517 TQ_APPEND(tq->tq_task, tqe); \ 518 tqe->tqent_func = (func); \ 519 tqe->tqent_arg = (arg); \ 520 tq->tq_tasks++; \ 521 if (tq->tq_tasks - tq->tq_executed > tq->tq_maxtasks) \ 522 tq->tq_maxtasks = tq->tq_tasks - tq->tq_executed; \ 523 cv_signal(&tq->tq_dispatch_cv); \ 524 DTRACE_PROBE2(taskq__enqueue, taskq_t *, tq, taskq_ent_t *, tqe); \ 525 } 526 527 /* 528 * Do-nothing task which may be used to prepopulate thread caches. 
529 */ 530 /*ARGSUSED*/ 531 void 532 nulltask(void *unused) 533 { 534 } 535 536 537 /*ARGSUSED*/ 538 static int 539 taskq_constructor(void *arg, void *obj, int kmflags) 540 { 541 taskq_t *tq = obj; 542 543 memset(tq, 0, sizeof (taskq_t)); 544 545 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 546 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 547 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 548 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 549 550 tq->tq_task.tqent_next = &tq->tq_task; 551 tq->tq_task.tqent_prev = &tq->tq_task; 552 553 return (0); 554 } 555 556 /*ARGSUSED*/ 557 static void 558 taskq_destructor(void *arg, void *obj) 559 { 560 taskq_t *tq = obj; 561 562 mutex_destroy(&tq->tq_lock); 563 rw_destroy(&tq->tq_threadlock); 564 cv_destroy(&tq->tq_dispatch_cv); 565 cv_destroy(&tq->tq_wait_cv); 566 } 567 568 /*ARGSUSED*/ 569 static int 570 taskq_ent_constructor(void *arg, void *obj, int kmflags) 571 { 572 taskq_ent_t *tqe = obj; 573 574 tqe->tqent_thread = NULL; 575 cv_init(&tqe->tqent_cv, NULL, CV_DEFAULT, NULL); 576 577 return (0); 578 } 579 580 /*ARGSUSED*/ 581 static void 582 taskq_ent_destructor(void *arg, void *obj) 583 { 584 taskq_ent_t *tqe = obj; 585 586 ASSERT(tqe->tqent_thread == NULL); 587 cv_destroy(&tqe->tqent_cv); 588 } 589 590 /* 591 * Create global system dynamic task queue. 
592 */ 593 void 594 system_taskq_init(void) 595 { 596 system_taskq = taskq_create_common("system_taskq", 0, 597 system_taskq_size * max_ncpus, minclsyspri, 4, 512, 598 TASKQ_PREPOPULATE); 599 } 600 601 void 602 system_taskq_fini(void) 603 { 604 taskq_destroy(system_taskq); 605 } 606 607 void 608 taskq_init(void) 609 { 610 taskq_ent_cache = kmem_cache_create("taskq_ent_cache", 611 sizeof (taskq_ent_t), 0, taskq_ent_constructor, 612 taskq_ent_destructor, NULL, NULL, NULL, 0); 613 taskq_cache = kmem_cache_create("taskq_cache", sizeof (taskq_t), 614 0, taskq_constructor, taskq_destructor, NULL, NULL, NULL, 0); 615 system_taskq_init(); 616 } 617 618 void 619 taskq_fini(void) 620 { 621 system_taskq_fini(); 622 kmem_cache_destroy(taskq_cache); 623 kmem_cache_destroy(taskq_ent_cache); 624 } 625 626 /* 627 * taskq_ent_alloc() 628 * 629 * Allocates a new taskq_ent_t structure either from the free list or from the 630 * cache. Returns NULL if it can't be allocated. 631 * 632 * Assumes: tq->tq_lock is held. 633 */ 634 static taskq_ent_t * 635 taskq_ent_alloc(taskq_t *tq, int flags) 636 { 637 int kmflags = KM_NOSLEEP; 638 639 taskq_ent_t *tqe; 640 641 ASSERT(MUTEX_HELD(&tq->tq_lock)); 642 643 /* 644 * TQ_NOALLOC allocations are allowed to use the freelist, even if 645 * we are below tq_minalloc. 646 */ 647 if ((tqe = tq->tq_freelist) != NULL && 648 ((flags & TQ_NOALLOC) || tq->tq_nalloc >= tq->tq_minalloc)) { 649 tq->tq_freelist = tqe->tqent_next; 650 } else { 651 if (flags & TQ_NOALLOC) 652 return (NULL); 653 654 mutex_exit(&tq->tq_lock); 655 if (tq->tq_nalloc >= tq->tq_maxalloc) { 656 if (kmflags & KM_NOSLEEP) { 657 mutex_enter(&tq->tq_lock); 658 return (NULL); 659 } 660 /* 661 * We don't want to exceed tq_maxalloc, but we can't 662 * wait for other tasks to complete (and thus free up 663 * task structures) without risking deadlock with 664 * the caller. So, we just delay for one second 665 * to throttle the allocation rate. 
666 */ 667 delay(hz); 668 } 669 tqe = kmem_cache_alloc(taskq_ent_cache, kmflags); 670 mutex_enter(&tq->tq_lock); 671 if (tqe != NULL) 672 tq->tq_nalloc++; 673 } 674 return (tqe); 675 } 676 677 /* 678 * taskq_ent_free() 679 * 680 * Free taskq_ent_t structure by either putting it on the free list or freeing 681 * it to the cache. 682 * 683 * Assumes: tq->tq_lock is held. 684 */ 685 static void 686 taskq_ent_free(taskq_t *tq, taskq_ent_t *tqe) 687 { 688 ASSERT(MUTEX_HELD(&tq->tq_lock)); 689 690 if (tq->tq_nalloc <= tq->tq_minalloc) { 691 tqe->tqent_next = tq->tq_freelist; 692 tq->tq_freelist = tqe; 693 } else { 694 tq->tq_nalloc--; 695 mutex_exit(&tq->tq_lock); 696 kmem_cache_free(taskq_ent_cache, tqe); 697 mutex_enter(&tq->tq_lock); 698 } 699 } 700 701 /* 702 * Dispatch a task. 703 * 704 * Assumes: func != NULL 705 * 706 * Returns: NULL if dispatch failed. 707 * non-NULL if task dispatched successfully. 708 * Actual return value is the pointer to taskq entry that was used to 709 * dispatch a task. This is useful for debugging. 710 */ 711 /* ARGSUSED */ 712 taskqid_t 713 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) 714 { 715 taskq_ent_t *tqe = NULL; 716 717 ASSERT(tq != NULL); 718 ASSERT(func != NULL); 719 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); 720 721 /* 722 * TQ_NOQUEUE flag can't be used with non-dynamic task queues. 723 */ 724 ASSERT(! (flags & TQ_NOQUEUE)); 725 726 /* 727 * Enqueue the task to the underlying queue. 728 */ 729 mutex_enter(&tq->tq_lock); 730 731 TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flags); 732 733 if ((tqe = taskq_ent_alloc(tq, flags)) == NULL) { 734 mutex_exit(&tq->tq_lock); 735 return ((taskqid_t)NULL); 736 } 737 TQ_ENQUEUE(tq, tqe, func, arg); 738 mutex_exit(&tq->tq_lock); 739 return ((taskqid_t)tqe); 740 } 741 742 /* 743 * Wait for all pending tasks to complete. 744 * Calling taskq_wait from a task will cause deadlock. 
745 */ 746 void 747 taskq_wait(taskq_t *tq) 748 { 749 750 mutex_enter(&tq->tq_lock); 751 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 752 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 753 mutex_exit(&tq->tq_lock); 754 } 755 756 /* 757 * Suspend execution of tasks. 758 * 759 * Tasks in the queue part will be suspended immediately upon return from this 760 * function. Pending tasks in the dynamic part will continue to execute, but all 761 * new tasks will be suspended. 762 */ 763 void 764 taskq_suspend(taskq_t *tq) 765 { 766 rw_enter(&tq->tq_threadlock, RW_WRITER); 767 768 /* 769 * Mark task queue as being suspended. Needed for taskq_suspended(). 770 */ 771 mutex_enter(&tq->tq_lock); 772 ASSERT(!(tq->tq_flags & TASKQ_SUSPENDED)); 773 tq->tq_flags |= TASKQ_SUSPENDED; 774 mutex_exit(&tq->tq_lock); 775 } 776 777 /* 778 * returns: 1 if tq is suspended, 0 otherwise. 779 */ 780 int 781 taskq_suspended(taskq_t *tq) 782 { 783 return ((tq->tq_flags & TASKQ_SUSPENDED) != 0); 784 } 785 786 /* 787 * Resume taskq execution. 788 */ 789 void 790 taskq_resume(taskq_t *tq) 791 { 792 ASSERT(RW_WRITE_HELD(&tq->tq_threadlock)); 793 794 mutex_enter(&tq->tq_lock); 795 ASSERT(tq->tq_flags & TASKQ_SUSPENDED); 796 tq->tq_flags &= ~TASKQ_SUSPENDED; 797 mutex_exit(&tq->tq_lock); 798 799 rw_exit(&tq->tq_threadlock); 800 } 801 802 int 803 taskq_member(taskq_t *tq, kthread_t *thread) 804 { 805 if (tq->tq_nthreads == 1) 806 return (tq->tq_thread == thread); 807 else { 808 int i, found = 0; 809 810 mutex_enter(&tq->tq_lock); 811 for (i = 0; i < tq->tq_nthreads; i++) { 812 if (tq->tq_threadlist[i] == thread) { 813 found = 1; 814 break; 815 } 816 } 817 mutex_exit(&tq->tq_lock); 818 return (found); 819 } 820 } 821 822 /* 823 * Worker thread for processing task queue. 
824 */ 825 static void 826 taskq_thread(void *arg) 827 { 828 taskq_t *tq = arg; 829 taskq_ent_t *tqe; 830 callb_cpr_t cprinfo; 831 hrtime_t start, end; 832 833 CALLB_CPR_INIT(&cprinfo, &tq->tq_lock, callb_generic_cpr, tq->tq_name); 834 835 mutex_enter(&tq->tq_lock); 836 while (tq->tq_flags & TASKQ_ACTIVE) { 837 if ((tqe = tq->tq_task.tqent_next) == &tq->tq_task) { 838 if (--tq->tq_active == 0) 839 cv_broadcast(&tq->tq_wait_cv); 840 if (tq->tq_flags & TASKQ_CPR_SAFE) { 841 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 842 } else { 843 CALLB_CPR_SAFE_BEGIN(&cprinfo); 844 cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 845 CALLB_CPR_SAFE_END(&cprinfo, &tq->tq_lock); 846 } 847 tq->tq_active++; 848 continue; 849 } 850 tqe->tqent_prev->tqent_next = tqe->tqent_next; 851 tqe->tqent_next->tqent_prev = tqe->tqent_prev; 852 mutex_exit(&tq->tq_lock); 853 854 rw_enter(&tq->tq_threadlock, RW_READER); 855 start = gethrtime(); 856 DTRACE_PROBE2(taskq__exec__start, taskq_t *, tq, 857 taskq_ent_t *, tqe); 858 tqe->tqent_func(tqe->tqent_arg); 859 DTRACE_PROBE2(taskq__exec__end, taskq_t *, tq, 860 taskq_ent_t *, tqe); 861 end = gethrtime(); 862 rw_exit(&tq->tq_threadlock); 863 864 mutex_enter(&tq->tq_lock); 865 tq->tq_totaltime += end - start; 866 tq->tq_executed++; 867 868 taskq_ent_free(tq, tqe); 869 } 870 tq->tq_nthreads--; 871 cv_broadcast(&tq->tq_wait_cv); 872 ASSERT(!(tq->tq_flags & TASKQ_CPR_SAFE)); 873 CALLB_CPR_EXIT(&cprinfo); 874 thread_exit(); 875 } 876 877 /* 878 * Taskq creation. May sleep for memory. 879 * Always use automatically generated instances to avoid kstat name space 880 * collisions. 
881 */ 882 883 taskq_t * 884 taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, 885 int maxalloc, uint_t flags) 886 { 887 return taskq_create_common(name, 0, nthreads, pri, minalloc, 888 maxalloc, flags | TASKQ_NOINSTANCE); 889 } 890 891 static taskq_t * 892 taskq_create_common(const char *name, int instance, int nthreads, pri_t pri, 893 int minalloc, int maxalloc, uint_t flags) 894 { 895 taskq_t *tq = kmem_cache_alloc(taskq_cache, KM_NOSLEEP); 896 uint_t ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus); 897 uint_t bsize; /* # of buckets - always power of 2 */ 898 899 ASSERT(instance == 0); 900 ASSERT(flags == TASKQ_PREPOPULATE | TASKQ_NOINSTANCE); 901 902 /* 903 * TASKQ_CPR_SAFE and TASKQ_DYNAMIC flags are mutually exclusive. 904 */ 905 ASSERT((flags & (TASKQ_DYNAMIC | TASKQ_CPR_SAFE)) != 906 ((TASKQ_DYNAMIC | TASKQ_CPR_SAFE))); 907 908 ASSERT(tq->tq_buckets == NULL); 909 910 bsize = 1 << (highbit(ncpus) - 1); 911 ASSERT(bsize >= 1); 912 bsize = MIN(bsize, taskq_maxbuckets); 913 914 tq->tq_maxsize = nthreads; 915 916 (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1); 917 tq->tq_name[TASKQ_NAMELEN] = '\0'; 918 /* Make sure the name conforms to the rules for C indentifiers */ 919 strident_canon(tq->tq_name, TASKQ_NAMELEN); 920 921 tq->tq_flags = flags | TASKQ_ACTIVE; 922 tq->tq_active = nthreads; 923 tq->tq_nthreads = nthreads; 924 tq->tq_minalloc = minalloc; 925 tq->tq_maxalloc = maxalloc; 926 tq->tq_nbuckets = bsize; 927 tq->tq_pri = pri; 928 929 if (flags & TASKQ_PREPOPULATE) { 930 mutex_enter(&tq->tq_lock); 931 while (minalloc-- > 0) 932 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP)); 933 mutex_exit(&tq->tq_lock); 934 } 935 936 if (nthreads == 1) { 937 tq->tq_thread = thread_create(NULL, 0, taskq_thread, tq, 938 0, NULL, TS_RUN, pri); 939 } else { 940 kthread_t **tpp = kmem_alloc(sizeof (kthread_t *) * nthreads, 941 KM_SLEEP); 942 943 tq->tq_threadlist = tpp; 944 945 mutex_enter(&tq->tq_lock); 946 while (nthreads-- > 0) { 
947 *tpp = thread_create(NULL, 0, taskq_thread, tq, 948 0, NULL, TS_RUN, pri); 949 tpp++; 950 } 951 mutex_exit(&tq->tq_lock); 952 } 953 954 return (tq); 955 } 956 957 /* 958 * taskq_destroy(). 959 * 960 * Assumes: by the time taskq_destroy is called no one will use this task queue 961 * in any way and no one will try to dispatch entries in it. 962 */ 963 void 964 taskq_destroy(taskq_t *tq) 965 { 966 taskq_bucket_t *b = tq->tq_buckets; 967 int bid = 0; 968 969 ASSERT(! (tq->tq_flags & TASKQ_CPR_SAFE)); 970 971 /* 972 * Wait for any pending entries to complete. 973 */ 974 taskq_wait(tq); 975 976 mutex_enter(&tq->tq_lock); 977 ASSERT((tq->tq_task.tqent_next == &tq->tq_task) && 978 (tq->tq_active == 0)); 979 980 if ((tq->tq_nthreads > 1) && (tq->tq_threadlist != NULL)) 981 kmem_free(tq->tq_threadlist, sizeof (kthread_t *) * 982 tq->tq_nthreads); 983 984 tq->tq_flags &= ~TASKQ_ACTIVE; 985 cv_broadcast(&tq->tq_dispatch_cv); 986 while (tq->tq_nthreads != 0) 987 cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 988 989 tq->tq_minalloc = 0; 990 while (tq->tq_nalloc != 0) 991 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP)); 992 993 mutex_exit(&tq->tq_lock); 994 995 /* 996 * Mark each bucket as closing and wakeup all sleeping threads. 997 */ 998 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) { 999 taskq_ent_t *tqe; 1000 1001 mutex_enter(&b->tqbucket_lock); 1002 1003 b->tqbucket_flags |= TQBUCKET_CLOSE; 1004 /* Wakeup all sleeping threads */ 1005 1006 for (tqe = b->tqbucket_freelist.tqent_next; 1007 tqe != &b->tqbucket_freelist; tqe = tqe->tqent_next) 1008 cv_signal(&tqe->tqent_cv); 1009 1010 ASSERT(b->tqbucket_nalloc == 0); 1011 1012 /* 1013 * At this point we waited for all pending jobs to complete (in 1014 * both the task queue and the bucket and no new jobs should 1015 * arrive. Wait for all threads to die. 
1016 */ 1017 while (b->tqbucket_nfree > 0) 1018 cv_wait(&b->tqbucket_cv, &b->tqbucket_lock); 1019 mutex_exit(&b->tqbucket_lock); 1020 mutex_destroy(&b->tqbucket_lock); 1021 cv_destroy(&b->tqbucket_cv); 1022 } 1023 1024 if (tq->tq_buckets != NULL) { 1025 ASSERT(tq->tq_flags & TASKQ_DYNAMIC); 1026 kmem_free(tq->tq_buckets, 1027 sizeof (taskq_bucket_t) * tq->tq_nbuckets); 1028 1029 /* Cleanup fields before returning tq to the cache */ 1030 tq->tq_buckets = NULL; 1031 tq->tq_tcreates = 0; 1032 tq->tq_tdeaths = 0; 1033 } else { 1034 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); 1035 } 1036 1037 tq->tq_totaltime = 0; 1038 tq->tq_tasks = 0; 1039 tq->tq_maxtasks = 0; 1040 tq->tq_executed = 0; 1041 kmem_cache_free(taskq_cache, tq); 1042 } 1043