/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved.
 * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
 * Copyright 2017 RackTop Systems.
 * Copyright 2018, Joyent, Inc.
 */

/*
 * Userland emulation of the kernel taskq interface, built on bound
 * threads (thr_create) and the mutex/cv/rwlock primitives.  A taskq is
 * a fixed pool of worker threads servicing a circular doubly-linked
 * list of pending task entries anchored at tq_task.
 */

#include <sys/taskq_impl.h>

#include <sys/class.h>
#include <sys/debug.h>
#include <sys/ksynch.h>
#include <sys/kmem.h>
#include <sys/time.h>
#include <sys/systm.h>
#include <sys/sysmacros.h>
#include <sys/unistd.h>

/* avoid <sys/disp.h> */
#define	maxclsyspri	99

/* avoid <unistd.h> */
extern long sysconf(int);

/* avoiding <thread.h> */
typedef unsigned int thread_t;
typedef unsigned int thread_key_t;

extern int thr_create(void *, size_t, void *(*)(void *), void *, long,
    thread_t *);
extern int thr_join(thread_t, thread_t *, void **);

/*
 * POSIX.1c Note:
 * THR_BOUND is defined same as PTHREAD_SCOPE_SYSTEM in <pthread.h>
 * THR_DETACHED is defined same as PTHREAD_CREATE_DETACHED in <pthread.h>
 * Any changes in these definitions should be reflected in <pthread.h>
 */
#define	THR_BOUND	0x00000001	/* = PTHREAD_SCOPE_SYSTEM */
#define	THR_NEW_LWP	0x00000002
#define	THR_DETACHED	0x00000040	/* = PTHREAD_CREATE_DETACHED */
#define	THR_SUSPENDED	0x00000080
#define	THR_DAEMON	0x00000100


int taskq_now;			/* non-zero: taskq_dispatch() runs tasks inline */
taskq_t *system_taskq;		/* shared default taskq, see system_taskq_init() */

#define	TASKQ_ACTIVE	0x00010000	/* cleared by taskq_destroy() */

struct taskq {
	kmutex_t	tq_lock;	/* protects all fields below */
	krwlock_t	tq_threadlock;	/* read-held while a task runs */
	kcondvar_t	tq_dispatch_cv;	/* signaled when work is queued */
	kcondvar_t	tq_wait_cv;	/* broadcast when queue drains / threads exit */
	thread_t	*tq_threadlist;	/* worker thread ids (tq_nthreads entries) */
	int		tq_flags;	/* TASKQ_* creation flags | TASKQ_ACTIVE */
	int		tq_active;	/* workers not blocked on tq_dispatch_cv */
	int		tq_nthreads;	/* worker threads still running */
	int		tq_nalloc;	/* task entries currently allocated */
	int		tq_minalloc;	/* cache up to this many free entries */
	int		tq_maxalloc;	/* soft cap on tq_nalloc */
	kcondvar_t	tq_maxalloc_cv;	/* signaled by task_free() to waiters */
	int		tq_maxalloc_wait; /* allocators throttled on tq_maxalloc */
	taskq_ent_t	*tq_freelist;	/* singly-linked cache of free entries */
	taskq_ent_t	tq_task;	/* head of circular pending-task list */
};

/*
 * Allocate a task entry, preferring the free-list cache.  Called (and
 * returns) with tq_lock held; the lock is dropped around kmem_alloc().
 * With KM_NOSLEEP this may return NULL; with KM_SLEEP it may block in
 * one-second intervals while tq_nalloc is at tq_maxalloc.  Note the
 * cap is soft: after an un-signaled timeout we allocate anyway.
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, int tqflags)
{
	taskq_ent_t *t;
	int rv;

again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
		tq->tq_freelist = t->tqent_next;
	} else {
		if (tq->tq_nalloc >= tq->tq_maxalloc) {
			if (tqflags & KM_NOSLEEP)
				return (NULL);

			/*
			 * We don't want to exceed tq_maxalloc, but we can't
			 * wait for other tasks to complete (and thus free up
			 * task structures) without risking deadlock with
			 * the caller.  So, we just delay for one second
			 * to throttle the allocation rate.  If tasks
			 * complete before the one second timeout expires
			 * then task_free() will signal us and we will
			 * immediately retry the allocation.
			 */
			tq->tq_maxalloc_wait++;
			rv = cv_timedwait(&tq->tq_maxalloc_cv,
			    &tq->tq_lock, ddi_get_lbolt() + hz);
			tq->tq_maxalloc_wait--;
			if (rv > 0)
				goto again;		/* signaled */
		}
		mutex_exit(&tq->tq_lock);

		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);

		mutex_enter(&tq->tq_lock);
		if (t != NULL)
			tq->tq_nalloc++;
	}
	return (t);
}

/*
 * Return a task entry: cache it on the free list while tq_nalloc is at
 * or below tq_minalloc, otherwise free it for real (dropping tq_lock
 * around kmem_free()).  Called with tq_lock held.  Wakes one allocator
 * throttled in task_alloc(), if any.
 */
