1 /*
2  * Copyright (c) 2009 Pawel Jakub Dawidek <pjd@FreeBSD.org>
3  * All rights reserved.
4  *
5  * Copyright (c) 2012 Spectra Logic Corporation.  All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26  * SUCH DAMAGE.
27  */
28 
29 #include <sys/cdefs.h>
30 __FBSDID("$FreeBSD$");
31 
32 #include <sys/param.h>
33 #include <sys/ck.h>
34 #include <sys/epoch.h>
35 #include <sys/kernel.h>
36 #include <sys/kmem.h>
37 #include <sys/lock.h>
38 #include <sys/mutex.h>
39 #include <sys/queue.h>
40 #include <sys/taskq.h>
41 #include <sys/taskqueue.h>
42 #include <sys/zfs_context.h>
43 
44 #if defined(__i386__) || defined(__amd64__) || defined(__aarch64__)
45 #include <machine/pcb.h>
46 #endif
47 
48 #include <vm/uma.h>
49 
/*
 * Compatibility shim: when __FreeBSD_version is below 1201522, map
 * taskqueue_start_threads_in_proc() onto taskqueue_start_threads() by
 * dropping the proc argument.
 */
#if __FreeBSD_version < 1201522
#define	taskqueue_start_threads_in_proc(tqp, count, pri, proc, name, ...) \
    taskqueue_start_threads(tqp, count, pri, name, __VA_ARGS__)
#endif
54 
/* TSD key under which taskq_tsd_set() records the taskq owning a thread. */
static uint_t taskq_tsd;
/* UMA zone backing the taskq_ent_t entries allocated by the dispatchers. */
static uma_zone_t taskq_zone;

/*
 * Global system-wide dynamic task queue available for all consumers. This
 * taskq is not intended for long-running tasks; instead, a dedicated taskq
 * should be created.
 */
taskq_t *system_taskq = NULL;
taskq_t *system_delay_taskq = NULL;
taskq_t *dynamic_taskq = NULL;

/* Process that taskq_create() passes on to taskq_create_impl(). */
proc_t *system_proc;

extern int uma_align_cache;

/* Malloc type used for the id hash table and its lock array. */
static MALLOC_DEFINE(M_TASKQ, "taskq", "taskq structures");

/*
 * Task-id hash table.  tqenthash is the bucket mask filled in by hashinit();
 * tqenthashlock is the mask used to pick one of the sx locks in
 * tqenthashtbl_lock for a given id.
 */
static CK_LIST_HEAD(tqenthashhead, taskq_ent) *tqenthashtbl;
static unsigned long tqenthash;
static unsigned long tqenthashlock;
static struct sx *tqenthashtbl_lock;

/* Counter from which __taskq_genid() derives the next task id. */
static taskqid_t tqidnext;

/* Bucket and bucket lock selected by the low bits of a task id. */
#define	TQIDHASH(tqid) (&tqenthashtbl[(tqid) & tqenthash])
#define	TQIDHASHLOCK(tqid) (&tqenthashtbl_lock[((tqid) & tqenthashlock)])

