xref: /netbsd/sys/kern/subr_workqueue.c (revision ec91f812)
1 /*	$NetBSD: subr_workqueue.c,v 1.41 2022/10/29 11:41:00 riastradh Exp $	*/
2 
3 /*-
4  * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26  * SUCH DAMAGE.
27  */
28 
29 #include <sys/cdefs.h>
30 __KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.41 2022/10/29 11:41:00 riastradh Exp $");
31 
32 #include <sys/param.h>
33 #include <sys/cpu.h>
34 #include <sys/systm.h>
35 #include <sys/kthread.h>
36 #include <sys/kmem.h>
37 #include <sys/proc.h>
38 #include <sys/workqueue.h>
39 #include <sys/mutex.h>
40 #include <sys/condvar.h>
41 #include <sys/sdt.h>
42 #include <sys/queue.h>
43 
/*
 * Private representation of a work item: nothing but the list linkage.
 * workqueue_create asserts at compile time that this fits inside a
 * struct work, and the rest of this file casts between the two.
 */
44 typedef struct work_impl {
45 	SIMPLEQ_ENTRY(work_impl) wk_entry;	/* link on a pending/running list */
46 } work_impl_t;
47 
/* List head type used for the pending and running lists of work items. */
48 SIMPLEQ_HEAD(workqhead, work_impl);
49 
/*
 * State of one queue and its worker thread.  A workqueue has a single
 * instance, or one per CPU when WQ_PERCPU is set; workqueue_queue_lookup
 * finds them WQ_QUEUE_SIZE bytes apart after the struct workqueue header.
 */
50 struct workqueue_queue {
51 	kmutex_t q_mutex;	/* taken around every list update below */
52 	kcondvar_t q_cv;	/* broadcast on enqueue, batch completion, worker exit */
53 	struct workqhead q_queue_pending;	/* enqueued, not yet taken by the worker */
54 	struct workqhead q_queue_running;	/* batch the worker is currently processing */
55 	lwp_t *q_worker;	/* worker kthread; NULL before creation and after exit */
56 };
57 
/*
 * Workqueue header.  It lives at a coherency_unit-aligned address inside
 * the allocation made by workqueue_create and is followed by the
 * struct workqueue_queue instance(s).
 */
58 struct workqueue {
59 	void (*wq_func)(struct work *, void *);	/* callback run for each work item */
60 	void *wq_arg;	/* second argument handed to wq_func */
61 	int wq_flags;	/* flags given to workqueue_create (WQ_PERCPU, WQ_MPSAFE, WQ_FPU, ...) */
62 
63 	char wq_name[MAXCOMLEN];	/* name; passed to cv_init and used for the kthread names */
64 	pri_t wq_prio;	/* priority passed to kthread_create */
65 	void *wq_ptr;	/* unaligned pointer returned by kmem_zalloc, kept for kmem_free */
66 };
67 
/*
 * Sizes of the header and of each per-queue slot, both rounded up to a
 * multiple of coherency_unit.  workqueue_size and workqueue_queue_lookup
 * use them to lay out and index the allocation.
 */
68 #define	WQ_SIZE		(roundup2(sizeof(struct workqueue), coherency_unit))
69 #define	WQ_QUEUE_SIZE	(roundup2(sizeof(struct workqueue_queue), coherency_unit))
70 
/* NOTE(review): POISON is not referenced anywhere in the visible source. */
71 #define	POISON	0xaabbccdd
72 
/*
 * SDT probe definitions.  In each definition the string literals give the
 * C types of the probe arguments in order, and the trailing comments name
 * those arguments.
 */

/*
 * create/destroy: defined here, but no SDT_PROBE call for either of them
 * appears in the visible source.
 */
73 SDT_PROBE_DEFINE7(sdt, kernel, workqueue, create,
74     "struct workqueue *"/*wq*/,
75     "const char *"/*name*/,
76     "void (*)(struct work *, void *)"/*func*/,
77     "void *"/*arg*/,
78     "pri_t"/*prio*/,
79     "int"/*ipl*/,
80     "int"/*flags*/);
81 SDT_PROBE_DEFINE1(sdt, kernel, workqueue, destroy,
82     "struct workqueue *"/*wq*/);
83 
/*
 * enqueue fires in workqueue_enqueue; entry/return bracket each callback
 * invocation in workqueue_runlist; wait__start/wait__done bracket
 * workqueue_wait.
 */
