/*	$OpenBSD: kern_task.c,v 1.18 2016/08/11 01:32:31 dlg Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>

#define TASK_ONQUEUE	1	/* the task is linked on a taskq worklist */

struct taskq {
	enum {
		TQ_S_CREATED,	/* waiting for worker threads to be created */
		TQ_S_RUNNING,	/* worker threads are servicing the queue */
		TQ_S_DESTROYED	/* taskq_destroy() has been called */
	}			 tq_state;
	unsigned int		 tq_running;	/* worker threads still alive */
	unsigned int		 tq_nthreads;	/* worker threads requested */
	unsigned int		 tq_flags;	/* TASKQ_MPSAFE, TASKQ_CANTSLEEP */
	const char		*tq_name;	/* name given to the kthreads */

	struct mutex		 tq_mtx;	/* protects state and worklist */
	struct task_list	 tq_worklist;	/* pending tasks, FIFO order */
};

/*
 * The system taskqs, statically initialised so tasks can be queued
 * on them before taskq_init runs: systq serves its tasks under the
 * kernel lock, systqmp serves MPSAFE tasks.
 */
struct taskq taskq_sys = {
	TQ_S_CREATED,
	0,
	1,
	0,
	"systq",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist)
};

struct taskq taskq_sys_mp = {
	TQ_S_CREATED,
	0,
	1,
	TASKQ_MPSAFE,
	"systqmp",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist)
};

/* a sleep function with the signature of msleep(9) */
typedef int (*sleepfn)(const volatile void *, struct mutex *, int,
    const char *, int);

struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;

void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
int	taskq_sleep(const volatile void *, struct mutex *, int,
	    const char *, int);
int	taskq_next_work(struct taskq *, struct task *, sleepfn);
void	taskq_thread(void *);

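/*
 * taskq_init runs early in boot, before kthreads can be created, so
 * worker thread creation for the system taskqs is deferred.
 */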
void
taskq_init(void)
{
	kthread_create_deferred(taskq_create_thread, systq);
	kthread_create_deferred(taskq_create_thread, systqmp);
}

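/*
 * Allocate and initialise a taskq. Worker threads cannot be created
 * from arbitrary context, so their creation is deferred; the queue
 * stays in TQ_S_CREATED until taskq_create_thread runs.
 */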
struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;
	tq->tq_flags = flags;

	mtx_init(&tq->tq_mtx, ipl);
	TAILQ_INIT(&tq->tq_worklist);

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}

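/*
 * Tear down a taskq. If the workers were never created, the pending
 * taskq_create_thread call still references the queue and is left to
 * free it; otherwise the workers are woken and the caller sleeps
 * until the last of them has exited, then frees the queue itself.
 */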
void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}

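/*
 * Deferred thread creation: runs once kthreads are available, moves
 * the queue to TQ_S_RUNNING and spawns up to tq_nthreads workers, or
 * frees the queue if it was destroyed before this could run.
 */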
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}

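/*
 * Prime a task with its function and argument. This takes no locks
 * and must be done before the task is first added to a queue.
 */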
void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
	t->t_func = fn;
	t->t_arg = arg;
	t->t_flags = 0;
}

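/*
 * Queue a task. Returns 1 if the task was added, 0 if it was
 * already pending. The unlocked TASK_ONQUEUE test is only an
 * optimisation; the decision is remade under tq_mtx.
 */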
int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}
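
/*
 * Typical use, as an illustrative sketch only (foo_taskfn and
 * sc->sc_task are hypothetical names, not part of this file): a
 * driver embeds a struct task in its softc, primes it once and then
 * schedules it, for example from an interrupt handler:
 *
 *	task_set(&sc->sc_task, foo_taskfn, sc);
 *	...
 *	task_add(systq, &sc->sc_task);
 *
 * Adding an already pending task is harmless; it is queued once.
 */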

/*
 * Remove a pending task. Returns 1 if the task was taken off the
 * queue, 0 otherwise. A task a worker has already copied out keeps
 * running: task_del does not wait for it to finish.
 */
int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}

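/*
 * Sleep wrapper for TASKQ_CANTSLEEP queues: P_CANTSLEEP is cleared
 * around the msleep so an idle worker may block waiting for work,
 * even though the tasks it runs must never sleep.
 */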
int
taskq_sleep(const volatile void *ident, struct mutex *mtx, int priority,
    const char *wmesg, int tmo)
{
	u_int *flags = &curproc->p_flag;
	int rv;

	atomic_clearbits_int(flags, P_CANTSLEEP);
	rv = msleep(ident, mtx, priority, wmesg, tmo);
	atomic_setbits_int(flags, P_CANTSLEEP);

	return (rv);
}

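/*
 * Wait for and dequeue the next task. The task is copied into the
 * caller's storage before the mutex is dropped, so the submitter may
 * reuse its struct task immediately. Returns 0 once the queue is
 * being destroyed and no work remains; wakes another worker if more
 * work is left behind.
 */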
int
taskq_next_work(struct taskq *tq, struct task *work, sleepfn tqsleep)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		tqsleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	if (next != NULL && tq->tq_nthreads > 1)
		wakeup_one(tq);

	return (1);
}

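/*
 * Worker thread body: drop the kernel lock on MPSAFE queues, mark
 * the thread P_CANTSLEEP if its tasks must not sleep, then run tasks
 * until the queue is destroyed. The last worker to exit wakes
 * taskq_destroy.
 */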
void
taskq_thread(void *xtq)
{
	sleepfn tqsleep = msleep;
	struct taskq *tq = xtq;
	struct task work;
	int last;

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP)) {
		tqsleep = taskq_sleep;
		atomic_setbits_int(&curproc->p_flag, P_CANTSLEEP);
	}

	while (taskq_next_work(tq, &work, tqsleep)) {
		(*work.t_func)(work.t_arg);
		sched_pause();
	}

	mtx_enter(&tq->tq_mtx);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP))
		atomic_clearbits_int(&curproc->p_flag, P_CANTSLEEP);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}
309