xref: /openbsd/sys/kern/kern_task.c (revision 2ee1bbb5)
1 /*	$OpenBSD: kern_task.c,v 1.36 2025/01/13 03:21:10 mvs Exp $ */
2 
3 /*
4  * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/param.h>
20 #include <sys/systm.h>
21 #include <sys/malloc.h>
22 #include <sys/mutex.h>
23 #include <sys/kthread.h>
24 #include <sys/task.h>
25 #include <sys/proc.h>
26 #include <sys/witness.h>
27 
28 #include "kcov.h"
29 #if NKCOV > 0
30 #include <sys/kcov.h>
31 #endif
32 
33 #ifdef WITNESS
34 
35 static struct lock_type taskq_lock_type = {
36 	.lt_name = "taskq"
37 };
38 
39 #define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \
40     (LO_CLASS_RWLOCK << LO_CLASSSHIFT)
41 
42 #endif /* WITNESS */
43 
44 struct taskq_thread {
45 	SLIST_ENTRY(taskq_thread)
46 				 tt_entry;
47 	struct proc		*tt_thread;
48 };
49 SLIST_HEAD(taskq_threads, taskq_thread);
50 
51 struct taskq {
52 	enum {
53 		TQ_S_CREATED,
54 		TQ_S_RUNNING,
55 		TQ_S_DESTROYED
56 	}			 tq_state;
57 	unsigned int		 tq_running;
58 	unsigned int		 tq_nthreads;
59 	unsigned int		 tq_flags;
60 	const char		*tq_name;
61 
62 	struct mutex		 tq_mtx;
63 	struct task_list	 tq_worklist;
64 
65 	struct taskq_threads	 tq_threads;
66 	unsigned int		 tq_barriers;
67 	unsigned int		 tq_bgen;
68 	unsigned int		 tq_bthreads;
69 
70 #ifdef WITNESS
71 	struct lock_object	 tq_lock_object;
72 #endif
73 };
74 
75 static const char taskq_sys_name[] = "systq";
76 
77 struct taskq taskq_sys = {
78 	.tq_state	= TQ_S_CREATED,
79 	.tq_running	= 0,
80 	.tq_nthreads	= 1,
81 	.tq_flags	= 0,
82 	.tq_name	= taskq_sys_name,
83 	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
84 			      taskq_sys_name, 0),
85 	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist),
86 
87 	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads),
88 	.tq_barriers	= 0,
89 	.tq_bgen	= 0,
90 	.tq_bthreads	= 0,
91 
92 #ifdef WITNESS
93 	.tq_lock_object	= {
94 		.lo_name	= taskq_sys_name,
95 		.lo_flags	= TASKQ_LOCK_FLAGS,
96 	},
97 #endif
98 };
99 
100 static const char taskq_sys_mp_name[] = "systqmp";
101 
102 struct taskq taskq_sys_mp = {
103 	.tq_state	= TQ_S_CREATED,
104 	.tq_running	= 0,
105 	.tq_nthreads	= 1,
106 	.tq_flags	= TASKQ_MPSAFE,
107 	.tq_name	= taskq_sys_mp_name,
108 	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
109 			      taskq_sys_mp_name, 0),
110 	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist),
111 
112 	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads),
113 	.tq_barriers	= 0,
114 	.tq_bgen	= 0,
115 	.tq_bthreads	= 0,
116 
117 #ifdef WITNESS
118 	.tq_lock_object = {
119 		.lo_name	= taskq_sys_mp_name,
120 		.lo_flags	= TASKQ_LOCK_FLAGS,
121 	},
122 #endif
123 };
124 
125 struct taskq *const systq = &taskq_sys;
126 struct taskq *const systqmp = &taskq_sys_mp;
127 
128 void	taskq_init(void); /* called in init_main.c */
129 void	taskq_create_thread(void *);
130 void	taskq_barrier_task(void *);
131 int	taskq_next_work(struct taskq *, struct task *);
132 void	taskq_thread(void *);
133 
134 void
taskq_init(void)135 taskq_init(void)
136 {
137 	WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type);
138 	kthread_create_deferred(taskq_create_thread, systq);
139 	WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type);
140 	kthread_create_deferred(taskq_create_thread, systqmp);
141 }
142 
143 struct taskq *
taskq_create(const char * name,unsigned int nthreads,int ipl,unsigned int flags)144 taskq_create(const char *name, unsigned int nthreads, int ipl,
145     unsigned int flags)
146 {
147 	struct taskq *tq;
148 
149 	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
150 	if (tq == NULL)
151 		return (NULL);
152 
153 	tq->tq_state = TQ_S_CREATED;
154 	tq->tq_running = 0;
155 	tq->tq_nthreads = nthreads;
156 	tq->tq_name = name;
157 	tq->tq_flags = flags;
158 
159 	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
160 	TAILQ_INIT(&tq->tq_worklist);
161 
162 	SLIST_INIT(&tq->tq_threads);
163 	tq->tq_barriers = 0;
164 	tq->tq_bgen = 0;
165 	tq->tq_bthreads = 0;
166 
167 #ifdef WITNESS
168 	memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object));
169 	tq->tq_lock_object.lo_name = name;
170 	tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS;
171 	witness_init(&tq->tq_lock_object, &taskq_lock_type);
172 #endif
173 
174 	/* try to create a thread to guarantee that tasks will be serviced */
175 	kthread_create_deferred(taskq_create_thread, tq);
176 
177 	return (tq);
178 }
179 
180 void
taskq_destroy(struct taskq * tq)181 taskq_destroy(struct taskq *tq)
182 {
183 	mtx_enter(&tq->tq_mtx);
184 	switch (tq->tq_state) {
185 	case TQ_S_CREATED:
186 		/* tq is still referenced by taskq_create_thread */
187 		tq->tq_state = TQ_S_DESTROYED;
188 		mtx_leave(&tq->tq_mtx);
189 		return;
190 
191 	case TQ_S_RUNNING:
192 		tq->tq_state = TQ_S_DESTROYED;
193 		break;
194 
195 	default:
196 		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
197 	}
198 
199 	while (tq->tq_running > 0) {
200 		wakeup(tq);
201 		msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy",
202 		    INFSLP);
203 	}
204 	mtx_leave(&tq->tq_mtx);
205 
206 	free(tq, M_DEVBUF, sizeof(*tq));
207 }
208 
209 void
taskq_create_thread(void * arg)210 taskq_create_thread(void *arg)
211 {
212 	struct taskq *tq = arg;
213 	int rv;
214 
215 	mtx_enter(&tq->tq_mtx);
216 
217 	switch (tq->tq_state) {
218 	case TQ_S_DESTROYED:
219 		mtx_leave(&tq->tq_mtx);
220 		free(tq, M_DEVBUF, sizeof(*tq));
221 		return;
222 
223 	case TQ_S_CREATED:
224 		tq->tq_state = TQ_S_RUNNING;
225 		break;
226 
227 	default:
228 		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
229 	}
230 
231 	do {
232 		tq->tq_running++;
233 		mtx_leave(&tq->tq_mtx);
234 
235 		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);
236 
237 		mtx_enter(&tq->tq_mtx);
238 		if (rv != 0) {
239 			printf("unable to create thread for \"%s\" taskq\n",
240 			    tq->tq_name);
241 
242 			tq->tq_running--;
243 			/* could have been destroyed during kthread_create */
244 			if (tq->tq_state == TQ_S_DESTROYED &&
245 			    tq->tq_running == 0)
246 				wakeup_one(&tq->tq_running);
247 			break;
248 		}
249 	} while (tq->tq_running < tq->tq_nthreads);
250 
251 	mtx_leave(&tq->tq_mtx);
252 }
253 
254 void
taskq_barrier_task(void * p)255 taskq_barrier_task(void *p)
256 {
257 	struct taskq *tq = p;
258 	unsigned int gen;
259 
260 	mtx_enter(&tq->tq_mtx);
261 	tq->tq_bthreads++;
262 	wakeup(&tq->tq_bthreads);
263 
264 	gen = tq->tq_bgen;
265 	do {
266 		msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
267 		    PWAIT, "tqbarend", INFSLP);
268 	} while (gen == tq->tq_bgen);
269 	mtx_leave(&tq->tq_mtx);
270 }
271 
272 static void
taskq_do_barrier(struct taskq * tq)273 taskq_do_barrier(struct taskq *tq)
274 {
275 	struct task t = TASK_INITIALIZER(taskq_barrier_task, tq);
276 	struct proc *thread = curproc;
277 	struct taskq_thread *tt;
278 
279 	mtx_enter(&tq->tq_mtx);
280 	tq->tq_barriers++;
281 
282 	/* is the barrier being run from a task inside the taskq? */
283 	SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) {
284 		if (tt->tt_thread == thread) {
285 			tq->tq_bthreads++;
286 			wakeup(&tq->tq_bthreads);
287 			break;
288 		}
289 	}
290 
291 	while (tq->tq_bthreads < tq->tq_nthreads) {
292 		/* shove the task into the queue for a worker to pick up */
293 		SET(t.t_flags, TASK_ONQUEUE);
294 		TAILQ_INSERT_TAIL(&tq->tq_worklist, &t, t_entry);
295 		wakeup_one(tq);
296 
297 		msleep_nsec(&tq->tq_bthreads, &tq->tq_mtx,
298 		    PWAIT, "tqbar", INFSLP);
299 
300 		/*
301 		 * another thread running a barrier might have
302 		 * done this work for us.
303 		 */
304 		if (ISSET(t.t_flags, TASK_ONQUEUE))
305 			TAILQ_REMOVE(&tq->tq_worklist, &t, t_entry);
306 	}
307 
308 	if (--tq->tq_barriers == 0) {
309 		/* we're the last one out */
310 		tq->tq_bgen++;
311 		wakeup(&tq->tq_bgen);
312 		tq->tq_bthreads = 0;
313 	} else {
314 		unsigned int gen = tq->tq_bgen;
315 		do {
316 			msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
317 			    PWAIT, "tqbarwait", INFSLP);
318 		} while (gen == tq->tq_bgen);
319 	}
320 	mtx_leave(&tq->tq_mtx);
321 }
322 
323 void
taskq_barrier(struct taskq * tq)324 taskq_barrier(struct taskq *tq)
325 {
326 	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);
327 
328 	taskq_do_barrier(tq);
329 }
330 
331 void
taskq_del_barrier(struct taskq * tq,struct task * t)332 taskq_del_barrier(struct taskq *tq, struct task *t)
333 {
334 	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);
335 
336 	task_del(tq, t);
337 	taskq_do_barrier(tq);
338 }
339 
340 void
task_set(struct task * t,void (* fn)(void *),void * arg)341 task_set(struct task *t, void (*fn)(void *), void *arg)
342 {
343 	t->t_func = fn;
344 	t->t_arg = arg;
345 	t->t_flags = 0;
346 }
347 
348 int
task_add(struct taskq * tq,struct task * w)349 task_add(struct taskq *tq, struct task *w)
350 {
351 	int rv = 0;
352 
353 	if (ISSET(w->t_flags, TASK_ONQUEUE))
354 		return (0);
355 
356 	mtx_enter(&tq->tq_mtx);
357 	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
358 		rv = 1;
359 		SET(w->t_flags, TASK_ONQUEUE);
360 		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
361 #if NKCOV > 0
362 		if (!kcov_cold)
363 			w->t_process = curproc->p_p;
364 #endif
365 	}
366 	mtx_leave(&tq->tq_mtx);
367 
368 	if (rv)
369 		wakeup_one(tq);
370 
371 	return (rv);
372 }
373 
374 int
task_del(struct taskq * tq,struct task * w)375 task_del(struct taskq *tq, struct task *w)
376 {
377 	int rv = 0;
378 
379 	if (!ISSET(w->t_flags, TASK_ONQUEUE))
380 		return (0);
381 
382 	mtx_enter(&tq->tq_mtx);
383 	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
384 		rv = 1;
385 		CLR(w->t_flags, TASK_ONQUEUE);
386 		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
387 	}
388 	mtx_leave(&tq->tq_mtx);
389 
390 	return (rv);
391 }
392 
393 int
taskq_next_work(struct taskq * tq,struct task * work)394 taskq_next_work(struct taskq *tq, struct task *work)
395 {
396 	struct task *next;
397 
398 	mtx_enter(&tq->tq_mtx);
399 	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
400 		if (tq->tq_state != TQ_S_RUNNING) {
401 			mtx_leave(&tq->tq_mtx);
402 			return (0);
403 		}
404 
405 		msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP);
406 	}
407 
408 	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
409 	CLR(next->t_flags, TASK_ONQUEUE);
410 
411 	*work = *next; /* copy to caller to avoid races */
412 
413 	next = TAILQ_FIRST(&tq->tq_worklist);
414 	mtx_leave(&tq->tq_mtx);
415 
416 	if (next != NULL && tq->tq_nthreads > 1)
417 		wakeup_one(tq);
418 
419 	return (1);
420 }
421 
422 void
taskq_thread(void * xtq)423 taskq_thread(void *xtq)
424 {
425 	struct taskq_thread self = { .tt_thread = curproc };
426 	struct taskq *tq = xtq;
427 	struct task work;
428 	int last;
429 
430 	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
431 		KERNEL_UNLOCK();
432 
433 	mtx_enter(&tq->tq_mtx);
434 	SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry);
435 	mtx_leave(&tq->tq_mtx);
436 
437 	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);
438 
439 	while (taskq_next_work(tq, &work)) {
440 		WITNESS_LOCK(&tq->tq_lock_object, 0);
441 #if NKCOV > 0
442 		kcov_remote_enter(KCOV_REMOTE_COMMON, work.t_process);
443 #endif
444 		(*work.t_func)(work.t_arg);
445 #if NKCOV > 0
446 		kcov_remote_leave(KCOV_REMOTE_COMMON, work.t_process);
447 #endif
448 		WITNESS_UNLOCK(&tq->tq_lock_object, 0);
449 		sched_pause(yield);
450 	}
451 
452 	mtx_enter(&tq->tq_mtx);
453 	SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry);
454 	last = (--tq->tq_running == 0);
455 	mtx_leave(&tq->tq_mtx);
456 
457 	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
458 		KERNEL_LOCK();
459 
460 	if (last)
461 		wakeup_one(&tq->tq_running);
462 
463 	kthread_exit(0);
464 }
465