xref: /dragonfly/sys/kern/kern_mpipe.c (revision 1dedbd3b)
1 /*
2  * (MPSAFE)
3  *
4  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
5  *
6  * This code is derived from software contributed to The DragonFly Project
7  * by Matthew Dillon <dillon@backplane.com>
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in
17  *    the documentation and/or other materials provided with the
18  *    distribution.
19  * 3. Neither the name of The DragonFly Project nor the names of its
20  *    contributors may be used to endorse or promote products derived
21  *    from this software without specific, prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
28  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
29  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
30  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
31  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
32  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
33  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34  * SUCH DAMAGE.
35  */
36 
37 #include <sys/param.h>
38 #include <sys/systm.h>
39 #include <sys/kernel.h>
40 #include <sys/slaballoc.h>
41 #include <sys/malloc.h>
42 #include <sys/mbuf.h>
43 #include <sys/vmmeter.h>
44 #include <sys/lock.h>
45 #include <sys/thread.h>
46 #include <sys/globaldata.h>
47 #include <sys/mpipe.h>
48 #include <sys/kthread.h>
49 
50 struct mpipe_callback {
51 	STAILQ_ENTRY(mpipe_callback) entry;
52 	void (*func)(void *arg1, void *arg2);
53 	void *arg1;
54 	void *arg2;
55 };
56 
57 static MALLOC_DEFINE(M_MPIPEARY, "MPipe Array", "Auxiliary MPIPE structure");
58 
59 static void mpipe_thread(void *arg);
60 
61 /*
62  * Initialize a malloc pipeline for the specified malloc type and allocation
63  * size.  Create an array to cache up to nom_count buffers and preallocate
64  * them.
65  */
66 void
67 mpipe_init(malloc_pipe_t mpipe, malloc_type_t type, int bytes,
68 	int nnom, int nmax,
69 	int mpflags,
70 	void (*construct)(void *, void *),
71 	void (*deconstruct)(void *, void *),
72 	void *priv)
73 {
74     int n;
75 
76     if (nnom < 1)
77 	nnom = 1;
78     if (nmax < 0)
79 	nmax = 0x7FFF0000;	/* some very large number */
80     if (nmax < nnom)
81 	nmax = nnom;
82     bzero(mpipe, sizeof(struct malloc_pipe));
83     mpipe->type = type;
84     mpipe->bytes = bytes;
85     mpipe->mpflags = mpflags;
86     mpipe->construct = construct;
87     mpipe->deconstruct = deconstruct;
88     mpipe->priv = priv;
89     if ((mpflags & MPF_NOZERO) == 0)
90 	mpipe->mflags |= M_ZERO;
91     if (mpflags & MPF_INT)
92 	mpipe->mflags |= M_USE_RESERVE | M_USE_INTERRUPT_RESERVE;
93     mpipe->ary_count = nnom;
94     mpipe->max_count = nmax;
95     mpipe->array = kmalloc(nnom * sizeof(mpipe->array[0]), M_MPIPEARY,
96 			    M_WAITOK | M_ZERO);
97 
98     while (mpipe->free_count < nnom) {
99 	n = mpipe->free_count;
100 	mpipe->array[n] = kmalloc(bytes, mpipe->type, M_WAITOK | mpipe->mflags);
101 	if (construct)
102 	    construct(mpipe->array[n], priv);
103 	++mpipe->free_count;
104 	++mpipe->total_count;
105     }
106     STAILQ_INIT(&mpipe->queue);
107 
108     lwkt_token_init(&mpipe->token, "mpipe token");
109 
110     /*
111      * Create a support thread for the mpipe queue
112      */
113     if (mpflags & MPF_CALLBACK) {
114 	    kthread_create(mpipe_thread, mpipe, &mpipe->thread,
115 			   "mpipe_%s", type->ks_shortdesc);
116     }
117 }
118 
119 /*
120  * Destroy a previously initialized mpipe.  This routine can also safely be
121  * called on an uninitialized mpipe structure if it was zero'd or mpipe_done()
122  * was previously called on it.
123  */
124 void
125 mpipe_done(malloc_pipe_t mpipe)
126 {
127     void *buf;
128     int n;
129 
130     KKASSERT(mpipe->free_count == mpipe->total_count);	/* no outstanding mem */
131 
132     /*
133      * Clean up the kthread
134      */
135     lwkt_gettoken(&mpipe->token);
136     mpipe->mpflags |= MPF_EXITING;
137     while (mpipe->thread) {
138 	wakeup(&mpipe->queue);
139 	tsleep(mpipe, 0, "mpipex", 1);
140     }
141 
142     /*
143      * Clean up the mpipe buffers
144      */
145     for (n = mpipe->free_count - 1; n >= 0; --n) {
146 	buf = mpipe->array[n];
147 	mpipe->array[n] = NULL;
148 	KKASSERT(buf != NULL);
149 	if (mpipe->deconstruct)
150 	    mpipe->deconstruct(buf, mpipe->priv);
151 	kfree(buf, mpipe->type);
152     }
153     mpipe->free_count = 0;
154     mpipe->total_count = 0;
155     if (mpipe->array) {
156 	kfree(mpipe->array, M_MPIPEARY);
157 	mpipe->array = NULL;
158     }
159     lwkt_reltoken(&mpipe->token);
160     lwkt_token_uninit(&mpipe->token);
161 }
162 
163 /*
164  * mpipe support thread for request failures when mpipe_alloc_callback()
165  * is called.
166  */
167 static void
168 mpipe_thread(void *arg)
169 {
170     malloc_pipe_t mpipe = arg;
171     struct mpipe_callback *mcb;
172 
173     lwkt_gettoken(&mpipe->token);
174     while ((mpipe->mpflags & MPF_EXITING) == 0) {
175 	while (mpipe->free_count &&
176 	       (mcb = STAILQ_FIRST(&mpipe->queue)) != NULL) {
177 		STAILQ_REMOVE(&mpipe->queue, mcb, mpipe_callback, entry);
178 		mcb->func(mcb->arg1, mcb->arg2);
179 		kfree(mcb, M_MPIPEARY);
180 	}
181 	mpipe->mpflags |= MPF_QUEUEWAIT;
182 	tsleep(&mpipe->queue, 0, "wait", 0);
183     }
184     mpipe->thread = NULL;
185     wakeup(mpipe);
186     lwkt_reltoken(&mpipe->token);
187 }
188 
189 
190 /*
191  * Allocate an entry (inline suppot routine).  The allocation is guarenteed
192  * to return non-NULL up to the nominal count after which it may return NULL.
193  * Note that the implementation is defined to be allowed to block for short
194  * periods of time.
195  *
196  * Use mpipe_alloc_callback() for non-blocking operation with a callback
197  * Use mpipe_alloc_nowait() for non-blocking operation without a callback
198  * Use mpipe_alloc_waitok() for blocking operation & guarenteed non-NULL
199  */
200 static __inline
201 void *
202 _mpipe_alloc_locked(malloc_pipe_t mpipe, int mfailed)
203 {
204     void *buf;
205     int n;
206 
207     if ((n = mpipe->free_count) != 0) {
208 	/*
209 	 * Use a free entry if it exists.
210 	 */
211 	--n;
212 	buf = mpipe->array[n];
213 	mpipe->array[n] = NULL;	/* sanity check, not absolutely needed */
214 	mpipe->free_count = n;
215     } else if (mpipe->total_count >= mpipe->max_count || mfailed) {
216 	/*
217 	 * Return NULL if we have hit our limit
218 	 */
219 	buf = NULL;
220     } else {
221 	/*
222 	 * Otherwise try to malloc() non-blocking.
223 	 */
224 	buf = kmalloc(mpipe->bytes, mpipe->type, M_NOWAIT | mpipe->mflags);
225 	if (buf) {
226 	    ++mpipe->total_count;
227 	    if (mpipe->construct)
228 	        mpipe->construct(buf, mpipe->priv);
229 	}
230     }
231     return(buf);
232 }
233 
234 /*
235  * Nominal non-blocking mpipe allocation
236  */
237 void *
238 mpipe_alloc_nowait(malloc_pipe_t mpipe)
239 {
240     void *buf;
241 
242     lwkt_gettoken(&mpipe->token);
243     buf = _mpipe_alloc_locked(mpipe, 0);
244     lwkt_reltoken(&mpipe->token);
245 
246     return(buf);
247 }
248 
249 /*
250  * non-blocking mpipe allocation with callback for retry.
251  *
252  * If NULL is returned func(arg) is queued and will be called back when
253  * space is likely (but not necessarily) available.
254  *
255  * If non-NULL is returned func(arg) is ignored.
256  */
257 void *
258 mpipe_alloc_callback(malloc_pipe_t mpipe, void (*func)(void *arg1, void *arg2),
259 		     void *arg1, void *arg2)
260 {
261     struct mpipe_callback *mcb;
262     void *buf;
263 
264     lwkt_gettoken(&mpipe->token);
265     buf = _mpipe_alloc_locked(mpipe, 0);
266     if (buf == NULL) {
267 	mcb = kmalloc(sizeof(*mcb), M_MPIPEARY, M_INTWAIT);
268 	buf = _mpipe_alloc_locked(mpipe, 0);
269 	if (buf == NULL) {
270 	    mcb->func = func;
271 	    mcb->arg1 = arg1;
272 	    mcb->arg2 = arg2;
273 	    STAILQ_INSERT_TAIL(&mpipe->queue, mcb, entry);
274 	} else {
275 	    kfree(mcb, M_MPIPEARY);
276 	}
277     }
278     lwkt_reltoken(&mpipe->token);
279 
280     return(buf);
281 }
282 
283 /*
284  * This function can be called to nominally wait until resources are
285  * available and mpipe_alloc_nowait() is likely to return non-NULL.
286  *
287  * NOTE: mpipe_alloc_nowait() can still return NULL.
288  */
289 void
290 mpipe_wait(malloc_pipe_t mpipe)
291 {
292     if (mpipe->free_count == 0) {
293 	lwkt_gettoken(&mpipe->token);
294 	while ((mpipe->mpflags & MPF_EXITING) == 0) {
295 	    if (mpipe->free_count)
296 		    break;
297 	    mpipe->mpflags |= MPF_QUEUEWAIT;
298 	    tsleep(&mpipe->queue, 0, "wait", 0);
299 	}
300 	lwkt_reltoken(&mpipe->token);
301     }
302 }
303 
304 /*
305  * Allocate an entry, block until the allocation succeeds.  This may cause
306  * us to block waiting for a prior allocation to be freed.
307  */
308 void *
309 mpipe_alloc_waitok(malloc_pipe_t mpipe)
310 {
311     void *buf;
312     int mfailed;
313 
314     lwkt_gettoken(&mpipe->token);
315     mfailed = 0;
316     while ((buf = _mpipe_alloc_locked(mpipe, mfailed)) == NULL) {
317 	/*
318 	 * Block if we have hit our limit
319 	 */
320 	mpipe->pending = 1;
321 	tsleep(mpipe, 0, "mpipe1", 0);
322 	mfailed = 1;
323     }
324     lwkt_reltoken(&mpipe->token);
325 
326     return(buf);
327 }
328 
329 /*
330  * Free an entry, unblock any waiters.  Allow NULL.
331  */
332 void
333 mpipe_free(malloc_pipe_t mpipe, void *buf)
334 {
335     int n;
336 
337     if (buf == NULL)
338 	return;
339 
340     lwkt_gettoken(&mpipe->token);
341     if ((n = mpipe->free_count) < mpipe->ary_count) {
342 	/*
343 	 * Free slot available in free array (LIFO)
344 	 */
345 	mpipe->array[n] = buf;
346 	++mpipe->free_count;
347 	if ((mpipe->mpflags & (MPF_CACHEDATA|MPF_NOZERO)) == 0)
348 	    bzero(buf, mpipe->bytes);
349 	if (mpipe->mpflags & MPF_QUEUEWAIT) {
350 		mpipe->mpflags &= ~MPF_QUEUEWAIT;
351 		lwkt_reltoken(&mpipe->token);
352 		wakeup(&mpipe->queue);
353 	} else {
354 		lwkt_reltoken(&mpipe->token);
355 	}
356 	/*
357 	 * Wakeup anyone blocked in mpipe_alloc_*().
358 	 */
359 	if (mpipe->pending) {
360 	    mpipe->pending = 0;
361 	    wakeup(mpipe);
362 	}
363     } else {
364 	/*
365 	 * All the free slots are full, free the buffer directly.
366 	 */
367 	--mpipe->total_count;
368 	KKASSERT(mpipe->total_count >= mpipe->free_count);
369 	if (mpipe->deconstruct)
370 	    mpipe->deconstruct(buf, mpipe->priv);
371 	lwkt_reltoken(&mpipe->token);
372 	kfree(buf, mpipe->type);
373     }
374 }
375 
376