xref: /dragonfly/sys/kern/kern_mpipe.c (revision 7c4f4eee)
1 /*
2  * (MPSAFE)
3  *
4  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
5  *
6  * This code is derived from software contributed to The DragonFly Project
7  * by Matthew Dillon <dillon@backplane.com>
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in
17  *    the documentation and/or other materials provided with the
18  *    distribution.
19  * 3. Neither the name of The DragonFly Project nor the names of its
20  *    contributors may be used to endorse or promote products derived
21  *    from this software without specific, prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
28  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
29  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
30  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
31  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
32  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
33  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34  * SUCH DAMAGE.
35  */
36 
37 #include <sys/param.h>
38 #include <sys/systm.h>
39 #include <sys/kernel.h>
40 #include <sys/slaballoc.h>
41 #include <sys/mbuf.h>
42 #include <sys/vmmeter.h>
43 #include <sys/lock.h>
44 #include <sys/thread.h>
45 #include <sys/globaldata.h>
46 #include <sys/mpipe.h>
47 #include <sys/kthread.h>
48 
49 struct mpipe_callback {
50 	STAILQ_ENTRY(mpipe_callback) entry;
51 	void (*func)(void *arg1, void *arg2);
52 	void *arg1;
53 	void *arg2;
54 };
55 
56 static MALLOC_DEFINE(M_MPIPEARY, "MPipe Array", "Auxiliary MPIPE structure");
57 
58 static void mpipe_thread(void *arg);
59 
60 /*
61  * Initialize a malloc pipeline for the specified malloc type and allocation
62  * size.  Create an array to cache up to nom_count buffers and preallocate
63  * them.
64  */
65 void
66 mpipe_init(malloc_pipe_t mpipe, malloc_type_t type, int bytes,
67 	int nnom, int nmax,
68 	int mpflags,
69 	void (*construct)(void *, void *),
70 	void (*deconstruct)(void *, void *),
71 	void *priv)
72 {
73     int n;
74 
75     if (nnom < 1)
76 	nnom = 1;
77     if (nmax < 0)
78 	nmax = 0x7FFF0000;	/* some very large number */
79     if (nmax < nnom)
80 	nmax = nnom;
81     bzero(mpipe, sizeof(struct malloc_pipe));
82     mpipe->type = type;
83     mpipe->bytes = bytes;
84     mpipe->mpflags = mpflags;
85     mpipe->construct = construct;
86     mpipe->deconstruct = deconstruct;
87     mpipe->priv = priv;
88     if ((mpflags & MPF_NOZERO) == 0)
89 	mpipe->mflags |= M_ZERO;
90     if (mpflags & MPF_INT)
91 	mpipe->mflags |= M_USE_RESERVE | M_USE_INTERRUPT_RESERVE;
92     mpipe->ary_count = nnom;
93     mpipe->max_count = nmax;
94     mpipe->array = kmalloc(nnom * sizeof(mpipe->array[0]), M_MPIPEARY,
95 			    M_WAITOK | M_ZERO);
96 
97     while (mpipe->free_count < nnom) {
98 	n = mpipe->free_count;
99 	mpipe->array[n] = kmalloc(bytes, mpipe->type, M_WAITOK | mpipe->mflags);
100 	if (construct)
101 	    construct(mpipe->array[n], priv);
102 	++mpipe->free_count;
103 	++mpipe->total_count;
104     }
105     STAILQ_INIT(&mpipe->queue);
106 
107     lwkt_token_init(&mpipe->token, "mpipe token");
108 
109     /*
110      * Create a support thread for the mpipe queue
111      */
112     if (mpflags & MPF_CALLBACK) {
113 	    kthread_create(mpipe_thread, mpipe, &mpipe->thread,
114 			   "mpipe_%s", type->ks_shortdesc);
115     }
116 }
117 
118 /*
119  * Destroy a previously initialized mpipe.  This routine can also safely be
120  * called on an uninitialized mpipe structure if it was zero'd or mpipe_done()
121  * was previously called on it.
122  */
123 void
124 mpipe_done(malloc_pipe_t mpipe)
125 {
126     void *buf;
127     int n;
128 
129     KKASSERT(mpipe->free_count == mpipe->total_count);	/* no outstanding mem */
130 
131     /*
132      * Clean up the kthread
133      */
134     lwkt_gettoken(&mpipe->token);
135     mpipe->mpflags |= MPF_EXITING;
136     while (mpipe->thread) {
137 	wakeup(&mpipe->queue);
138 	tsleep(mpipe, 0, "mpipex", 1);
139     }
140 
141     /*
142      * Clean up the mpipe buffers
143      */
144     for (n = mpipe->free_count - 1; n >= 0; --n) {
145 	buf = mpipe->array[n];
146 	mpipe->array[n] = NULL;
147 	KKASSERT(buf != NULL);
148 	if (mpipe->deconstruct)
149 	    mpipe->deconstruct(buf, mpipe->priv);
150 	kfree(buf, mpipe->type);
151     }
152     mpipe->free_count = 0;
153     mpipe->total_count = 0;
154     if (mpipe->array) {
155 	kfree(mpipe->array, M_MPIPEARY);
156 	mpipe->array = NULL;
157     }
158     lwkt_reltoken(&mpipe->token);
159     lwkt_token_uninit(&mpipe->token);
160 }
161 
162 /*
163  * mpipe support thread for request failures when mpipe_alloc_callback()
164  * is called.
165  */
166 static void
167 mpipe_thread(void *arg)
168 {
169     malloc_pipe_t mpipe = arg;
170     struct mpipe_callback *mcb;
171 
172     lwkt_gettoken(&mpipe->token);
173     while ((mpipe->mpflags & MPF_EXITING) == 0) {
174 	while (mpipe->free_count &&
175 	       (mcb = STAILQ_FIRST(&mpipe->queue)) != NULL) {
176 		STAILQ_REMOVE(&mpipe->queue, mcb, mpipe_callback, entry);
177 		mcb->func(mcb->arg1, mcb->arg2);
178 		kfree(mcb, M_MPIPEARY);
179 	}
180 	mpipe->mpflags |= MPF_QUEUEWAIT;
181 	tsleep(&mpipe->queue, 0, "wait", 0);
182     }
183     mpipe->thread = NULL;
184     wakeup(mpipe);
185     lwkt_reltoken(&mpipe->token);
186 }
187 
188 
189 /*
190  * Allocate an entry (inline suppot routine).  The allocation is guarenteed
191  * to return non-NULL up to the nominal count after which it may return NULL.
192  * Note that the implementation is defined to be allowed to block for short
193  * periods of time.
194  *
195  * Use mpipe_alloc_callback() for non-blocking operation with a callback
196  * Use mpipe_alloc_nowait() for non-blocking operation without a callback
197  * Use mpipe_alloc_waitok() for blocking operation & guarenteed non-NULL
198  */
199 static __inline
200 void *
201 _mpipe_alloc_locked(malloc_pipe_t mpipe, int mfailed)
202 {
203     void *buf;
204     int n;
205 
206     if ((n = mpipe->free_count) != 0) {
207 	/*
208 	 * Use a free entry if it exists.
209 	 */
210 	--n;
211 	buf = mpipe->array[n];
212 	mpipe->array[n] = NULL;	/* sanity check, not absolutely needed */
213 	mpipe->free_count = n;
214     } else if (mpipe->total_count >= mpipe->max_count || mfailed) {
215 	/*
216 	 * Return NULL if we have hit our limit
217 	 */
218 	buf = NULL;
219     } else {
220 	/*
221 	 * Otherwise try to malloc() non-blocking.
222 	 */
223 	buf = kmalloc(mpipe->bytes, mpipe->type, M_NOWAIT | mpipe->mflags);
224 	if (buf) {
225 	    ++mpipe->total_count;
226 	    if (mpipe->construct)
227 	        mpipe->construct(buf, mpipe->priv);
228 	}
229     }
230     return(buf);
231 }
232 
233 /*
234  * Nominal non-blocking mpipe allocation
235  */
236 void *
237 mpipe_alloc_nowait(malloc_pipe_t mpipe)
238 {
239     void *buf;
240 
241     lwkt_gettoken(&mpipe->token);
242     buf = _mpipe_alloc_locked(mpipe, 0);
243     lwkt_reltoken(&mpipe->token);
244 
245     return(buf);
246 }
247 
248 /*
249  * non-blocking mpipe allocation with callback for retry.
250  *
251  * If NULL is returned func(arg) is queued and will be called back when
252  * space is likely (but not necessarily) available.
253  *
254  * If non-NULL is returned func(arg) is ignored.
255  */
256 void *
257 mpipe_alloc_callback(malloc_pipe_t mpipe, void (*func)(void *arg1, void *arg2),
258 		     void *arg1, void *arg2)
259 {
260     struct mpipe_callback *mcb;
261     void *buf;
262 
263     lwkt_gettoken(&mpipe->token);
264     buf = _mpipe_alloc_locked(mpipe, 0);
265     if (buf == NULL) {
266 	mcb = kmalloc(sizeof(*mcb), M_MPIPEARY, M_INTWAIT);
267 	buf = _mpipe_alloc_locked(mpipe, 0);
268 	if (buf == NULL) {
269 	    mcb->func = func;
270 	    mcb->arg1 = arg1;
271 	    mcb->arg2 = arg2;
272 	    STAILQ_INSERT_TAIL(&mpipe->queue, mcb, entry);
273 	} else {
274 	    kfree(mcb, M_MPIPEARY);
275 	}
276     }
277     lwkt_reltoken(&mpipe->token);
278 
279     return(buf);
280 }
281 
282 /*
283  * This function can be called to nominally wait until resources are
284  * available and mpipe_alloc_nowait() is likely to return non-NULL.
285  *
286  * NOTE: mpipe_alloc_nowait() can still return NULL.
287  */
288 void
289 mpipe_wait(malloc_pipe_t mpipe)
290 {
291     if (mpipe->free_count == 0) {
292 	lwkt_gettoken(&mpipe->token);
293 	while ((mpipe->mpflags & MPF_EXITING) == 0) {
294 	    if (mpipe->free_count)
295 		    break;
296 	    mpipe->mpflags |= MPF_QUEUEWAIT;
297 	    tsleep(&mpipe->queue, 0, "wait", 0);
298 	}
299 	lwkt_reltoken(&mpipe->token);
300     }
301 }
302 
303 /*
304  * Allocate an entry, block until the allocation succeeds.  This may cause
305  * us to block waiting for a prior allocation to be freed.
306  */
307 void *
308 mpipe_alloc_waitok(malloc_pipe_t mpipe)
309 {
310     void *buf;
311     int mfailed;
312 
313     lwkt_gettoken(&mpipe->token);
314     mfailed = 0;
315     while ((buf = _mpipe_alloc_locked(mpipe, mfailed)) == NULL) {
316 	/*
317 	 * Block if we have hit our limit
318 	 */
319 	mpipe->pending = 1;
320 	tsleep(mpipe, 0, "mpipe1", 0);
321 	mfailed = 1;
322     }
323     lwkt_reltoken(&mpipe->token);
324 
325     return(buf);
326 }
327 
328 /*
329  * Free an entry, unblock any waiters.  Allow NULL.
330  */
331 void
332 mpipe_free(malloc_pipe_t mpipe, void *buf)
333 {
334     int n;
335 
336     if (buf == NULL)
337 	return;
338 
339     lwkt_gettoken(&mpipe->token);
340     if ((n = mpipe->free_count) < mpipe->ary_count) {
341 	/*
342 	 * Free slot available in free array (LIFO)
343 	 */
344 	mpipe->array[n] = buf;
345 	++mpipe->free_count;
346 	if ((mpipe->mpflags & (MPF_CACHEDATA|MPF_NOZERO)) == 0)
347 	    bzero(buf, mpipe->bytes);
348 	if (mpipe->mpflags & MPF_QUEUEWAIT) {
349 		mpipe->mpflags &= ~MPF_QUEUEWAIT;
350 		lwkt_reltoken(&mpipe->token);
351 		wakeup(&mpipe->queue);
352 	} else {
353 		lwkt_reltoken(&mpipe->token);
354 	}
355 	/*
356 	 * Wakeup anyone blocked in mpipe_alloc_*().
357 	 */
358 	if (mpipe->pending) {
359 	    mpipe->pending = 0;
360 	    wakeup(mpipe);
361 	}
362     } else {
363 	/*
364 	 * All the free slots are full, free the buffer directly.
365 	 */
366 	--mpipe->total_count;
367 	KKASSERT(mpipe->total_count >= mpipe->free_count);
368 	if (mpipe->deconstruct)
369 	    mpipe->deconstruct(buf, mpipe->priv);
370 	lwkt_reltoken(&mpipe->token);
371 	kfree(buf, mpipe->type);
372     }
373 }
374 
375