static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
	if (tq->tq_nalloc <= tq->tq_minalloc) {
		t->tqent_next = tq->tq_freelist;
		tq->tq_freelist = t;
	} else {
		tq->tq_nalloc--;
		mutex_exit(&tq->tq_lock);
		kmem_free(t, sizeof (taskq_ent_t));
		mutex_enter(&tq->tq_lock);
	}

	if (tq->tq_maxalloc_wait)
		cv_signal(&tq->tq_maxalloc_cv);
}

/*
 * Dispatch func(arg) for asynchronous execution on tq.  Returns an
 * opaque non-zero id (always 1 in this emulation) on success, or
 * TASKQID_INVALID when a KM_NOSLEEP entry allocation fails.  TQ_FRONT
 * queues the task at the head of the pending list.  When taskq_now is
 * set the task runs synchronously in the caller instead.
 */
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
{
	taskq_ent_t *t;

	if (taskq_now) {
		func(arg);
		return (1);
	}

	mutex_enter(&tq->tq_lock);
	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
	if ((t = task_alloc(tq, tqflags)) == NULL) {
		mutex_exit(&tq->tq_lock);
		return (TASKQID_INVALID);
	}
	if (tqflags & TQ_FRONT) {
		/* insert at the head: executed next */
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		/* insert at the tail: FIFO order */
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_flags = 0;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
	return (1);
}

/*
 * Dispatch func(arg) using a caller-supplied task entry, so no
 * allocation can occur (and thus cannot fail or sleep).  The entry is
 * marked TQENT_FLAG_PREALLOC so the worker will not task_free() it;
 * the caller retains ownership of its storage.
 */
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	ASSERT(func != NULL);
	ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));

	/*
	 * Mark it as a prealloc'd task.  This is important
	 * to ensure that we don't free it later.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;
	/*
	 * Enqueue the task to the underlying queue.
	 */
	mutex_enter(&tq->tq_lock);

	if (flags & TQ_FRONT) {
		t->tqent_next = tq->tq_task.tqent_next;
		t->tqent_prev = &tq->tq_task;
	} else {
		t->tqent_next = &tq->tq_task;
		t->tqent_prev = tq->tq_task.tqent_prev;
	}
	t->tqent_next->tqent_prev = t;
	t->tqent_prev->tqent_next = t;
	t->tqent_func = func;
	t->tqent_arg = arg;
	cv_signal(&tq->tq_dispatch_cv);
	mutex_exit(&tq->tq_lock);
}

/*
 * Returns B_TRUE only when no tasks are pending AND no worker is
 * currently executing one (tq_active counts non-idle workers).
 */
boolean_t
taskq_empty(taskq_t *tq)
{
	boolean_t rv;

	mutex_enter(&tq->tq_lock);
	rv = (tq->tq_task.tqent_next == &tq->tq_task) && (tq->tq_active == 0);
	mutex_exit(&tq->tq_lock);

	return (rv);
}

/*
 * Block until the taskq is completely drained: no pending entries and
 * every worker idle.  Workers broadcast tq_wait_cv when the last
 * active one goes idle.
 */
void
taskq_wait(taskq_t *tq)
{
	mutex_enter(&tq->tq_lock);
	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);
}

/*
 * Wait for a specific task id.  This emulation cannot track individual
 * ids (taskq_dispatch() always returns 1), so it conservatively waits
 * for the entire queue to drain instead.
 */
void
taskq_wait_id(taskq_t *tq, taskqid_t id __unused)
{
	taskq_wait(tq);
}

/*
 * Worker thread main loop.  Sleeps on tq_dispatch_cv when the queue is
 * empty (decrementing tq_active around the wait so taskq_wait() can
 * detect quiescence), otherwise unlinks the head task and runs it with
 * tq_lock dropped.  tq_threadlock is read-held across the callback;
 * NOTE(review): nothing in this file takes it as writer — presumably
 * an external serialization hook, confirm against callers.  Entries
 * marked TQENT_FLAG_PREALLOC belong to the dispatcher and are not
 * freed here.  Exits when TASKQ_ACTIVE is cleared by taskq_destroy().
 */
static void *
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *t;
	boolean_t prealloc;

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			tq->tq_active++;
			continue;
		}
		t->tqent_prev->tqent_next = t->tqent_next;
		t->tqent_next->tqent_prev = t->tqent_prev;
		t->tqent_next = NULL;
		t->tqent_prev = NULL;
		/* latch the flag: a prealloc'd entry may be reused/freed */
		/* by its owner as soon as tqent_func returns */
		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
		mutex_exit(&tq->tq_lock);

		rw_enter(&tq->tq_threadlock, RW_READER);
		t->tqent_func(t->tqent_arg);
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		if (!prealloc)
			task_free(tq, t);
	}
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	mutex_exit(&tq->tq_lock);
	return (NULL);
}

/*
 * Create a taskq; convenience wrapper that supplies a NULL proc.
 */
