/*	$OpenBSD: kern_task.c,v 1.36 2025/01/13 03:21:10 mvs Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>
#include <sys/proc.h>
#include <sys/witness.h>

#include "kcov.h"
#if NKCOV > 0
#include <sys/kcov.h>
#endif

#ifdef WITNESS

static struct lock_type taskq_lock_type = {
	.lt_name = "taskq"
};

#define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \
    (LO_CLASS_RWLOCK << LO_CLASSSHIFT)

#endif /* WITNESS */

struct taskq_thread {
	SLIST_ENTRY(taskq_thread)
				 tt_entry;
	struct proc		*tt_thread;
};
SLIST_HEAD(taskq_threads, taskq_thread);

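/*
 * A taskq moves through three states: TQ_S_CREATED after taskq_create(),
 * TQ_S_RUNNING once taskq_create_thread() has started the workers, and
 * TQ_S_DESTROYED once taskq_destroy() has been called.  tq_mtx protects
 * all of the mutable fields below, including the list of pending work
 * and the barrier bookkeeping.
 */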
struct taskq {
	enum {
		TQ_S_CREATED,
		TQ_S_RUNNING,
		TQ_S_DESTROYED
	}			 tq_state;
	unsigned int		 tq_running;
	unsigned int		 tq_nthreads;
	unsigned int		 tq_flags;
	const char		*tq_name;

	struct mutex		 tq_mtx;
	struct task_list	 tq_worklist;

	struct taskq_threads	 tq_threads;
	unsigned int		 tq_barriers;
	unsigned int		 tq_bgen;
	unsigned int		 tq_bthreads;

#ifdef WITNESS
	struct lock_object	 tq_lock_object;
#endif
};

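/*
 * Two taskqs are always available: systq runs its tasks while holding
 * the kernel lock, and systqmp (created with TASKQ_MPSAFE) runs them
 * without it.  Both are statically initialized here and get their
 * worker threads from taskq_init() during boot.
 */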
static const char taskq_sys_name[] = "systq";

struct taskq taskq_sys = {
	.tq_state	= TQ_S_CREATED,
	.tq_running	= 0,
	.tq_nthreads	= 1,
	.tq_flags	= 0,
	.tq_name	= taskq_sys_name,
	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
			    taskq_sys_name, 0),
	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist),

	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads),
	.tq_barriers	= 0,
	.tq_bgen	= 0,
	.tq_bthreads	= 0,

#ifdef WITNESS
	.tq_lock_object = {
		.lo_name	= taskq_sys_name,
		.lo_flags	= TASKQ_LOCK_FLAGS,
	},
#endif
};

static const char taskq_sys_mp_name[] = "systqmp";

struct taskq taskq_sys_mp = {
	.tq_state	= TQ_S_CREATED,
	.tq_running	= 0,
	.tq_nthreads	= 1,
	.tq_flags	= TASKQ_MPSAFE,
	.tq_name	= taskq_sys_mp_name,
	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
			    taskq_sys_mp_name, 0),
	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist),

	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads),
	.tq_barriers	= 0,
	.tq_bgen	= 0,
	.tq_bthreads	= 0,

#ifdef WITNESS
	.tq_lock_object = {
		.lo_name	= taskq_sys_mp_name,
		.lo_flags	= TASKQ_LOCK_FLAGS,
	},
#endif
};

struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;

void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
void	taskq_barrier_task(void *);
int	taskq_next_work(struct taskq *, struct task *);
void	taskq_thread(void *);

void
taskq_init(void)
{
	WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systq);
	WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systqmp);
}

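/*
 * Create a taskq with nthreads workers that take tasks at the given
 * interrupt priority level.  Thread creation is deferred, so the queue
 * is usable immediately but tasks only run once the kernel threads
 * exist.  A typical (hypothetical) driver call looks like:
 *
 *	sc->sc_taskq = taskq_create("mydrv", 1, IPL_NET, 0);
 */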
struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;
	tq->tq_flags = flags;

	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
	TAILQ_INIT(&tq->tq_worklist);

	SLIST_INIT(&tq->tq_threads);
	tq->tq_barriers = 0;
	tq->tq_bgen = 0;
	tq->tq_bthreads = 0;

#ifdef WITNESS
	memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object));
	tq->tq_lock_object.lo_name = name;
	tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS;
	witness_init(&tq->tq_lock_object, &taskq_lock_type);
#endif

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}

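/*
 * Tear a taskq down.  If the deferred thread creation has not run yet
 * (TQ_S_CREATED), ownership of the structure passes to
 * taskq_create_thread(), which frees it.  Otherwise the workers are
 * woken up and we sleep until the last one has exited.  This sleeps,
 * and must not be called from a task running on tq itself, since
 * tq_running could then never reach zero.
 */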
void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy",
		    INFSLP);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}

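/*
 * Deferred entry point run in a kthread context: transition the queue
 * to TQ_S_RUNNING and spawn the tq_nthreads worker threads, dropping
 * tq_mtx around each kthread_create() since it can sleep.
 */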
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}

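/*
 * Task body the barrier queues on each worker: check this thread in
 * by bumping tq_bthreads, then sleep until taskq_do_barrier() advances
 * the generation counter to release everyone at once.
 */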
void
taskq_barrier_task(void *p)
{
	struct taskq *tq = p;
	unsigned int gen;

	mtx_enter(&tq->tq_mtx);
	tq->tq_bthreads++;
	wakeup(&tq->tq_bthreads);

	gen = tq->tq_bgen;
	do {
		msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
		    PWAIT, "tqbarend", INFSLP);
	} while (gen == tq->tq_bgen);
	mtx_leave(&tq->tq_mtx);
}

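/*
 * A barrier is complete once every worker thread has checked in via
 * taskq_barrier_task() (or via the tt_thread match below, when the
 * barrier is issued from inside the taskq).  The barrier task is
 * re-queued until tq_bthreads reaches tq_nthreads; concurrent barriers
 * share the same generation, so the last one out advances tq_bgen and
 * wakes all of the waiters.
 */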
static void
taskq_do_barrier(struct taskq *tq)
{
	struct task t = TASK_INITIALIZER(taskq_barrier_task, tq);
	struct proc *thread = curproc;
	struct taskq_thread *tt;

	mtx_enter(&tq->tq_mtx);
	tq->tq_barriers++;

	/* is the barrier being run from a task inside the taskq? */
	SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) {
		if (tt->tt_thread == thread) {
			tq->tq_bthreads++;
			wakeup(&tq->tq_bthreads);
			break;
		}
	}

	while (tq->tq_bthreads < tq->tq_nthreads) {
		/* shove the task into the queue for a worker to pick up */
		SET(t.t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, &t, t_entry);
		wakeup_one(tq);

		msleep_nsec(&tq->tq_bthreads, &tq->tq_mtx,
		    PWAIT, "tqbar", INFSLP);

		/*
		 * another thread running a barrier might have
		 * done this work for us.
		 */
		if (ISSET(t.t_flags, TASK_ONQUEUE))
			TAILQ_REMOVE(&tq->tq_worklist, &t, t_entry);
	}

	if (--tq->tq_barriers == 0) {
		/* we're the last one out */
		tq->tq_bgen++;
		wakeup(&tq->tq_bgen);
		tq->tq_bthreads = 0;
	} else {
		unsigned int gen = tq->tq_bgen;
		do {
			msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
			    PWAIT, "tqbarwait", INFSLP);
		} while (gen == tq->tq_bgen);
	}
	mtx_leave(&tq->tq_mtx);
}

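/*
 * Wait until every task that was queued on tq before this call has
 * finished running.  taskq_del_barrier() additionally removes a
 * pending task first, so on return the task is guaranteed not to be
 * queued or running.  Both may sleep.
 */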
void
taskq_barrier(struct taskq *tq)
{
	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	taskq_do_barrier(tq);
}

void
taskq_del_barrier(struct taskq *tq, struct task *t)
{
	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	task_del(tq, t);
	taskq_do_barrier(tq);
}

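/*
 * Prepare a task to call fn(arg) when it runs.  Must be done before
 * the task is first passed to task_add(), e.g. (hypothetical names):
 *
 *	task_set(&sc->sc_task, mydrv_tick, sc);
 */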
void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
	t->t_func = fn;
	t->t_arg = arg;
	t->t_flags = 0;
}

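/*
 * Queue a task for execution.  Returns 1 if the task was put on the
 * list, 0 if it was already pending.  The unlocked TASK_ONQUEUE test
 * is only an optimization; the check is repeated under tq_mtx before
 * the task is inserted.
 */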
int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
#if NKCOV > 0
		if (!kcov_cold)
			w->t_process = curproc->p_p;
#endif
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}

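/*
 * Remove a pending task.  Returns 1 if the task was dequeued, 0 if it
 * was not on the list.  A task that is already executing is not
 * interrupted; use taskq_del_barrier() to also wait for that case.
 */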
int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}

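/*
 * Worker-side dequeue: sleep until a task is available or the taskq is
 * being destroyed.  The task is copied into *work so the caller's
 * struct task may be reused or freed as soon as this returns; if more
 * work remains and there are other workers, kick one of them.
 */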
int
taskq_next_work(struct taskq *tq, struct task *work)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}

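/*
 * Main loop of each worker thread: register on tq_threads so barriers
 * can recognize calls from inside the taskq, drop the kernel lock for
 * TASKQ_MPSAFE queues, then run tasks until taskq_next_work() reports
 * that the queue is being torn down.  The last worker out wakes up
 * taskq_destroy().
 */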
void
taskq_thread(void *xtq)
{
	struct taskq_thread self = { .tt_thread = curproc };
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	mtx_enter(&tq->tq_mtx);
	SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry);
	mtx_leave(&tq->tq_mtx);

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	while (taskq_next_work(tq, &work)) {
		WITNESS_LOCK(&tq->tq_lock_object, 0);
#if NKCOV > 0
		kcov_remote_enter(KCOV_REMOTE_COMMON, work.t_process);
#endif
		(*work.t_func)(work.t_arg);
#if NKCOV > 0
		kcov_remote_leave(KCOV_REMOTE_COMMON, work.t_process);
#endif
		WITNESS_UNLOCK(&tq->tq_lock_object, 0);
		sched_pause(yield);
	}

	mtx_enter(&tq->tq_mtx);
	SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}