1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #include <sys/zfs_context.h>
27 
/*
 * When nonzero, taskq_dispatch() calls the function directly in the
 * dispatching thread instead of queueing it, and taskq_member() returns 1
 * for every thread.
 */
28 int taskq_now;
/* Queue created by system_taskq_init() and destroyed by system_taskq_fini(). */
29 taskq_t *system_taskq;
30 
31 typedef struct task {
32 	struct task	*task_next;
33 	struct task	*task_prev;
34 	task_func_t	*task_func;
35 	void		*task_arg;
36 } task_t;
37 
/*
 * tq_flags bit set by taskq_create() and cleared by taskq_destroy();
 * worker threads keep looping only while it is set.
 */
38 #define	TASKQ_ACTIVE	0x00010000
39 
/*
 * State of one task queue.  The pending list, the free list and the
 * counters are read and modified with tq_lock held.
 */
40 struct taskq {
41 	kmutex_t	tq_lock;	/* held while queue state is changed */
42 	krwlock_t	tq_threadlock;	/* held as reader while a task runs */
43 	kcondvar_t	tq_dispatch_cv;	/* idle workers wait here for work */
44 	kcondvar_t	tq_wait_cv;	/* broadcast on idle / worker exit */
45 	thread_t	*tq_threadlist;	/* ids of the worker threads */
46 	int		tq_flags;	/* creation flags | TASKQ_ACTIVE */
47 	int		tq_active;	/* workers not waiting for work */
48 	int		tq_nthreads;	/* workers that have not exited */
49 	int		tq_nalloc;	/* task_t structures allocated */
50 	int		tq_minalloc;	/* entries kept cached, not freed */
51 	int		tq_maxalloc;	/* count at which allocation throttles */
52 	task_t		*tq_freelist;	/* cached entries, via task_next */
53 	task_t		tq_task;	/* head of circular pending list */
54 };
55 
56 static task_t *
57 task_alloc(taskq_t *tq, int tqflags)
58 {
59 	task_t *t;
60 
61 	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
62 		tq->tq_freelist = t->task_next;
63 	} else {
64 		mutex_exit(&tq->tq_lock);
65 		if (tq->tq_nalloc >= tq->tq_maxalloc) {
66 			if (!(tqflags & KM_SLEEP)) {
67 				mutex_enter(&tq->tq_lock);
68 				return (NULL);
69 			}
70 			/*
71 			 * We don't want to exceed tq_maxalloc, but we can't
72 			 * wait for other tasks to complete (and thus free up
73 			 * task structures) without risking deadlock with
74 			 * the caller.  So, we just delay for one second
75 			 * to throttle the allocation rate.
76 			 */
77 			delay(hz);
78 		}
79 
80 		/* Clean up TQ_FRONT from tqflags before passing it to kmem */
81 		t = kmem_alloc(sizeof (task_t),
82 		    tqflags & (KM_SLEEP | KM_NOSLEEP));
83 		mutex_enter(&tq->tq_lock);
84 		if (t != NULL)
85 			tq->tq_nalloc++;
86 	}
87 	return (t);
88 }
89 
90 static void
91 task_free(taskq_t *tq, task_t *t)
92 {
93 	if (tq->tq_nalloc <= tq->tq_minalloc) {
94 		t->task_next = tq->tq_freelist;
95 		tq->tq_freelist = t;
96 	} else {
97 		tq->tq_nalloc--;
98 		mutex_exit(&tq->tq_lock);
99 		kmem_free(t, sizeof (task_t));
100 		mutex_enter(&tq->tq_lock);
101 	}
102 }
103 
104 taskqid_t
105 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
106 {
107 	task_t *t;
108 
109 	if (taskq_now) {
110 		func(arg);
111 		return (1);
112 	}
113 
114 	mutex_enter(&tq->tq_lock);
115 	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
116 	if ((t = task_alloc(tq, tqflags)) == NULL) {
117 		mutex_exit(&tq->tq_lock);
118 		return (0);
119 	}
120 	if (tqflags & TQ_FRONT) {
121 		t->task_next = tq->tq_task.task_next;
122 		t->task_prev = &tq->tq_task;
123 	} else {
124 		t->task_next = &tq->tq_task;
125 		t->task_prev = tq->tq_task.task_prev;
126 	}
127 	t->task_next->task_prev = t;
128 	t->task_prev->task_next = t;
129 	t->task_func = func;
130 	t->task_arg = arg;
131 	cv_signal(&tq->tq_dispatch_cv);
132 	mutex_exit(&tq->tq_lock);
133 	return (1);
134 }
135 
136 void
137 taskq_wait(taskq_t *tq)
138 {
139 	mutex_enter(&tq->tq_lock);
140 	while (tq->tq_task.task_next != &tq->tq_task || tq->tq_active != 0)
141 		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
142 	mutex_exit(&tq->tq_lock);
143 }
144 
145 static void *
146 taskq_thread(void *arg)
147 {
148 	taskq_t *tq = arg;
149 	task_t *t;
150 
151 	mutex_enter(&tq->tq_lock);
152 	while (tq->tq_flags & TASKQ_ACTIVE) {
153 		if ((t = tq->tq_task.task_next) == &tq->tq_task) {
154 			if (--tq->tq_active == 0)
155 				cv_broadcast(&tq->tq_wait_cv);
156 			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
157 			tq->tq_active++;
158 			continue;
159 		}
160 		t->task_prev->task_next = t->task_next;
161 		t->task_next->task_prev = t->task_prev;
162 		mutex_exit(&tq->tq_lock);
163 
164 		rw_enter(&tq->tq_threadlock, RW_READER);
165 		t->task_func(t->task_arg);
166 		rw_exit(&tq->tq_threadlock);
167 
168 		mutex_enter(&tq->tq_lock);
169 		task_free(tq, t);
170 	}
171 	tq->tq_nthreads--;
172 	cv_broadcast(&tq->tq_wait_cv);
173 	mutex_exit(&tq->tq_lock);
174 	return (NULL);
175 }
176 
177 /*ARGSUSED*/
178 taskq_t *
179 taskq_create(const char *name, int nthreads, pri_t pri,
180 	int minalloc, int maxalloc, uint_t flags)
181 {
182 	taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
183 	int t;
184 
185 	if (flags & TASKQ_THREADS_CPU_PCT) {
186 		int pct;
187 		ASSERT3S(nthreads, >=, 0);
188 		ASSERT3S(nthreads, <=, 100);
189 		pct = MIN(nthreads, 100);
190 		pct = MAX(pct, 0);
191 
192 		nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
193 		nthreads = MAX(nthreads, 1);	/* need at least 1 thread */
194 	} else {
195 		ASSERT3S(nthreads, >=, 1);
196 	}
197 
198 	rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
199 	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
200 	cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
201 	cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
202 	tq->tq_flags = flags | TASKQ_ACTIVE;
203 	tq->tq_active = nthreads;
204 	tq->tq_nthreads = nthreads;
205 	tq->tq_minalloc = minalloc;
206 	tq->tq_maxalloc = maxalloc;
207 	tq->tq_task.task_next = &tq->tq_task;
208 	tq->tq_task.task_prev = &tq->tq_task;
209 	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP);
210 
211 	if (flags & TASKQ_PREPOPULATE) {
212 		mutex_enter(&tq->tq_lock);
213 		while (minalloc-- > 0)
214 			task_free(tq, task_alloc(tq, KM_SLEEP));
215 		mutex_exit(&tq->tq_lock);
216 	}
217 
218 	for (t = 0; t < nthreads; t++)
219 		(void) thr_create(0, 0, taskq_thread,
220 		    tq, THR_BOUND, &tq->tq_threadlist[t]);
221 
222 	return (tq);
223 }
224 
225 void
226 taskq_destroy(taskq_t *tq)
227 {
228 	int t;
229 	int nthreads = tq->tq_nthreads;
230 
231 	taskq_wait(tq);
232 
233 	mutex_enter(&tq->tq_lock);
234 
235 	tq->tq_flags &= ~TASKQ_ACTIVE;
236 	cv_broadcast(&tq->tq_dispatch_cv);
237 
238 	while (tq->tq_nthreads != 0)
239 		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
240 
241 	tq->tq_minalloc = 0;
242 	while (tq->tq_nalloc != 0) {
243 		ASSERT(tq->tq_freelist != NULL);
244 		task_free(tq, task_alloc(tq, KM_SLEEP));
245 	}
246 
247 	mutex_exit(&tq->tq_lock);
248 
249 	for (t = 0; t < nthreads; t++)
250 		(void) thr_join(tq->tq_threadlist[t], NULL, NULL);
251 
252 	kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t));
253 
254 	rw_destroy(&tq->tq_threadlock);
255 	mutex_destroy(&tq->tq_lock);
256 	cv_destroy(&tq->tq_dispatch_cv);
257 	cv_destroy(&tq->tq_wait_cv);
258 
259 	kmem_free(tq, sizeof (taskq_t));
260 }
261 
262 int
263 taskq_member(taskq_t *tq, void *t)
264 {
265 	int i;
266 
267 	if (taskq_now)
268 		return (1);
269 
270 	for (i = 0; i < tq->tq_nthreads; i++)
271 		if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t)
272 			return (1);
273 
274 	return (0);
275 }
276 
277 void
278 system_taskq_init(void)
279 {
280 	system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512,
281 	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
282 }
283 
/*
 * Destroy the global system_taskq and clear the pointer so that a stale
 * reference is not left behind.
 */
284 void
285 system_taskq_fini(void)
286 {
287 	taskq_destroy(system_taskq);
288 	system_taskq = NULL; /* defensive */
289 }
290