1 /* 2 * Copyright (c) 2009 Pawel Jakub Dawidek <pjd@FreeBSD.org> 3 * All rights reserved. 4 * 5 * Copyright (c) 2012 Spectra Logic Corporation. All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 #include <sys/param.h> 30 #include <sys/kernel.h> 31 #include <sys/kmem.h> 32 #include <sys/lock.h> 33 #include <sys/mutex.h> 34 #include <sys/queue.h> 35 #include <sys/taskq.h> 36 #include <sys/taskqueue.h> 37 #include <sys/zfs_context.h> 38 39 #if defined(__i386__) || defined(__amd64__) || defined(__aarch64__) 40 #include <machine/pcb.h> 41 #endif 42 43 #include <vm/uma.h> 44 45 #if __FreeBSD_version < 1201522 46 #define taskqueue_start_threads_in_proc(tqp, count, pri, proc, name, ...) \ 47 taskqueue_start_threads(tqp, count, pri, name, __VA_ARGS__) 48 #endif 49 50 static uint_t taskq_tsd; 51 static uma_zone_t taskq_zone; 52 53 /* 54 * Global system-wide dynamic task queue available for all consumers. This 55 * taskq is not intended for long-running tasks; instead, a dedicated taskq 56 * should be created. 57 */ 58 taskq_t *system_taskq = NULL; 59 taskq_t *system_delay_taskq = NULL; 60 taskq_t *dynamic_taskq = NULL; 61 62 proc_t *system_proc; 63 64 static MALLOC_DEFINE(M_TASKQ, "taskq", "taskq structures"); 65 66 static LIST_HEAD(tqenthashhead, taskq_ent) *tqenthashtbl; 67 static unsigned long tqenthash; 68 static unsigned long tqenthashlock; 69 static struct sx *tqenthashtbl_lock; 70 71 static taskqid_t tqidnext; 72 73 #define TQIDHASH(tqid) (&tqenthashtbl[(tqid) & tqenthash]) 74 #define TQIDHASHLOCK(tqid) (&tqenthashtbl_lock[((tqid) & tqenthashlock)]) 75 76 #define NORMAL_TASK 0 77 #define TIMEOUT_TASK 1 78 79 static void 80 system_taskq_init(void *arg) 81 { 82 int i; 83 84 tsd_create(&taskq_tsd, NULL); 85 tqenthashtbl = hashinit(mp_ncpus * 8, M_TASKQ, &tqenthash); 86 tqenthashlock = (tqenthash + 1) / 8; 87 if (tqenthashlock > 0) 88 tqenthashlock--; 89 tqenthashtbl_lock = 90 malloc(sizeof (*tqenthashtbl_lock) * (tqenthashlock + 1), 91 M_TASKQ, M_WAITOK | M_ZERO); 92 for (i = 0; i < tqenthashlock + 1; i++) 93 sx_init_flags(&tqenthashtbl_lock[i], "tqenthash", SX_DUPOK); 94 taskq_zone = uma_zcreate("taskq_zone", sizeof (taskq_ent_t), 95 NULL, NULL, NULL, NULL, 96 UMA_ALIGN_CACHE, 0); 97 system_taskq = taskq_create("system_taskq", mp_ncpus, minclsyspri, 98 0, 0, 0); 99 system_delay_taskq = taskq_create("system_delay_taskq", mp_ncpus, 100 minclsyspri, 0, 0, 0); 101 } 102 SYSINIT(system_taskq_init, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_init, 103 NULL); 104 105 static void 106 system_taskq_fini(void *arg) 107 { 108 int i; 109 110 taskq_destroy(system_delay_taskq); 111 taskq_destroy(system_taskq); 112 uma_zdestroy(taskq_zone); 113 tsd_destroy(&taskq_tsd); 114 for (i = 0; i < tqenthashlock + 1; i++) 115 sx_destroy(&tqenthashtbl_lock[i]); 116 for (i = 0; i < tqenthash + 1; i++) 117 VERIFY(LIST_EMPTY(&tqenthashtbl[i])); 118 free(tqenthashtbl_lock, M_TASKQ); 119 free(tqenthashtbl, M_TASKQ); 120 } 121 SYSUNINIT(system_taskq_fini, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_fini, 122 NULL); 123 124 #ifdef __LP64__ 125 static taskqid_t 126 __taskq_genid(void) 127 { 128 taskqid_t tqid; 129 130 /* 131 * Assume a 64-bit counter will not wrap in practice. 132 */ 133 tqid = atomic_add_64_nv(&tqidnext, 1); 134 VERIFY(tqid); 135 return (tqid); 136 } 137 #else 138 static taskqid_t 139 __taskq_genid(void) 140 { 141 taskqid_t tqid; 142 143 for (;;) { 144 tqid = atomic_add_32_nv(&tqidnext, 1); 145 if (__predict_true(tqid != 0)) 146 break; 147 } 148 VERIFY(tqid); 149 return (tqid); 150 } 151 #endif 152 153 static taskq_ent_t * 154 taskq_lookup(taskqid_t tqid) 155 { 156 taskq_ent_t *ent = NULL; 157 158 if (tqid == 0) 159 return (NULL); 160 sx_slock(TQIDHASHLOCK(tqid)); 161 LIST_FOREACH(ent, TQIDHASH(tqid), tqent_hash) { 162 if (ent->tqent_id == tqid) 163 break; 164 } 165 if (ent != NULL) 166 refcount_acquire(&ent->tqent_rc); 167 sx_sunlock(TQIDHASHLOCK(tqid)); 168 return (ent); 169 } 170 171 static taskqid_t 172 taskq_insert(taskq_ent_t *ent) 173 { 174 taskqid_t tqid = __taskq_genid(); 175 176 ent->tqent_id = tqid; 177 sx_xlock(TQIDHASHLOCK(tqid)); 178 LIST_INSERT_HEAD(TQIDHASH(tqid), ent, tqent_hash); 179 sx_xunlock(TQIDHASHLOCK(tqid)); 180 return (tqid); 181 } 182 183 static void 184 taskq_remove(taskq_ent_t *ent) 185 { 186 taskqid_t tqid = ent->tqent_id; 187 188 if (tqid == 0) 189 return; 190 sx_xlock(TQIDHASHLOCK(tqid)); 191 if (ent->tqent_id != 0) { 192 LIST_REMOVE(ent, tqent_hash); 193 ent->tqent_id = 0; 194 } 195 sx_xunlock(TQIDHASHLOCK(tqid)); 196 } 197 198 static void 199 taskq_tsd_set(void *context) 200 { 201 taskq_t *tq = context; 202 203 #if defined(__amd64__) || defined(__aarch64__) 204 if (context != NULL && tsd_get(taskq_tsd) == NULL) 205 fpu_kern_thread(FPU_KERN_NORMAL); 206 #endif 207 tsd_set(taskq_tsd, tq); 208 } 209 210 static taskq_t * 211 taskq_create_impl(const char *name, int nthreads, pri_t pri, 212 proc_t *proc __maybe_unused, uint_t flags) 213 { 214 taskq_t *tq; 215 216 if ((flags & TASKQ_THREADS_CPU_PCT) != 0) 217 nthreads = MAX((mp_ncpus * nthreads) / 100, 1); 218 219 tq = kmem_alloc(sizeof (*tq), KM_SLEEP); 220 tq->tq_nthreads = nthreads; 221 tq->tq_queue = taskqueue_create(name, M_WAITOK, 222 taskqueue_thread_enqueue, &tq->tq_queue); 223 taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_INIT, 224 taskq_tsd_set, tq); 225 taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN, 226 taskq_tsd_set, NULL); 227 (void) taskqueue_start_threads_in_proc(&tq->tq_queue, nthreads, pri, 228 proc, "%s", name); 229 230 return ((taskq_t *)tq); 231 } 232 233 taskq_t * 234 taskq_create(const char *name, int nthreads, pri_t pri, int minalloc __unused, 235 int maxalloc __unused, uint_t flags) 236 { 237 return (taskq_create_impl(name, nthreads, pri, system_proc, flags)); 238 } 239 240 taskq_t * 241 taskq_create_proc(const char *name, int nthreads, pri_t pri, 242 int minalloc __unused, int maxalloc __unused, proc_t *proc, uint_t flags) 243 { 244 return (taskq_create_impl(name, nthreads, pri, proc, flags)); 245 } 246 247 void 248 taskq_destroy(taskq_t *tq) 249 { 250 251 taskqueue_free(tq->tq_queue); 252 kmem_free(tq, sizeof (*tq)); 253 } 254 255 static void taskq_sync_assign(void *arg); 256 257 typedef struct taskq_sync_arg { 258 kthread_t *tqa_thread; 259 kcondvar_t tqa_cv; 260 kmutex_t tqa_lock; 261 int tqa_ready; 262 } taskq_sync_arg_t; 263 264 static void 265 taskq_sync_assign(void *arg) 266 { 267 taskq_sync_arg_t *tqa = arg; 268 269 mutex_enter(&tqa->tqa_lock); 270 tqa->tqa_thread = curthread; 271 tqa->tqa_ready = 1; 272 cv_signal(&tqa->tqa_cv); 273 while (tqa->tqa_ready == 1) 274 cv_wait(&tqa->tqa_cv, &tqa->tqa_lock); 275 mutex_exit(&tqa->tqa_lock); 276 } 277 278 /* 279 * Create a taskq with a specified number of pool threads. Allocate 280 * and return an array of nthreads kthread_t pointers, one for each 281 * thread in the pool. The array is not ordered and must be freed 282 * by the caller. 283 */ 284 taskq_t * 285 taskq_create_synced(const char *name, int nthreads, pri_t pri, 286 int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp) 287 { 288 taskq_t *tq; 289 taskq_sync_arg_t *tqs = kmem_zalloc(sizeof (*tqs) * nthreads, KM_SLEEP); 290 kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads, 291 KM_SLEEP); 292 293 flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH); 294 295 tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX, 296 flags | TASKQ_PREPOPULATE); 297 VERIFY(tq != NULL); 298 VERIFY(tq->tq_nthreads == nthreads); 299 300 /* spawn all syncthreads */ 301 for (int i = 0; i < nthreads; i++) { 302 cv_init(&tqs[i].tqa_cv, NULL, CV_DEFAULT, NULL); 303 mutex_init(&tqs[i].tqa_lock, NULL, MUTEX_DEFAULT, NULL); 304 (void) taskq_dispatch(tq, taskq_sync_assign, 305 &tqs[i], TQ_FRONT); 306 } 307 308 /* wait on all syncthreads to start */ 309 for (int i = 0; i < nthreads; i++) { 310 mutex_enter(&tqs[i].tqa_lock); 311 while (tqs[i].tqa_ready == 0) 312 cv_wait(&tqs[i].tqa_cv, &tqs[i].tqa_lock); 313 mutex_exit(&tqs[i].tqa_lock); 314 } 315 316 /* let all syncthreads resume, finish */ 317 for (int i = 0; i < nthreads; i++) { 318 mutex_enter(&tqs[i].tqa_lock); 319 tqs[i].tqa_ready = 2; 320 cv_broadcast(&tqs[i].tqa_cv); 321 mutex_exit(&tqs[i].tqa_lock); 322 } 323 taskq_wait(tq); 324 325 for (int i = 0; i < nthreads; i++) { 326 kthreads[i] = tqs[i].tqa_thread; 327 mutex_destroy(&tqs[i].tqa_lock); 328 cv_destroy(&tqs[i].tqa_cv); 329 } 330 kmem_free(tqs, sizeof (*tqs) * nthreads); 331 332 *ktpp = kthreads; 333 return (tq); 334 } 335 336 int 337 taskq_member(taskq_t *tq, kthread_t *thread) 338 { 339 340 return (taskqueue_member(tq->tq_queue, thread)); 341 } 342 343 taskq_t * 344 taskq_of_curthread(void) 345 { 346 return (tsd_get(taskq_tsd)); 347 } 348 349 static void 350 taskq_free(taskq_ent_t *task) 351 { 352 taskq_remove(task); 353 if (refcount_release(&task->tqent_rc)) 354 uma_zfree(taskq_zone, task); 355 } 356 357 int 358 taskq_cancel_id(taskq_t *tq, taskqid_t tid) 359 { 360 uint32_t pend; 361 int rc; 362 taskq_ent_t *ent; 363 364 if ((ent = taskq_lookup(tid)) == NULL) 365 return (0); 366 367 if (ent->tqent_type == NORMAL_TASK) { 368 rc = taskqueue_cancel(tq->tq_queue, &ent->tqent_task, &pend); 369 if (rc == EBUSY) 370 taskqueue_drain(tq->tq_queue, &ent->tqent_task); 371 } else { 372 rc = taskqueue_cancel_timeout(tq->tq_queue, 373 &ent->tqent_timeout_task, &pend); 374 if (rc == EBUSY) { 375 taskqueue_drain_timeout(tq->tq_queue, 376 &ent->tqent_timeout_task); 377 } 378 } 379 if (pend) { 380 /* 381 * Tasks normally free themselves when run, but here the task 382 * was cancelled so it did not free itself. 383 */ 384 taskq_free(ent); 385 } 386 /* Free the extra reference we added with taskq_lookup. */ 387 taskq_free(ent); 388 return (rc); 389 } 390 391 static void 392 taskq_run(void *arg, int pending) 393 { 394 taskq_ent_t *task = arg; 395 396 if (pending == 0) 397 return; 398 task->tqent_func(task->tqent_arg); 399 taskq_free(task); 400 } 401 402 taskqid_t 403 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, 404 uint_t flags, clock_t expire_time) 405 { 406 taskq_ent_t *task; 407 taskqid_t tqid; 408 clock_t timo; 409 int mflag; 410 411 timo = expire_time - ddi_get_lbolt(); 412 if (timo <= 0) 413 return (taskq_dispatch(tq, func, arg, flags)); 414 415 if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP) 416 mflag = M_WAITOK; 417 else 418 mflag = M_NOWAIT; 419 420 task = uma_zalloc(taskq_zone, mflag); 421 if (task == NULL) 422 return (0); 423 task->tqent_func = func; 424 task->tqent_arg = arg; 425 task->tqent_type = TIMEOUT_TASK; 426 refcount_init(&task->tqent_rc, 1); 427 tqid = taskq_insert(task); 428 TIMEOUT_TASK_INIT(tq->tq_queue, &task->tqent_timeout_task, 0, 429 taskq_run, task); 430 431 taskqueue_enqueue_timeout(tq->tq_queue, &task->tqent_timeout_task, 432 timo); 433 return (tqid); 434 } 435 436 taskqid_t 437 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) 438 { 439 taskq_ent_t *task; 440 int mflag, prio; 441 taskqid_t tqid; 442 443 if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP) 444 mflag = M_WAITOK; 445 else 446 mflag = M_NOWAIT; 447 /* 448 * If TQ_FRONT is given, we want higher priority for this task, so it 449 * can go at the front of the queue. 450 */ 451 prio = !!(flags & TQ_FRONT); 452 453 task = uma_zalloc(taskq_zone, mflag); 454 if (task == NULL) 455 return (0); 456 refcount_init(&task->tqent_rc, 1); 457 task->tqent_func = func; 458 task->tqent_arg = arg; 459 task->tqent_type = NORMAL_TASK; 460 tqid = taskq_insert(task); 461 TASK_INIT(&task->tqent_task, prio, taskq_run, task); 462 taskqueue_enqueue(tq->tq_queue, &task->tqent_task); 463 return (tqid); 464 } 465 466 static void 467 taskq_run_ent(void *arg, int pending) 468 { 469 taskq_ent_t *task = arg; 470 471 if (pending == 0) 472 return; 473 task->tqent_func(task->tqent_arg); 474 } 475 476 void 477 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint32_t flags, 478 taskq_ent_t *task) 479 { 480 /* 481 * If TQ_FRONT is given, we want higher priority for this task, so it 482 * can go at the front of the queue. 483 */ 484 task->tqent_task.ta_priority = !!(flags & TQ_FRONT); 485 task->tqent_func = func; 486 task->tqent_arg = arg; 487 taskqueue_enqueue(tq->tq_queue, &task->tqent_task); 488 } 489 490 void 491 taskq_init_ent(taskq_ent_t *task) 492 { 493 TASK_INIT(&task->tqent_task, 0, taskq_run_ent, task); 494 task->tqent_func = NULL; 495 task->tqent_arg = NULL; 496 task->tqent_id = 0; 497 task->tqent_type = NORMAL_TASK; 498 task->tqent_rc = 0; 499 } 500 501 int 502 taskq_empty_ent(taskq_ent_t *task) 503 { 504 return (task->tqent_task.ta_pending == 0); 505 } 506 507 void 508 taskq_wait(taskq_t *tq) 509 { 510 taskqueue_quiesce(tq->tq_queue); 511 } 512 513 void 514 taskq_wait_id(taskq_t *tq, taskqid_t tid) 515 { 516 taskq_ent_t *ent; 517 518 if ((ent = taskq_lookup(tid)) == NULL) 519 return; 520 521 if (ent->tqent_type == NORMAL_TASK) 522 taskqueue_drain(tq->tq_queue, &ent->tqent_task); 523 else 524 taskqueue_drain_timeout(tq->tq_queue, &ent->tqent_timeout_task); 525 taskq_free(ent); 526 } 527 528 void 529 taskq_wait_outstanding(taskq_t *tq, taskqid_t id __unused) 530 { 531 taskqueue_drain_all(tq->tq_queue); 532 } 533