1 /* $OpenBSD: kern_task.c,v 1.18 2016/08/11 01:32:31 dlg Exp $ */ 2 3 /* 4 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 19 #include <sys/param.h> 20 #include <sys/systm.h> 21 #include <sys/malloc.h> 22 #include <sys/mutex.h> 23 #include <sys/kthread.h> 24 #include <sys/task.h> 25 26 #define TASK_ONQUEUE 1 27 28 struct taskq { 29 enum { 30 TQ_S_CREATED, 31 TQ_S_RUNNING, 32 TQ_S_DESTROYED 33 } tq_state; 34 unsigned int tq_running; 35 unsigned int tq_nthreads; 36 unsigned int tq_flags; 37 const char *tq_name; 38 39 struct mutex tq_mtx; 40 struct task_list tq_worklist; 41 }; 42 43 struct taskq taskq_sys = { 44 TQ_S_CREATED, 45 0, 46 1, 47 0, 48 "systq", 49 MUTEX_INITIALIZER(IPL_HIGH), 50 TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist) 51 }; 52 53 struct taskq taskq_sys_mp = { 54 TQ_S_CREATED, 55 0, 56 1, 57 TASKQ_MPSAFE, 58 "systqmp", 59 MUTEX_INITIALIZER(IPL_HIGH), 60 TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist) 61 }; 62 63 typedef int (*sleepfn)(const volatile void *, struct mutex *, int, 64 const char *, int); 65 66 struct taskq *const systq = &taskq_sys; 67 struct taskq *const systqmp = &taskq_sys_mp; 68 69 void taskq_init(void); /* called in init_main.c */ 70 void taskq_create_thread(void *); 71 int taskq_sleep(const volatile void *, struct mutex *, int, 72 const char *, int); 73 int taskq_next_work(struct taskq *, struct task *, sleepfn); 74 void taskq_thread(void *); 75 76 void 77 taskq_init(void) 78 { 79 kthread_create_deferred(taskq_create_thread, systq); 80 kthread_create_deferred(taskq_create_thread, systqmp); 81 } 82 83 struct taskq * 84 taskq_create(const char *name, unsigned int nthreads, int ipl, 85 unsigned int flags) 86 { 87 struct taskq *tq; 88 89 tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK); 90 if (tq == NULL) 91 return (NULL); 92 93 tq->tq_state = TQ_S_CREATED; 94 tq->tq_running = 0; 95 tq->tq_nthreads = nthreads; 96 tq->tq_name = name; 97 tq->tq_flags = flags; 98 99 mtx_init(&tq->tq_mtx, ipl); 100 TAILQ_INIT(&tq->tq_worklist); 101 102 /* try to create a thread to guarantee that tasks will be serviced */ 103 kthread_create_deferred(taskq_create_thread, tq); 104 105 return (tq); 106 } 107 108 void 109 taskq_destroy(struct taskq *tq) 110 { 111 mtx_enter(&tq->tq_mtx); 112 switch (tq->tq_state) { 113 case TQ_S_CREATED: 114 /* tq is still referenced by taskq_create_thread */ 115 tq->tq_state = TQ_S_DESTROYED; 116 mtx_leave(&tq->tq_mtx); 117 return; 118 119 case TQ_S_RUNNING: 120 tq->tq_state = TQ_S_DESTROYED; 121 break; 122 123 default: 124 panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state); 125 } 126 127 while (tq->tq_running > 0) { 128 wakeup(tq); 129 msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0); 130 } 131 mtx_leave(&tq->tq_mtx); 132 133 free(tq, M_DEVBUF, sizeof(*tq)); 134 } 135 136 void 137 taskq_create_thread(void *arg) 138 { 139 struct taskq *tq = arg; 140 int rv; 141 142 mtx_enter(&tq->tq_mtx); 143 144 switch (tq->tq_state) { 145 case TQ_S_DESTROYED: 146 mtx_leave(&tq->tq_mtx); 147 free(tq, M_DEVBUF, sizeof(*tq)); 148 return; 149 150 case TQ_S_CREATED: 151 tq->tq_state = TQ_S_RUNNING; 152 break; 153 154 default: 155 panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state); 156 } 157 158 do { 159 tq->tq_running++; 160 mtx_leave(&tq->tq_mtx); 161 162 rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name); 163 164 mtx_enter(&tq->tq_mtx); 165 if (rv != 0) { 166 printf("unable to create thread for \"%s\" taskq\n", 167 tq->tq_name); 168 169 tq->tq_running--; 170 /* could have been destroyed during kthread_create */ 171 if (tq->tq_state == TQ_S_DESTROYED && 172 tq->tq_running == 0) 173 wakeup_one(&tq->tq_running); 174 break; 175 } 176 } while (tq->tq_running < tq->tq_nthreads); 177 178 mtx_leave(&tq->tq_mtx); 179 } 180 181 void 182 task_set(struct task *t, void (*fn)(void *), void *arg) 183 { 184 t->t_func = fn; 185 t->t_arg = arg; 186 t->t_flags = 0; 187 } 188 189 int 190 task_add(struct taskq *tq, struct task *w) 191 { 192 int rv = 0; 193 194 if (ISSET(w->t_flags, TASK_ONQUEUE)) 195 return (0); 196 197 mtx_enter(&tq->tq_mtx); 198 if (!ISSET(w->t_flags, TASK_ONQUEUE)) { 199 rv = 1; 200 SET(w->t_flags, TASK_ONQUEUE); 201 TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry); 202 } 203 mtx_leave(&tq->tq_mtx); 204 205 if (rv) 206 wakeup_one(tq); 207 208 return (rv); 209 } 210 211 int 212 task_del(struct taskq *tq, struct task *w) 213 { 214 int rv = 0; 215 216 if (!ISSET(w->t_flags, TASK_ONQUEUE)) 217 return (0); 218 219 mtx_enter(&tq->tq_mtx); 220 if (ISSET(w->t_flags, TASK_ONQUEUE)) { 221 rv = 1; 222 CLR(w->t_flags, TASK_ONQUEUE); 223 TAILQ_REMOVE(&tq->tq_worklist, w, t_entry); 224 } 225 mtx_leave(&tq->tq_mtx); 226 227 return (rv); 228 } 229 230 int 231 taskq_sleep(const volatile void *ident, struct mutex *mtx, int priority, 232 const char *wmesg, int tmo) 233 { 234 u_int *flags = &curproc->p_flag; 235 int rv; 236 237 atomic_clearbits_int(flags, P_CANTSLEEP); 238 rv = msleep(ident, mtx, priority, wmesg, tmo); 239 atomic_setbits_int(flags, P_CANTSLEEP); 240 241 return (tmo); 242 } 243 244 int 245 taskq_next_work(struct taskq *tq, struct task *work, sleepfn tqsleep) 246 { 247 struct task *next; 248 249 mtx_enter(&tq->tq_mtx); 250 while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) { 251 if (tq->tq_state != TQ_S_RUNNING) { 252 mtx_leave(&tq->tq_mtx); 253 return (0); 254 } 255 256 tqsleep(tq, &tq->tq_mtx, PWAIT, "bored", 0); 257 } 258 259 TAILQ_REMOVE(&tq->tq_worklist, next, t_entry); 260 CLR(next->t_flags, TASK_ONQUEUE); 261 262 *work = *next; /* copy to caller to avoid races */ 263 264 next = TAILQ_FIRST(&tq->tq_worklist); 265 mtx_leave(&tq->tq_mtx); 266 267 if (next != NULL && tq->tq_nthreads > 1) 268 wakeup_one(tq); 269 270 return (1); 271 } 272 273 void 274 taskq_thread(void *xtq) 275 { 276 sleepfn tqsleep = msleep; 277 struct taskq *tq = xtq; 278 struct task work; 279 int last; 280 281 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 282 KERNEL_UNLOCK(); 283 284 if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP)) { 285 tqsleep = taskq_sleep; 286 atomic_setbits_int(&curproc->p_flag, P_CANTSLEEP); 287 } 288 289 while (taskq_next_work(tq, &work, tqsleep)) { 290 (*work.t_func)(work.t_arg); 291 sched_pause(); 292 } 293 294 mtx_enter(&tq->tq_mtx); 295 last = (--tq->tq_running == 0); 296 mtx_leave(&tq->tq_mtx); 297 298 if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP)) 299 atomic_clearbits_int(&curproc->p_flag, P_CANTSLEEP); 300 301 if (ISSET(tq->tq_flags, TASKQ_MPSAFE)) 302 KERNEL_LOCK(); 303 304 if (last) 305 wakeup_one(&tq->tq_running); 306 307 kthread_exit(0); 308 } 309