/*	$OpenBSD: kern_task.c,v 1.35 2024/05/14 08:26:13 jsg Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>
#include <sys/proc.h>
#include <sys/witness.h>

#include "kcov.h"
#if NKCOV > 0
#include <sys/kcov.h>
#endif

#ifdef WITNESS

static struct lock_type taskq_lock_type = {
	.lt_name = "taskq"
};

#define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \
    (LO_CLASS_RWLOCK << LO_CLASSSHIFT)

#endif /* WITNESS */

struct taskq_thread {
	SLIST_ENTRY(taskq_thread)
				 tt_entry;
	struct proc		*tt_thread;
};
SLIST_HEAD(taskq_threads, taskq_thread);

struct taskq {
	enum {
		TQ_S_CREATED,
		TQ_S_RUNNING,
		TQ_S_DESTROYED
	}			 tq_state;
	unsigned int		 tq_running;
	unsigned int		 tq_nthreads;
	unsigned int		 tq_flags;
	const char		*tq_name;

	struct mutex		 tq_mtx;
	struct task_list	 tq_worklist;

	struct taskq_threads	 tq_threads;
	unsigned int		 tq_barriers;
	unsigned int		 tq_bgen;
	unsigned int		 tq_bthreads;

#ifdef WITNESS
	struct lock_object	 tq_lock_object;
#endif
};

static const char taskq_sys_name[] = "systq";

struct taskq taskq_sys = {
	.tq_state	= TQ_S_CREATED,
	.tq_running	= 0,
	.tq_nthreads	= 1,
	.tq_flags	= 0,
	.tq_name	= taskq_sys_name,
	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
			      taskq_sys_name, 0),
	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist),

	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads),
	.tq_barriers	= 0,
	.tq_bgen	= 0,
	.tq_bthreads	= 0,

#ifdef WITNESS
	.tq_lock_object	= {
		.lo_name	= taskq_sys_name,
		.lo_flags	= TASKQ_LOCK_FLAGS,
	},
#endif
};

static const char taskq_sys_mp_name[] = "systqmp";

struct taskq taskq_sys_mp = {
	.tq_state	= TQ_S_CREATED,
	.tq_running	= 0,
	.tq_nthreads	= 1,
	.tq_flags	= TASKQ_MPSAFE,
	.tq_name	= taskq_sys_mp_name,
	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
			      taskq_sys_mp_name, 0),
	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist),

	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads),
	.tq_barriers	= 0,
	.tq_bgen	= 0,
	.tq_bthreads	= 0,

#ifdef WITNESS
	.tq_lock_object = {
		.lo_name	= taskq_sys_mp_name,
		.lo_flags	= TASKQ_LOCK_FLAGS,
	},
#endif
};

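/*
 * systq runs its tasks with the kernel lock held; systqmp is created
 * with TASKQ_MPSAFE, so its worker thread releases the kernel lock
 * before running tasks (see taskq_thread() below).
 */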
struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;

void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
void	taskq_barrier_task(void *);
int	taskq_next_work(struct taskq *, struct task *);
void	taskq_thread(void *);

void
taskq_init(void)
{
	WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systq);
	WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systqmp);
}

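/*
 * Typical consumer usage (an illustrative sketch, not part of this
 * file; "example_softc", "example_attach", "example_intr", and
 * "example_task" are hypothetical names):
 *
 *	struct example_softc {
 *		struct taskq	*sc_tq;
 *		struct task	 sc_task;
 *	};
 *
 *	int
 *	example_attach(struct example_softc *sc)
 *	{
 *		sc->sc_tq = taskq_create("extq", 1, IPL_NONE, 0);
 *		if (sc->sc_tq == NULL)
 *			return (ENOMEM);
 *		task_set(&sc->sc_task, example_task, sc);
 *		return (0);
 *	}
 *
 *	int
 *	example_intr(void *arg)
 *	{
 *		struct example_softc *sc = arg;
 *
 *		// defer work out of interrupt context to the taskq
 *		task_add(sc->sc_tq, &sc->sc_task);
 *		return (1);
 *	}
 */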
struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;
	tq->tq_flags = flags;

	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
	TAILQ_INIT(&tq->tq_worklist);

	SLIST_INIT(&tq->tq_threads);
	tq->tq_barriers = 0;
	tq->tq_bgen = 0;
	tq->tq_bthreads = 0;

#ifdef WITNESS
	memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object));
	tq->tq_lock_object.lo_name = name;
	tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS;
	witness_init(&tq->tq_lock_object, &taskq_lock_type);
#endif

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}

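/*
 * taskq_destroy() marks the taskq destroyed, wakes its workers, and
 * sleeps until the last worker has exited before freeing tq. If the
 * threads were never created (TQ_S_CREATED), the deferred
 * taskq_create_thread() callback frees tq instead.
 */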
void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy",
		    INFSLP);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}

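/*
 * taskq_create_thread() runs once kthreads can be created (it is
 * deferred via kthread_create_deferred()) and spins up workers until
 * tq_nthreads are running.
 */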
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}

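/*
 * taskq_barrier_task() is queued on the taskq so that a worker picks
 * it up: the worker reports in via tq_bthreads and then sleeps until
 * the barrier generation (tq_bgen) advances.
 */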
void
taskq_barrier_task(void *p)
{
	struct taskq *tq = p;
	unsigned int gen;

	mtx_enter(&tq->tq_mtx);
	tq->tq_bthreads++;
	wakeup(&tq->tq_bthreads);

	gen = tq->tq_bgen;
	do {
		msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
		    PWAIT, "tqbarend", INFSLP);
	} while (gen == tq->tq_bgen);
	mtx_leave(&tq->tq_mtx);
}

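/*
 * A barrier completes once every worker thread has checked in, which
 * guarantees that all tasks queued before the barrier have finished.
 * Concurrent barriers share a generation: the last caller out bumps
 * tq_bgen and wakes the rest.
 */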
static void
taskq_do_barrier(struct taskq *tq)
{
	struct task t = TASK_INITIALIZER(taskq_barrier_task, tq);
	struct proc *thread = curproc;
	struct taskq_thread *tt;

	mtx_enter(&tq->tq_mtx);
	tq->tq_barriers++;

	/* is the barrier being run from a task inside the taskq? */
	SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) {
		if (tt->tt_thread == thread) {
			tq->tq_bthreads++;
			wakeup(&tq->tq_bthreads);
			break;
		}
	}

	while (tq->tq_bthreads < tq->tq_nthreads) {
		/* shove the task into the queue for a worker to pick up */
		SET(t.t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, &t, t_entry);
		wakeup_one(tq);

		msleep_nsec(&tq->tq_bthreads, &tq->tq_mtx,
		    PWAIT, "tqbar", INFSLP);

		/*
		 * another thread running a barrier might have
		 * done this work for us.
		 */
		if (ISSET(t.t_flags, TASK_ONQUEUE))
			TAILQ_REMOVE(&tq->tq_worklist, &t, t_entry);
	}

	if (--tq->tq_barriers == 0) {
		/* we're the last one out */
		tq->tq_bgen++;
		wakeup(&tq->tq_bgen);
		tq->tq_bthreads = 0;
	} else {
		unsigned int gen = tq->tq_bgen;
		do {
			msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
			    PWAIT, "tqbarwait", INFSLP);
		} while (gen == tq->tq_bgen);
	}
	mtx_leave(&tq->tq_mtx);
}

void
taskq_barrier(struct taskq *tq)
{
	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	taskq_do_barrier(tq);
}

void
taskq_del_barrier(struct taskq *tq, struct task *t)
{
	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	if (task_del(tq, t))
		return;

	taskq_do_barrier(tq);
}

void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
	t->t_func = fn;
	t->t_arg = arg;
	t->t_flags = 0;
}

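/*
 * task_add() returns 1 if the task was queued and 0 if it was already
 * pending. The unlocked TASK_ONQUEUE test is a fast path; the flag is
 * re-checked under tq_mtx before the task is put on the worklist.
 */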
int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
#if NKCOV > 0
		if (!kcov_cold)
			w->t_process = curproc->p_p;
#endif
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}

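/*
 * task_del() mirrors task_add(): an unlocked fast path, then a locked
 * re-check before removal. It returns 1 only if the task was still
 * pending; a task already claimed by a worker is not cancelled.
 */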
int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}

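/*
 * taskq_next_work() sleeps until work is available, then copies the
 * task out under tq_mtx so the worker can run it without holding the
 * mutex. Returns 0 when the taskq is being destroyed.
 */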
int
taskq_next_work(struct taskq *tq, struct task *work)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}

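/*
 * taskq_thread() is the worker loop: it registers itself on
 * tq_threads (so barriers can detect recursion), drops the kernel
 * lock for TASKQ_MPSAFE queues, and runs tasks until the taskq is
 * destroyed.
 */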
void
taskq_thread(void *xtq)
{
	struct taskq_thread self = { .tt_thread = curproc };
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	mtx_enter(&tq->tq_mtx);
	SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry);
	mtx_leave(&tq->tq_mtx);

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	while (taskq_next_work(tq, &work)) {
		WITNESS_LOCK(&tq->tq_lock_object, 0);
#if NKCOV > 0
		kcov_remote_enter(KCOV_REMOTE_COMMON, work.t_process);
#endif
		(*work.t_func)(work.t_arg);
#if NKCOV > 0
		kcov_remote_leave(KCOV_REMOTE_COMMON, work.t_process);
#endif
		WITNESS_UNLOCK(&tq->tq_lock_object, 0);
		sched_pause(yield);
	}

	mtx_enter(&tq->tq_mtx);
	SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}
467