/*	$OpenBSD: kern_task.c,v 1.31 2020/08/01 08:40:20 anton Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>
#include <sys/proc.h>
#include <sys/witness.h>

#include "kcov.h"
#if NKCOV > 0
#include <sys/kcov.h>
#endif

#ifdef WITNESS

static struct lock_type taskq_lock_type = {
	.lt_name = "taskq"
};

#define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \
    (LO_CLASS_RWLOCK << LO_CLASSSHIFT)

#endif /* WITNESS */

struct taskq_thread {
	SLIST_ENTRY(taskq_thread)
				 tt_entry;
	struct proc		*tt_thread;
};
SLIST_HEAD(taskq_threads, taskq_thread);

struct taskq {
	enum {
		TQ_S_CREATED,
		TQ_S_RUNNING,
		TQ_S_DESTROYED
	}			 tq_state;
	unsigned int		 tq_running;
	unsigned int		 tq_nthreads;
	unsigned int		 tq_flags;
	const char		*tq_name;

	struct mutex		 tq_mtx;
	struct task_list	 tq_worklist;

	struct taskq_threads	 tq_threads;
	unsigned int		 tq_barriers;
	unsigned int		 tq_bgen;
	unsigned int		 tq_bthreads;

#ifdef WITNESS
	struct lock_object	 tq_lock_object;
#endif
};

static const char taskq_sys_name[] = "systq";

struct taskq taskq_sys = {
	.tq_state	= TQ_S_CREATED,
	.tq_running	= 0,
	.tq_nthreads	= 1,
	.tq_flags	= 0,
	.tq_name	= taskq_sys_name,
	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
			      taskq_sys_name, 0),
	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist),

	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads),
	.tq_barriers	= 0,
	.tq_bgen	= 0,
	.tq_bthreads	= 0,

#ifdef WITNESS
	.tq_lock_object	= {
		.lo_name	= taskq_sys_name,
		.lo_flags	= TASKQ_LOCK_FLAGS,
	},
#endif
};

static const char taskq_sys_mp_name[] = "systqmp";

struct taskq taskq_sys_mp = {
	.tq_state	= TQ_S_CREATED,
	.tq_running	= 0,
	.tq_nthreads	= 1,
	.tq_flags	= TASKQ_MPSAFE,
	.tq_name	= taskq_sys_mp_name,
	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
			      taskq_sys_mp_name, 0),
	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist),

	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads),
	.tq_barriers	= 0,
	.tq_bgen	= 0,
	.tq_bthreads	= 0,

#ifdef WITNESS
	.tq_lock_object = {
		.lo_name	= taskq_sys_mp_name,
		.lo_flags	= TASKQ_LOCK_FLAGS,
	},
#endif
};

struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;
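
/*
 * Note (added commentary): systq services its tasks with the kernel
 * lock held, while systqmp is created with TASKQ_MPSAFE, so its worker
 * thread drops the kernel lock before running tasks (see
 * taskq_thread() below).
 */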

void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
void	taskq_barrier_task(void *);
int	taskq_sleep(const volatile void *, struct mutex *, int,
	    const char *, int);
int	taskq_next_work(struct taskq *, struct task *);
void	taskq_thread(void *);

void
taskq_init(void)
{
	WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systq);
	WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type);
	kthread_create_deferred(taskq_create_thread, systqmp);
}

struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;
	tq->tq_flags = flags;

	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
	TAILQ_INIT(&tq->tq_worklist);

	SLIST_INIT(&tq->tq_threads);
	tq->tq_barriers = 0;
	tq->tq_bgen = 0;
	tq->tq_bthreads = 0;

#ifdef WITNESS
	memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object));
	tq->tq_lock_object.lo_name = name;
	tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS;
	witness_init(&tq->tq_lock_object, &taskq_lock_type);
#endif

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}
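
/*
 * Usage sketch (added commentary, not part of the original file; the
 * example_* names are hypothetical): a caller typically creates a
 * taskq once, then sets up a task and queues it whenever there is
 * work to defer to process context.
 *
 *	struct taskq *example_tq;
 *	struct task example_task;
 *
 *	example_tq = taskq_create("example", 1, IPL_NET, 0);
 *	task_set(&example_task, example_func, example_sc);
 *	task_add(example_tq, &example_task);
 *
 * taskq_create() only defers thread creation through
 * kthread_create_deferred(), so tasks added before the worker threads
 * exist simply sit on tq_worklist until a worker starts.
 */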

void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy",
		    INFSLP);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}

void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}

void
taskq_barrier_task(void *p)
{
	struct taskq *tq = p;
	unsigned int gen;

	mtx_enter(&tq->tq_mtx);
	tq->tq_bthreads++;
	wakeup(&tq->tq_bthreads);

	gen = tq->tq_bgen;
	do {
		msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
		    PWAIT, "tqbarend", INFSLP);
	} while (gen == tq->tq_bgen);
	mtx_leave(&tq->tq_mtx);
}

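/*
 * Added commentary: a barrier completes only after every worker
 * thread in the taskq has run taskq_barrier_task(), which guarantees
 * that all tasks queued before the barrier (and any handler that was
 * already running) have finished.  Each worker that picks up the
 * barrier task increments tq_bthreads and sleeps on tq_bgen; the
 * caller keeps re-queueing the barrier task until all tq_nthreads
 * workers (counting the caller itself if it is one of them) are
 * accounted for, then advances tq_bgen to release everyone.
 * Concurrent barriers share the same generation, which is why
 * tq_barriers counts the waiting callers.
 */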
static void
taskq_do_barrier(struct taskq *tq)
{
	struct task t = TASK_INITIALIZER(taskq_barrier_task, tq);
	struct proc *thread = curproc;
	struct taskq_thread *tt;

	mtx_enter(&tq->tq_mtx);
	tq->tq_barriers++;

	/* is the barrier being run from a task inside the taskq? */
	SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) {
		if (tt->tt_thread == thread) {
			tq->tq_bthreads++;
			wakeup(&tq->tq_bthreads);
			break;
		}
	}

	while (tq->tq_bthreads < tq->tq_nthreads) {
		/* shove the task into the queue for a worker to pick up */
		SET(t.t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, &t, t_entry);
		wakeup_one(tq);

		msleep_nsec(&tq->tq_bthreads, &tq->tq_mtx,
		    PWAIT, "tqbar", INFSLP);

		/*
		 * another thread running a barrier might have
		 * done this work for us.
		 */
		if (ISSET(t.t_flags, TASK_ONQUEUE))
			TAILQ_REMOVE(&tq->tq_worklist, &t, t_entry);
	}

	if (--tq->tq_barriers == 0) {
		/* we're the last one out */
		tq->tq_bgen++;
		wakeup(&tq->tq_bgen);
		tq->tq_bthreads = 0;
	} else {
		unsigned int gen = tq->tq_bgen;
		do {
			msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
			    PWAIT, "tqbarwait", INFSLP);
		} while (gen == tq->tq_bgen);
	}
	mtx_leave(&tq->tq_mtx);
}

void
taskq_barrier(struct taskq *tq)
{
	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	taskq_do_barrier(tq);
}

void
taskq_del_barrier(struct taskq *tq, struct task *t)
{
	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	if (task_del(tq, t))
		return;

	taskq_do_barrier(tq);
}
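
/*
 * Usage sketch (added commentary, not part of the original file; the
 * example_* names are hypothetical): detach paths typically use
 * taskq_del_barrier() to make sure a task is neither queued nor still
 * running before its argument is freed.
 *
 *	taskq_del_barrier(example_tq, &example_sc->sc_task);
 *	free(example_sc, M_DEVBUF, sizeof(*example_sc));
 */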

void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
	t->t_func = fn;
	t->t_arg = arg;
	t->t_flags = 0;
}

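/*
 * Added commentary: task_add() and task_del() test TASK_ONQUEUE
 * without tq_mtx as a cheap fast path and then re-check it under the
 * mutex before touching tq_worklist, so a stale unlocked read only
 * costs a lock round trip.
 */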
int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
#if NKCOV > 0
		w->t_process = curproc->p_p;
#endif
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}

int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}

int
taskq_next_work(struct taskq *tq, struct task *work)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}

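/*
 * Added commentary: each worker thread registers itself on tq_threads
 * (so taskq_do_barrier() can tell when it is called from within the
 * taskq), then copies one task at a time out of tq_worklist and runs
 * it without tq_mtx held.  Threads serving a TASKQ_MPSAFE queue also
 * run without the kernel lock.
 */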
void
taskq_thread(void *xtq)
{
	struct taskq_thread self = { .tt_thread = curproc };
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	mtx_enter(&tq->tq_mtx);
	SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry);
	mtx_leave(&tq->tq_mtx);

	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);

	while (taskq_next_work(tq, &work)) {
		WITNESS_LOCK(&tq->tq_lock_object, 0);
#if NKCOV > 0
		kcov_remote_enter(KCOV_REMOTE_COMMON, work.t_process);
#endif
		(*work.t_func)(work.t_arg);
#if NKCOV > 0
		kcov_remote_leave(KCOV_REMOTE_COMMON, work.t_process);
#endif
		WITNESS_UNLOCK(&tq->tq_lock_object, 0);
		sched_pause(yield);
	}

	mtx_enter(&tq->tq_mtx);
	SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}
468