/* Values stored in tqent_type by the dispatch functions. */
#define	TIMEOUT_TASK 1
#define	NORMAL_TASK 2
85 
86 static void
87 system_taskq_init(void *arg)
88 {
89 	int i;
90 
91 	tsd_create(&taskq_tsd, NULL);
92 	tqenthashtbl = hashinit(mp_ncpus * 8, M_TASKQ, &tqenthash);
93 	tqenthashlock = (tqenthash + 1) / 8;
94 	if (tqenthashlock > 0)
95 		tqenthashlock--;
96 	tqenthashtbl_lock =
97 	    malloc(sizeof (*tqenthashtbl_lock) * (tqenthashlock + 1),
98 	    M_TASKQ, M_WAITOK | M_ZERO);
99 	for (i = 0; i < tqenthashlock + 1; i++)
100 		sx_init_flags(&tqenthashtbl_lock[i], "tqenthash", SX_DUPOK);
101 	taskq_zone = uma_zcreate("taskq_zone", sizeof (taskq_ent_t),
102 	    NULL, NULL, NULL, NULL,
103 	    UMA_ALIGN_CACHE, 0);
104 	system_taskq = taskq_create("system_taskq", mp_ncpus, minclsyspri,
105 	    0, 0, 0);
106 	system_delay_taskq = taskq_create("system_delay_taskq", mp_ncpus,
107 	    minclsyspri, 0, 0, 0);
108 }
109 SYSINIT(system_taskq_init, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_init,
110     NULL);
111 
112 static void
113 system_taskq_fini(void *arg)
114 {
115 	int i;
116 
117 	taskq_destroy(system_delay_taskq);
118 	taskq_destroy(system_taskq);
119 	uma_zdestroy(taskq_zone);
120 	tsd_destroy(&taskq_tsd);
121 	for (i = 0; i < tqenthashlock + 1; i++)
122 		sx_destroy(&tqenthashtbl_lock[i]);
123 	for (i = 0; i < tqenthash + 1; i++)
124 		VERIFY(CK_LIST_EMPTY(&tqenthashtbl[i]));
125 	free(tqenthashtbl_lock, M_TASKQ);
126 	free(tqenthashtbl, M_TASKQ);
127 }
128 SYSUNINIT(system_taskq_fini, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_fini,
129     NULL);
130 
131 #ifdef __LP64__
132 static taskqid_t
133 __taskq_genid(void)
134 {
135 	taskqid_t tqid;
136 
137 	/*
138 	 * Assume a 64-bit counter will not wrap in practice.
139 	 */
140 	tqid = atomic_add_64_nv(&tqidnext, 1);
141 	VERIFY(tqid);
142 	return (tqid);
143 }
144 #else
145 static taskqid_t
146 __taskq_genid(void)
147 {
148 	taskqid_t tqid;
149 
150 	for (;;) {
151 		tqid = atomic_add_32_nv(&tqidnext, 1);
152 		if (__predict_true(tqid != 0))
153 			break;
154 	}
155 	VERIFY(tqid);
156 	return (tqid);
157 }
158 #endif
159 
160 static taskq_ent_t *
161 taskq_lookup(taskqid_t tqid)
162 {
163 	taskq_ent_t *ent = NULL;
164 
165 	sx_xlock(TQIDHASHLOCK(tqid));
166 	CK_LIST_FOREACH(ent, TQIDHASH(tqid), tqent_hash) {
167 		if (ent->tqent_id == tqid)
168 			break;
169 	}
170 	if (ent != NULL)
171 		refcount_acquire(&ent->tqent_rc);
172 	sx_xunlock(TQIDHASHLOCK(tqid));
173 	return (ent);
174 }
175 
176 static taskqid_t
177 taskq_insert(taskq_ent_t *ent)
178 {
179 	taskqid_t tqid;
180 
181 	tqid = __taskq_genid();
182 	ent->tqent_id = tqid;
183 	ent->tqent_registered = B_TRUE;
184 	sx_xlock(TQIDHASHLOCK(tqid));
185 	CK_LIST_INSERT_HEAD(TQIDHASH(tqid), ent, tqent_hash);
186 	sx_xunlock(TQIDHASHLOCK(tqid));
187 	return (tqid);
188 }
189 
190 static void
191 taskq_remove(taskq_ent_t *ent)
192 {
193 	taskqid_t tqid = ent->tqent_id;
194 
195 	if (!ent->tqent_registered)
196 		return;
197 
198 	sx_xlock(TQIDHASHLOCK(tqid));
199 	CK_LIST_REMOVE(ent, tqent_hash);
200 	sx_xunlock(TQIDHASHLOCK(tqid));
201 	ent->tqent_registered = B_FALSE;
202 }
203 
204 static void
205 taskq_tsd_set(void *context)
206 {
207 	taskq_t *tq = context;
208 
209 #if defined(__amd64__) || defined(__aarch64__)
210 	if (context != NULL && tsd_get(taskq_tsd) == NULL)
211 		fpu_kern_thread(FPU_KERN_NORMAL);
212 #endif
213 	tsd_set(taskq_tsd, tq);
214 }
215 
216 static taskq_t *
217 taskq_create_impl(const char *name, int nthreads, pri_t pri,
218     proc_t *proc __maybe_unused, uint_t flags)
219 {
220 	taskq_t *tq;
221 
222 	if ((flags & TASKQ_THREADS_CPU_PCT) != 0)
223 		nthreads = MAX((mp_ncpus * nthreads) / 100, 1);
224 
225 	tq = kmem_alloc(sizeof (*tq), KM_SLEEP);
226 	tq->tq_queue = taskqueue_create(name, M_WAITOK,
227 	    taskqueue_thread_enqueue, &tq->tq_queue);
228 	taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_INIT,
229 	    taskq_tsd_set, tq);
230 	taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN,
231 	    taskq_tsd_set, NULL);
232 	(void) taskqueue_start_threads_in_proc(&tq->tq_queue, nthreads, pri,
233 	    proc, "%s", name);
234 
235 	return ((taskq_t *)tq);
236 }
237 
238 taskq_t *
239 taskq_create(const char *name, int nthreads, pri_t pri, int minalloc __unused,
240     int maxalloc __unused, uint_t flags)
241 {
242 	return (taskq_create_impl(name, nthreads, pri, system_proc, flags));
243 }
244 
245 taskq_t *
246 taskq_create_proc(const char *name, int nthreads, pri_t pri,
247     int minalloc __unused, int maxalloc __unused, proc_t *proc, uint_t flags)
248 {
249 	return (taskq_create_impl(name, nthreads, pri, proc, flags));
250 }
251 
252 void
253 taskq_destroy(taskq_t *tq)
254 {
255 
256 	taskqueue_free(tq->tq_queue);
257 	kmem_free(tq, sizeof (*tq));
258 }
259 
260 int
261 taskq_member(taskq_t *tq, kthread_t *thread)
262 {
263 
264 	return (taskqueue_member(tq->tq_queue, thread));
265 }
266 
267 taskq_t *
268 taskq_of_curthread(void)
269 {
270 	return (tsd_get(taskq_tsd));
271 }
272 
273 static void
274 taskq_free(taskq_ent_t *task)
275 {
276 	taskq_remove(task);
277 	if (refcount_release(&task->tqent_rc))
278 		uma_zfree(taskq_zone, task);
279 }
280 
281 int
282 taskq_cancel_id(taskq_t *tq, taskqid_t tid)
283 {
284 	uint32_t pend;
285 	int rc;
286 	taskq_ent_t *ent;
287 
288 	if (tid == 0)
289 		return (0);
290 
291 	if ((ent = taskq_lookup(tid)) == NULL)
292 		return (0);
293 
294 	ent->tqent_cancelled = B_TRUE;
295 	if (ent->tqent_type == TIMEOUT_TASK) {
296 		rc = taskqueue_cancel_timeout(tq->tq_queue,
297 		    &ent->tqent_timeout_task, &pend);
298 	} else
299 		rc = taskqueue_cancel(tq->tq_queue, &ent->tqent_task, &pend);
300 	if (rc == EBUSY) {
301 		taskqueue_drain(tq->tq_queue, &ent->tqent_task);
302 	} else if (pend) {
303 		/*
304 		 * Tasks normally free themselves when run, but here the task
305 		 * was cancelled so it did not free itself.
306 		 */
307 		taskq_free(ent);
308 	}
309 	/* Free the extra reference we added with taskq_lookup. */
310 	taskq_free(ent);
311 	return (rc);
312 }
313 
314 static void
315 taskq_run(void *arg, int pending __unused)
316 {
317 	taskq_ent_t *task = arg;
318 
319 	if (!task->tqent_cancelled)
320 		task->tqent_func(task->tqent_arg);
321 	taskq_free(task);
322 }
323 
324 taskqid_t
325 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
326     uint_t flags, clock_t expire_time)
327 {
328 	taskq_ent_t *task;
329 	taskqid_t tqid;
330 	clock_t timo;
331 	int mflag;
332 
333 	timo = expire_time - ddi_get_lbolt();
334 	if (timo <= 0)
335 		return (taskq_dispatch(tq, func, arg, flags));
336 
337 	if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP)
338 		mflag = M_WAITOK;
339 	else
340 		mflag = M_NOWAIT;
341 
342 	task = uma_zalloc(taskq_zone, mflag);
343 	if (task == NULL)
344 		return (0);
345 	task->tqent_func = func;
346 	task->tqent_arg = arg;
347 	task->tqent_type = TIMEOUT_TASK;
348 	task->tqent_cancelled = B_FALSE;
349 	refcount_init(&task->tqent_rc, 1);
350 	tqid = taskq_insert(task);
351 	TIMEOUT_TASK_INIT(tq->tq_queue, &task->tqent_timeout_task, 0,
352 	    taskq_run, task);
353 
354 	taskqueue_enqueue_timeout(tq->tq_queue, &task->tqent_timeout_task,
355 	    timo);
356 	return (tqid);
357 }
358 
359 taskqid_t
360 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
361 {
362 	taskq_ent_t *task;
363 	int mflag, prio;
364 	taskqid_t tqid;
365 
366 	if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP)
367 		mflag = M_WAITOK;
368 	else
369 		mflag = M_NOWAIT;
370 	/*
371 	 * If TQ_FRONT is given, we want higher priority for this task, so it
372 	 * can go at the front of the queue.
373 	 */
374 	prio = !!(flags & TQ_FRONT);
375 
376 	task = uma_zalloc(taskq_zone, mflag);
377 	if (task == NULL)
378 		return (0);
379 	refcount_init(&task->tqent_rc, 1);
380 	task->tqent_func = func;
381 	task->tqent_arg = arg;
382 	task->tqent_cancelled = B_FALSE;
383 	task->tqent_type = NORMAL_TASK;
384 	tqid = taskq_insert(task);
385 	TASK_INIT(&task->tqent_task, prio, taskq_run, task);
386 	taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
387 	return (tqid);
388 }
389 
390 static void
391 taskq_run_ent(void *arg, int pending __unused)
392 {
393 	taskq_ent_t *task = arg;
394 
395 	task->tqent_func(task->tqent_arg);
396 }
397 
398 void
399 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint32_t flags,
400     taskq_ent_t *task)
401 {
402 	int prio;
403 
404 	/*
405 	 * If TQ_FRONT is given, we want higher priority for this task, so it
406 	 * can go at the front of the queue.
407 	 */
408 	prio = !!(flags & TQ_FRONT);
409 	task->tqent_cancelled = B_FALSE;
410 	task->tqent_registered = B_FALSE;
411 	task->tqent_id = 0;
412 	task->tqent_func = func;
413 	task->tqent_arg = arg;
414 
415 	TASK_INIT(&task->tqent_task, prio, taskq_run_ent, task);
416 	taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
417 }
418 
419 void
420 taskq_wait(taskq_t *tq)
421 {
422 	taskqueue_quiesce(tq->tq_queue);
423 }
424 
425 void
426 taskq_wait_id(taskq_t *tq, taskqid_t tid)
427 {
428 	taskq_ent_t *ent;
429 
430 	if (tid == 0)
431 		return;
432 	if ((ent = taskq_lookup(tid)) == NULL)
433 		return;
434 
435 	taskqueue_drain(tq->tq_queue, &ent->tqent_task);
436 	taskq_free(ent);
437 }
438 
439 void
440 taskq_wait_outstanding(taskq_t *tq, taskqid_t id __unused)
441 {
442 	taskqueue_drain_all(tq->tq_queue);
443 }
444 
445 int
446 taskq_empty_ent(taskq_ent_t *t)
447 {
448 	return (t->tqent_task.ta_pending == 0);
449 }
450