84 SDT_PROBE_DEFINE3(sdt, kernel, workqueue, enqueue,
85     "struct workqueue *"/*wq*/,
86     "struct work *"/*wk*/,
87     "struct cpu_info *"/*ci*/);
88 SDT_PROBE_DEFINE4(sdt, kernel, workqueue, entry,
89     "struct workqueue *"/*wq*/,
90     "struct work *"/*wk*/,
91     "void (*)(struct work *, void *)"/*func*/,
92     "void *"/*arg*/);
93 SDT_PROBE_DEFINE4(sdt, kernel, workqueue, return,
94     "struct workqueue *"/*wq*/,
95     "struct work *"/*wk*/,
96     "void (*)(struct work *, void *)"/*func*/,
97     "void *"/*arg*/);
98 SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__start,
99     "struct workqueue *"/*wq*/,
100     "struct work *"/*wk*/);
101 SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__done,
102     "struct workqueue *"/*wq*/,
103     "struct work *"/*wk*/);
104 
/* exit__start/exit__done bracket the teardown loop in workqueue_destroy. */
105 SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__start,
106     "struct workqueue *"/*wq*/);
107 SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__done,
108     "struct workqueue *"/*wq*/);
109 
110 static size_t
workqueue_size(int flags)111 workqueue_size(int flags)
112 {
113 
114 	return WQ_SIZE
115 	    + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE
116 	    + coherency_unit;
117 }
118 
119 static struct workqueue_queue *
workqueue_queue_lookup(struct workqueue * wq,struct cpu_info * ci)120 workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci)
121 {
122 	u_int idx = 0;
123 
124 	if (wq->wq_flags & WQ_PERCPU) {
125 		idx = ci ? cpu_index(ci) : cpu_index(curcpu());
126 	}
127 
128 	return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE));
129 }
130 
131 static void
workqueue_runlist(struct workqueue * wq,struct workqhead * list)132 workqueue_runlist(struct workqueue *wq, struct workqhead *list)
133 {
134 	work_impl_t *wk;
135 	work_impl_t *next;
136 
137 	/*
138 	 * note that "list" is not a complete SIMPLEQ.
139 	 */
140 
141 	for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
142 		next = SIMPLEQ_NEXT(wk, wk_entry);
143 		SDT_PROBE4(sdt, kernel, workqueue, entry,
144 		    wq, wk, wq->wq_func, wq->wq_arg);
145 		(*wq->wq_func)((void *)wk, wq->wq_arg);
146 		SDT_PROBE4(sdt, kernel, workqueue, return,
147 		    wq, wk, wq->wq_func, wq->wq_arg);
148 	}
149 }
150 
151 static void
workqueue_worker(void * cookie)152 workqueue_worker(void *cookie)
153 {
154 	struct workqueue *wq = cookie;
155 	struct workqueue_queue *q;
156 	int s;
157 
158 	/* find the workqueue of this kthread */
159 	q = workqueue_queue_lookup(wq, curlwp->l_cpu);
160 
161 	if (wq->wq_flags & WQ_FPU)
162 		s = kthread_fpu_enter();
163 	for (;;) {
164 		/*
165 		 * we violate abstraction of SIMPLEQ.
166 		 */
167 
168 		mutex_enter(&q->q_mutex);
169 		while (SIMPLEQ_EMPTY(&q->q_queue_pending))
170 			cv_wait(&q->q_cv, &q->q_mutex);
171 		KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
172 		q->q_queue_running.sqh_first =
173 		    q->q_queue_pending.sqh_first; /* XXX */
174 		SIMPLEQ_INIT(&q->q_queue_pending);
175 		mutex_exit(&q->q_mutex);
176 
177 		workqueue_runlist(wq, &q->q_queue_running);
178 
179 		mutex_enter(&q->q_mutex);
180 		KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running));
181 		SIMPLEQ_INIT(&q->q_queue_running);
182 		/* Wake up workqueue_wait */
183 		cv_broadcast(&q->q_cv);
184 		mutex_exit(&q->q_mutex);
185 	}
186 	if (wq->wq_flags & WQ_FPU)
187 		kthread_fpu_exit(s);
188 }
189 
190 static void
workqueue_init(struct workqueue * wq,const char * name,void (* callback_func)(struct work *,void *),void * callback_arg,pri_t prio,int ipl)191 workqueue_init(struct workqueue *wq, const char *name,
192     void (*callback_func)(struct work *, void *), void *callback_arg,
193     pri_t prio, int ipl)
194 {
195 
196 	KASSERT(sizeof(wq->wq_name) > strlen(name));
197 	strncpy(wq->wq_name, name, sizeof(wq->wq_name));
198 
199 	wq->wq_prio = prio;
200 	wq->wq_func = callback_func;
201 	wq->wq_arg = callback_arg;
202 }
203 
204 static int
workqueue_initqueue(struct workqueue * wq,struct workqueue_queue * q,int ipl,struct cpu_info * ci)205 workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
206     int ipl, struct cpu_info *ci)
207 {
208 	int error, ktf;
209 
210 	KASSERT(q->q_worker == NULL);
211 
212 	mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
213 	cv_init(&q->q_cv, wq->wq_name);
214 	SIMPLEQ_INIT(&q->q_queue_pending);
215 	SIMPLEQ_INIT(&q->q_queue_running);
216 	ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
217 	if (wq->wq_prio < PRI_KERNEL)
218 		ktf |= KTHREAD_TS;
219 	if (ci) {
220 		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
221 		    wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index);
222 	} else {
223 		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
224 		    wq, &q->q_worker, "%s", wq->wq_name);
225 	}
226 	if (error != 0) {
227 		mutex_destroy(&q->q_mutex);
228 		cv_destroy(&q->q_cv);
229 		KASSERT(q->q_worker == NULL);
230 	}
231 	return error;
232 }
233 
/*
 * Work item that workqueue_finiqueue queues to make a worker exit.
 * wqe_wk is the first member: workqueue_exit casts the work pointer it
 * receives straight back to this structure.
 */
