1 /* $OpenBSD: kern_task.c,v 1.31 2020/08/01 08:40:20 anton Exp $ */ 2 3 /* 4 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/param.h> 20 #include <sys/systm.h> 21 #include <sys/malloc.h> 22 #include <sys/mutex.h> 23 #include <sys/kthread.h> 24 #include <sys/task.h> 25 #include <sys/proc.h> 26 #include <sys/witness.h> 27 28 #include "kcov.h" 29 #if NKCOV > 0 30 #include <sys/kcov.h> 31 #endif 32 33 #ifdef WITNESS 34 35 static struct lock_type taskq_lock_type = { 36 .lt_name = "taskq" 37 }; 38 39 #define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \ 40 (LO_CLASS_RWLOCK << LO_CLASSSHIFT) 41 42 #endif /* WITNESS */ 43 44 struct taskq_thread { 45 SLIST_ENTRY(taskq_thread) 46 tt_entry; 47 struct proc *tt_thread; 48 }; 49 SLIST_HEAD(taskq_threads, taskq_thread); 50 51 struct taskq { 52 enum { 53 TQ_S_CREATED, 54 TQ_S_RUNNING, 55 TQ_S_DESTROYED 56 } tq_state; 57 unsigned int tq_running; 58 unsigned int tq_nthreads; 59 unsigned int tq_flags; 60 const char *tq_name; 61 62 struct mutex tq_mtx; 63 struct task_list tq_worklist; 64 65 struct taskq_threads tq_threads; 66 unsigned int tq_barriers; 67 unsigned int tq_bgen; 68 unsigned int tq_bthreads; 69 70 #ifdef WITNESS 71 struct lock_object tq_lock_object; 72 #endif 73 }; 74 75 static const char taskq_sys_name[] = "systq"; 76 77 struct taskq taskq_sys = { 78 .tq_state = TQ_S_CREATED, 79 .tq_running = 0, 80 .tq_nthreads = 1, 81 .tq_flags = 0, 82 .tq_name = taskq_sys_name, 83 .tq_mtx = MUTEX_INITIALIZER_FLAGS(IPL_HIGH, 84 taskq_sys_name, 0), 85 .tq_worklist = TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist), 86 87 .tq_threads = SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads), 88 .tq_barriers = 0, 89 .tq_bgen = 0, 90 .tq_bthreads = 0, 91 92 #ifdef WITNESS 93 .tq_lock_object = { 94 .lo_name = taskq_sys_name, 95 .lo_flags = TASKQ_LOCK_FLAGS, 96 }, 97 #endif 98 }; 99 100 static const char taskq_sys_mp_name[] = "systqmp"; 101 102 struct taskq taskq_sys_mp = { 103 .tq_state = TQ_S_CREATED, 104 .tq_running = 0, 105 .tq_nthreads = 1, 106 .tq_flags = TASKQ_MPSAFE, 107 .tq_name = taskq_sys_mp_name, 108 .tq_mtx = MUTEX_INITIALIZER_FLAGS(IPL_HIGH, 109 taskq_sys_mp_name, 0), 110 .tq_worklist = TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist), 111 112 .tq_threads = SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads), 113 .tq_barriers = 0, 114 .tq_bgen = 0, 115 .tq_bthreads = 0, 116 117 #ifdef WITNESS 118 .tq_lock_object = { 119 .lo_name = taskq_sys_mp_name, 120 .lo_flags = TASKQ_LOCK_FLAGS, 121 }, 122 #endif 123 }; 124 125 struct taskq *const systq = &taskq_sys; 126 struct taskq *const systqmp = &taskq_sys_mp; 127 128 void taskq_init(void); /* called in init_main.c */ 129 void taskq_create_thread(void *); 130 void taskq_barrier_task(void *); 131 int taskq_sleep(const volatile void *, struct mutex *, int, 132 const char *, int); 133 int taskq_next_work(struct taskq *, struct task *); 134 void taskq_thread(void *); 135 136 void 137 taskq_init(void) 138 { 139 WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type); 140 kthread_create_deferred(taskq_create_thread, systq); 141 WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type); 142 kthread_create_deferred(taskq_create_thread, systqmp); 143 } 144 145 struct taskq * 146 taskq_create(const char *name, unsigned int nthreads, int ipl, 147 unsigned int flags) 148 { 149 struct taskq *tq; 150 151 tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK); 152 if (tq == NULL) 153 return (NULL); 154 155 tq->tq_state = TQ_S_CREATED; 156 tq->tq_running = 0; 157 tq->tq_nthreads = nthreads; 158 tq->tq_name = name; 159 tq->tq_flags = flags; 160 161 mtx_init_flags(&tq->tq_mtx, ipl, name, 0); 162 TAILQ_INIT(&tq->tq_worklist); 163 164 SLIST_INIT(&tq->tq_threads); 165 tq->tq_barriers = 0; 166 tq->tq_bgen = 0; 167 tq->tq_bthreads = 0; 168 169 #ifdef WITNESS 170 memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object)); 171 tq->tq_lock_object.lo_name = name; 172 tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS; 173 witness_init(&tq->tq_lock_object, &taskq_lock_type); 174 #endif 175 176 /* try to create a thread to guarantee that tasks will be serviced */ 177 kthread_create_deferred(taskq_create_thread, tq); 178 179 return (tq); 180 } 181 182 void 183 taskq_destroy(struct taskq *tq) 184 { 185 mtx_enter(&tq->tq_mtx); 186 switch (tq->tq_state) { 187 case TQ_S_CREATED: 188 /* tq is still referenced by taskq_create_thread */ 189 tq->tq_state = TQ_S_DESTROYED; 190 mtx_leave(&tq->tq_mtx); 191 return; 192 193 case TQ_S_RUNNING: 194 tq->tq_state = TQ_S_DESTROYED; 195 break; 196 197 default: 198 panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); 199 } 200 201 while (tq->tq_running > 0) { 202 wakeup(tq); 203 msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 204 INFSLP); 205 } 206 mtx_leave(&tq->tq_mtx); 207 208 free(tq, M_DEVBUF, sizeof(*tq)); 209 } 210 211 void 212 taskq_create_thread(void *arg) 213 { 214 struct taskq *tq = arg; 215 int rv; 216 217 mtx_enter(&tq->tq_mtx); 218 219 switch (tq->tq_state) { 220 case TQ_S_DESTROYED: 221 mtx_leave(&tq->tq_mtx); 222 free(tq, M_DEVBUF, sizeof(*tq)); 223 return; 224 225 case TQ_S_CREATED: 226 tq->tq_state = TQ_S_RUNNING; 227 break; 228 229 default: 230 panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); 231 } 232 233 do { 234 tq->tq_running++; 235 mtx_leave(&tq->tq_mtx); 236 237 rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name); 238 239 mtx_enter(&tq->tq_mtx); 240 if (rv != 0) { 241 printf("unable to create thread for \"%s\" taskq\n", 242 tq->tq_name); 243 244 tq->tq_running--; 245 /* could have been destroyed during kthread_create */ 246 if (tq->tq_state == TQ_S_DESTROYED && 247 tq->tq_running == 0) 248 wakeup_one(&tq->tq_running); 249 break; 250 } 251 } while (tq->tq_running < tq->tq_nthreads); 252 253 mtx_leave(&tq->tq_mtx); 254 } 255 256 void 257 taskq_barrier_task(void *p) 258 { 259 struct taskq *tq = p; 260 unsigned int gen; 261 262 mtx_enter(&tq->tq_mtx); 263 tq->tq_bthreads++; 264 wakeup(&tq->tq_bthreads); 265 266 gen = tq->tq_bgen; 267 do { 268 msleep_nsec(&tq->tq_bgen, &tq->tq_mtx, 269 PWAIT, "tqbarend", INFSLP); 270 } while (gen == tq->tq_bgen); 271 mtx_leave(&tq->tq_mtx); 272 } 273 274 static void 275 taskq_do_barrier(struct taskq *tq) 276 { 277 struct task t = TASK_INITIALIZER(taskq_barrier_task, tq); 278 struct proc *thread = curproc; 279 struct taskq_thread *tt; 280 281 mtx_enter(&tq->tq_mtx); 282 tq->tq_barriers++; 283 284 /* is the barrier being run from a task inside the taskq? */ 285 SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) { 286 if (tt->tt_thread == thread) { 287 tq->tq_bthreads++; 288 wakeup(&tq->tq_bthreads); 289 break; 290 } 291 } 292 293 while (tq->tq_bthreads < tq->tq_nthreads) { 294 /* shove the task into the queue for a worker to pick up */ 295 SET(t.t_flags, TASK_ONQUEUE); 296 TAILQ_INSERT_TAIL(&tq->tq_worklist, &t, t_entry); 297 wakeup_one(tq); 298 299 msleep_nsec(&tq->tq_bthreads, &tq->tq_mtx, 300 PWAIT, "tqbar", INFSLP); 301 302 /* 303 * another thread running a barrier might have 304 * done this work for us. 305 */ 306 if (ISSET(t.t_flags, TASK_ONQUEUE)) 307 TAILQ_REMOVE(&tq->tq_worklist, &t, t_entry); 308 } 309 310 if (--tq->tq_barriers == 0) { 311 /* we're the last one out */ 312 tq->tq_bgen++; 313 wakeup(&tq->tq_bgen); 314 tq->tq_bthreads = 0; 315 } else { 316 unsigned int gen = tq->tq_bgen; 317 do { 318 msleep_nsec(&tq->tq_bgen, &tq->tq_mtx, 319 PWAIT, "tqbarwait", INFSLP); 320 } while (gen == tq->tq_bgen); 321 } 322 mtx_leave(&tq->tq_mtx); 323 } 324 325 void 326 taskq_barrier(struct taskq *tq) 327 { 328 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 329 330 taskq_do_barrier(tq); 331 } 332 333 void 334 taskq_del_barrier(struct taskq *tq, struct task *t) 335 { 336 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 337 338 if (task_del(tq, t)) 339 return; 340 341 taskq_do_barrier(tq); 342 } 343 344 void 345 task_set(struct task *t, void (*fn)(void *), void *arg) 346 { 347 t->t_func = fn; 348 t->t_arg = arg; 349 t->t_flags = 0; 350 } 351 352 int 353 task_add(struct taskq *tq, struct task *w) 354 { 355 int rv = 0; 356 357 if (ISSET(w->t_flags, TASK_ONQUEUE)) 358 return (0); 359 360 mtx_enter(&tq->tq_mtx); 361 if (!ISSET(w->t_flags, TASK_ONQUEUE)) { 362 rv = 1; 363 SET(w->t_flags, TASK_ONQUEUE); 364 TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); 365 #if NKCOV > 0 366 w->t_process = curproc->p_p; 367 #endif 368 } 369 mtx_leave(&tq->tq_mtx); 370 371 if (rv) 372 wakeup_one(tq); 373 374 return (rv); 375 } 376 377 int 378 task_del(struct taskq *tq, struct task *w) 379 { 380 int rv = 0; 381 382 if (!ISSET(w->t_flags, TASK_ONQUEUE)) 383 return (0); 384 385 mtx_enter(&tq->tq_mtx); 386 if (ISSET(w->t_flags, TASK_ONQUEUE)) { 387 rv = 1; 388 CLR(w->t_flags, TASK_ONQUEUE); 389 TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); 390 } 391 mtx_leave(&tq->tq_mtx); 392 393 return (rv); 394 } 395 396 int 397 taskq_next_work(struct taskq *tq, struct task *work) 398 { 399 struct task *next; 400 401 mtx_enter(&tq->tq_mtx); 402 while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { 403 if (tq->tq_state != TQ_S_RUNNING) { 404 mtx_leave(&tq->tq_mtx); 405 return (0); 406 } 407 408 msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP); 409 } 410 411 TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); 412 CLR(next->t_flags, TASK_ONQUEUE); 413 414 *work = *next; /* copy to caller to avoid races */ 415 416 next = TAILQ_FIRST(&tq->tq_worklist); 417 mtx_leave(&tq->tq_mtx); 418 419 if (next != NULL && tq->tq_nthreads > 1) 420 wakeup_one(tq); 421 422 return (1); 423 } 424 425 void 426 taskq_thread(void *xtq) 427 { 428 struct taskq_thread self = { .tt_thread = curproc }; 429 struct taskq *tq = xtq; 430 struct task work; 431 int last; 432 433 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 434 KERNEL_UNLOCK(); 435 436 mtx_enter(&tq->tq_mtx); 437 SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry); 438 mtx_leave(&tq->tq_mtx); 439 440 WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL); 441 442 while (taskq_next_work(tq, &work)) { 443 WITNESS_LOCK(&tq->tq_lock_object, 0); 444 #if NKCOV > 0 445 kcov_remote_enter(KCOV_REMOTE_COMMON, work.t_process); 446 #endif 447 (*work.t_func)(work.t_arg); 448 #if NKCOV > 0 449 kcov_remote_leave(KCOV_REMOTE_COMMON, work.t_process); 450 #endif 451 WITNESS_UNLOCK(&tq->tq_lock_object, 0); 452 sched_pause(yield); 453 } 454 455 mtx_enter(&tq->tq_mtx); 456 SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry); 457 last = (--tq->tq_running == 0); 458 mtx_leave(&tq->tq_mtx); 459 460 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 461 KERNEL_LOCK(); 462 463 if (last) 464 wakeup_one(&tq->tq_running); 465 466 kthread_exit(0); 467 } 468