/*ARGSUSED*/
taskq_t *
taskq_create(const char *name, int nthr, pri_t pri, int minalloc,
    int maxalloc, uint_t flags)
{
	return (taskq_create_proc(name, nthr, pri,
	    minalloc, maxalloc, NULL, flags));
}

/*
 * Kernel sysdc-class variant.  The duty cycle (dc) has no meaning in
 * userland and is ignored; threads are created at maxclsyspri.
 */
/*ARGSUSED*/
taskq_t *
taskq_create_sysdc(const char *name, int nthr, int minalloc,
    int maxalloc, proc_t *proc, uint_t dc, uint_t flags)
{
	return (taskq_create_proc(name, nthr, maxclsyspri,
	    minalloc, maxalloc, proc, flags));
}

/*
 * Create a taskq with nthreads bound worker threads.  In this userland
 * emulation name, pri and proc are accepted for API compatibility but
 * unused (/*ARGSUSED*/).  With TASKQ_THREADS_CPU_PCT, nthreads is a
 * percentage (0..100) of online CPUs, minimum one thread.  With
 * TASKQ_PREPOPULATE, minalloc task entries are pre-loaded onto the
 * free list.  Returns the new taskq; allocation uses KM_SLEEP and so
 * does not fail.
 */
/*ARGSUSED*/
taskq_t *
taskq_create_proc(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, proc_t *proc, uint_t flags)
{
	taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
	int t;

	if (flags & TASKQ_THREADS_CPU_PCT) {
		int pct;
		ASSERT3S(nthreads, >=, 0);
		ASSERT3S(nthreads, <=, 100);
		pct = MIN(nthreads, 100);
		pct = MAX(pct, 0);

		nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
		nthreads = MAX(nthreads, 1);	/* need at least 1 thread */
	} else {
		ASSERT3S(nthreads, >=, 1);
	}

	rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
	cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
	tq->tq_flags = flags | TASKQ_ACTIVE;
	tq->tq_active = nthreads;
	tq->tq_nthreads = nthreads;
	tq->tq_minalloc = minalloc;
	tq->tq_maxalloc = maxalloc;
	/* empty circular list: head points to itself */
	tq->tq_task.tqent_next = &tq->tq_task;
	tq->tq_task.tqent_prev = &tq->tq_task;
	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);

	if (flags & TASKQ_PREPOPULATE) {
		mutex_enter(&tq->tq_lock);
		while (minalloc-- > 0)
			task_free(tq, task_alloc(tq, KM_SLEEP));
		mutex_exit(&tq->tq_lock);
	}

	for (t = 0; t < nthreads; t++)
		(void) thr_create(0, 0, taskq_thread,
		    tq, THR_BOUND, &tq->tq_threadlist[t]);

	return (tq);
}

/*
 * Tear down a taskq: drain all pending work, stop and join every
 * worker thread, release all cached task entries, and free the taskq.
 * The caller must ensure no further dispatches occur.
 */
void
taskq_destroy(taskq_t *tq)
{
	int t;
	int nthreads = tq->tq_nthreads;

	taskq_wait(tq);

	mutex_enter(&tq->tq_lock);

	tq->tq_flags &= ~TASKQ_ACTIVE;
	cv_broadcast(&tq->tq_dispatch_cv);

	while (tq->tq_nthreads != 0)
		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);

	/* force task_free() to really free instead of caching */
	tq->tq_minalloc = 0;
	while (tq->tq_nalloc != 0) {
		ASSERT(tq->tq_freelist != NULL);
		task_free(tq, task_alloc(tq, KM_SLEEP));
	}

	mutex_exit(&tq->tq_lock);

	for (t = 0; t < nthreads; t++)
		(void) thr_join(tq->tq_threadlist[t], NULL, NULL);

	kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));

	rw_destroy(&tq->tq_threadlock);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_dispatch_cv);
	cv_destroy(&tq->tq_wait_cv);
	cv_destroy(&tq->tq_maxalloc_cv);

	kmem_free(tq, sizeof (taskq_t));
}

/*
 * Return non-zero if t is one of tq's worker threads.  In this
 * emulation the kernel thread handle is presumably a thread_t smuggled
 * through a struct _kthread pointer, hence the cast.  Always true when
 * taskq_now forces inline execution.
 */
int
taskq_member(taskq_t *tq, struct _kthread *t)
{
	int i;

	if (taskq_now)
		return (1);

	for (i = 0; i < tq->tq_nthreads; i++)
		if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
			return (1);

	return (0);
}

/*
 * Create the shared system taskq.  NOTE(review): minclsyspri is not
 * defined in this file (only maxclsyspri is); it is assumed to come
 * from the surrounding emulation headers — confirm.
 */
void
system_taskq_init(void)
{
	system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
}

/*
 * Destroy the shared system taskq.
 */
void
system_taskq_fini(void)
{
	taskq_destroy(system_taskq);
	system_taskq = NULL; /* defensive */
}