234 struct workqueue_exitargs {
235 	work_impl_t wqe_wk;	/* list linkage; what actually gets enqueued */
236 	struct workqueue_queue *wqe_q;	/* queue whose worker should exit */
237 };
238 
239 static void
workqueue_exit(struct work * wk,void * arg)240 workqueue_exit(struct work *wk, void *arg)
241 {
242 	struct workqueue_exitargs *wqe = (void *)wk;
243 	struct workqueue_queue *q = wqe->wqe_q;
244 
245 	/*
246 	 * only competition at this point is workqueue_finiqueue.
247 	 */
248 
249 	KASSERT(q->q_worker == curlwp);
250 	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
251 	mutex_enter(&q->q_mutex);
252 	q->q_worker = NULL;
253 	cv_broadcast(&q->q_cv);
254 	mutex_exit(&q->q_mutex);
255 	kthread_exit(0);
256 }
257 
258 static void
workqueue_finiqueue(struct workqueue * wq,struct workqueue_queue * q)259 workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
260 {
261 	struct workqueue_exitargs wqe;
262 
263 	KASSERT(wq->wq_func == workqueue_exit);
264 
265 	wqe.wqe_q = q;
266 	KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
267 	KASSERT(q->q_worker != NULL);
268 	mutex_enter(&q->q_mutex);
269 	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
270 	cv_broadcast(&q->q_cv);
271 	while (q->q_worker != NULL) {
272 		cv_wait(&q->q_cv, &q->q_mutex);
273 	}
274 	mutex_exit(&q->q_mutex);
275 	mutex_destroy(&q->q_mutex);
276 	cv_destroy(&q->q_cv);
277 }
278 
279 /* --- */
280 
281 int
workqueue_create(struct workqueue ** wqp,const char * name,void (* callback_func)(struct work *,void *),void * callback_arg,pri_t prio,int ipl,int flags)282 workqueue_create(struct workqueue **wqp, const char *name,
283     void (*callback_func)(struct work *, void *), void *callback_arg,
284     pri_t prio, int ipl, int flags)
285 {
286 	struct workqueue *wq;
287 	struct workqueue_queue *q;
288 	void *ptr;
289 	int error = 0;
290 
291 	CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));
292 
293 	ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
294 	wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
295 	wq->wq_ptr = ptr;
296 	wq->wq_flags = flags;
297 
298 	workqueue_init(wq, name, callback_func, callback_arg, prio, ipl);
299 
300 	if (flags & WQ_PERCPU) {
301 		struct cpu_info *ci;
302 		CPU_INFO_ITERATOR cii;
303 
304 		/* create the work-queue for each CPU */
305 		for (CPU_INFO_FOREACH(cii, ci)) {
306 			q = workqueue_queue_lookup(wq, ci);
307 			error = workqueue_initqueue(wq, q, ipl, ci);
308 			if (error) {
309 				break;
310 			}
311 		}
312 	} else {
313 		/* initialize a work-queue */
314 		q = workqueue_queue_lookup(wq, NULL);
315 		error = workqueue_initqueue(wq, q, ipl, NULL);
316 	}
317 
318 	if (error != 0) {
319 		workqueue_destroy(wq);
320 	} else {
321 		*wqp = wq;
322 	}
323 
324 	return error;
325 }
326 
327 static bool
workqueue_q_wait(struct workqueue_queue * q,work_impl_t * wk_target)328 workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target)
329 {
330 	work_impl_t *wk;
331 	bool found = false;
332 
333 	mutex_enter(&q->q_mutex);
334 	if (q->q_worker == curlwp)
335 		goto out;
336     again:
337 	SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) {
338 		if (wk == wk_target)
339 			goto found;
340 	}
341 	SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) {
342 		if (wk == wk_target)
343 			goto found;
344 	}
345     found:
346 	if (wk != NULL) {
347 		found = true;
348 		cv_wait(&q->q_cv, &q->q_mutex);
349 		goto again;
350 	}
351     out:
352 	mutex_exit(&q->q_mutex);
353 
354 	return found;
355 }
356 
357 /*
358  * Wait for a specified work to finish.  The caller must ensure that no new
359  * work will be enqueued before calling workqueue_wait.  Note that if the
360  * workqueue is WQ_PERCPU, the caller can enqueue a new work to another queue
361  * other than the waiting queue.
362  */
363 void
workqueue_wait(struct workqueue * wq,struct work * wk)364 workqueue_wait(struct workqueue *wq, struct work *wk)
365 {
366 	struct workqueue_queue *q;
367 	bool found;
368 
369 	ASSERT_SLEEPABLE();
370 
371 	SDT_PROBE2(sdt, kernel, workqueue, wait__start,  wq, wk);
372 	if (ISSET(wq->wq_flags, WQ_PERCPU)) {
373 		struct cpu_info *ci;
374 		CPU_INFO_ITERATOR cii;
375 		for (CPU_INFO_FOREACH(cii, ci)) {
376 			q = workqueue_queue_lookup(wq, ci);
377 			found = workqueue_q_wait(q, (work_impl_t *)wk);
378 			if (found)
379 				break;
380 		}
381 	} else {
382 		q = workqueue_queue_lookup(wq, NULL);
383 		(void) workqueue_q_wait(q, (work_impl_t *)wk);
384 	}
385 	SDT_PROBE2(sdt, kernel, workqueue, wait__done,  wq, wk);
386 }
387 
388 void
workqueue_destroy(struct workqueue * wq)389 workqueue_destroy(struct workqueue *wq)
390 {
391 	struct workqueue_queue *q;
392 	struct cpu_info *ci;
393 	CPU_INFO_ITERATOR cii;
394 
395 	ASSERT_SLEEPABLE();
396 
397 	SDT_PROBE1(sdt, kernel, workqueue, exit__start,  wq);
398 	wq->wq_func = workqueue_exit;
399 	for (CPU_INFO_FOREACH(cii, ci)) {
400 		q = workqueue_queue_lookup(wq, ci);
401 		if (q->q_worker != NULL) {
402 			workqueue_finiqueue(wq, q);
403 		}
404 	}
405 	SDT_PROBE1(sdt, kernel, workqueue, exit__done,  wq);
406 	kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
407 }
408 
#ifdef DEBUG
/*
 * workqueue_check_duplication:
 *
 *	DEBUG-only check: panic if "wk" is already on the pending list of
 *	"q".  The running list is not examined.
 */
static void
workqueue_check_duplication(struct workqueue_queue *q, work_impl_t *wk)
{
	work_impl_t *it;

	for (it = SIMPLEQ_FIRST(&q->q_queue_pending); it != NULL;
	    it = SIMPLEQ_NEXT(it, wk_entry)) {
		if (it != wk)
			continue;
		panic("%s: tried to enqueue a queued work", __func__);
	}
}
#endif
421 
422 void
workqueue_enqueue(struct workqueue * wq,struct work * wk0,struct cpu_info * ci)423 workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
424 {
425 	struct workqueue_queue *q;
426 	work_impl_t *wk = (void *)wk0;
427 
428 	SDT_PROBE3(sdt, kernel, workqueue, enqueue,  wq, wk0, ci);
429 
430 	KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
431 	q = workqueue_queue_lookup(wq, ci);
432 
433 	mutex_enter(&q->q_mutex);
434 #ifdef DEBUG
435 	workqueue_check_duplication(q, wk);
436 #endif
437 	SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
438 	cv_broadcast(&q->q_cv);
439 	mutex_exit(&q->q_mutex);
440 }
441