xref: /dragonfly/sys/kern/kern_mpipe.c (revision 0db87cb7)
1 /*
2  * (MPSAFE)
3  *
4  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
5  *
6  * This code is derived from software contributed to The DragonFly Project
7  * by Matthew Dillon <dillon@backplane.com>
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in
17  *    the documentation and/or other materials provided with the
18  *    distribution.
19  * 3. Neither the name of The DragonFly Project nor the names of its
20  *    contributors may be used to endorse or promote products derived
21  *    from this software without specific, prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
28  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
29  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
30  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
31  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
32  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
33  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34  * SUCH DAMAGE.
35  */
36 
37 #include <sys/param.h>
38 #include <sys/systm.h>
39 #include <sys/kernel.h>
40 #include <sys/slaballoc.h>
41 #include <sys/mbuf.h>
42 #include <sys/vmmeter.h>
43 #include <sys/lock.h>
44 #include <sys/thread.h>
45 #include <sys/globaldata.h>
46 #include <sys/mpipe.h>
47 #include <sys/kthread.h>
48 
49 #include <sys/thread2.h>
50 
51 struct mpipe_callback {
52 	STAILQ_ENTRY(mpipe_callback) entry;
53 	void (*func)(void *arg1, void *arg2);
54 	void *arg1;
55 	void *arg2;
56 };
57 
58 static MALLOC_DEFINE(M_MPIPEARY, "MPipe Array", "Auxiliary MPIPE structure");
59 
60 static void mpipe_thread(void *arg);
61 
62 /*
63  * Initialize a malloc pipeline for the specified malloc type and allocation
64  * size.  Create an array to cache up to nom_count buffers and preallocate
65  * them.
66  */
67 void
68 mpipe_init(malloc_pipe_t mpipe, malloc_type_t type, int bytes,
69 	int nnom, int nmax,
70 	int mpflags,
71 	void (*construct)(void *, void *),
72 	void (*deconstruct)(void *, void *),
73 	void *priv)
74 {
75     int n;
76 
77     if (nnom < 1)
78 	nnom = 1;
79     if (nmax < 0)
80 	nmax = 0x7FFF0000;	/* some very large number */
81     if (nmax < nnom)
82 	nmax = nnom;
83     bzero(mpipe, sizeof(struct malloc_pipe));
84     mpipe->type = type;
85     mpipe->bytes = bytes;
86     mpipe->mpflags = mpflags;
87     mpipe->construct = construct;
88     mpipe->deconstruct = deconstruct;
89     mpipe->priv = priv;
90     if ((mpflags & MPF_NOZERO) == 0)
91 	mpipe->mflags |= M_ZERO;
92     if (mpflags & MPF_INT)
93 	mpipe->mflags |= M_USE_RESERVE | M_USE_INTERRUPT_RESERVE;
94     mpipe->ary_count = nnom;
95     mpipe->max_count = nmax;
96     mpipe->array = kmalloc(nnom * sizeof(mpipe->array[0]), M_MPIPEARY,
97 			    M_WAITOK | M_ZERO);
98 
99     while (mpipe->free_count < nnom) {
100 	n = mpipe->free_count;
101 	mpipe->array[n] = kmalloc(bytes, mpipe->type, M_WAITOK | mpipe->mflags);
102 	if (construct)
103 	    construct(mpipe->array[n], priv);
104 	++mpipe->free_count;
105 	++mpipe->total_count;
106     }
107     STAILQ_INIT(&mpipe->queue);
108 
109     lwkt_token_init(&mpipe->token, "mpipe token");
110 
111     /*
112      * Create a support thread for the mpipe queue
113      */
114     if (mpflags & MPF_CALLBACK) {
115 	    kthread_create(mpipe_thread, mpipe, &mpipe->thread,
116 			   "mpipe_%s", type->ks_shortdesc);
117     }
118 }
119 
120 /*
121  * Destroy a previously initialized mpipe.  This routine can also safely be
122  * called on an uninitialized mpipe structure if it was zero'd or mpipe_done()
123  * was previously called on it.
124  */
125 void
126 mpipe_done(malloc_pipe_t mpipe)
127 {
128     void *buf;
129     int n;
130 
131     KKASSERT(mpipe->free_count == mpipe->total_count);	/* no outstanding mem */
132 
133     /*
134      * Clean up the kthread
135      */
136     lwkt_gettoken(&mpipe->token);
137     mpipe->mpflags |= MPF_EXITING;
138     while (mpipe->thread) {
139 	wakeup(&mpipe->queue);
140 	tsleep(mpipe, 0, "mpipex", 1);
141     }
142 
143     /*
144      * Clean up the mpipe buffers
145      */
146     for (n = mpipe->free_count - 1; n >= 0; --n) {
147 	buf = mpipe->array[n];
148 	mpipe->array[n] = NULL;
149 	KKASSERT(buf != NULL);
150 	if (mpipe->deconstruct)
151 	    mpipe->deconstruct(buf, mpipe->priv);
152 	kfree(buf, mpipe->type);
153     }
154     mpipe->free_count = 0;
155     mpipe->total_count = 0;
156     if (mpipe->array) {
157 	kfree(mpipe->array, M_MPIPEARY);
158 	mpipe->array = NULL;
159     }
160     lwkt_reltoken(&mpipe->token);
161     lwkt_token_uninit(&mpipe->token);
162 }
163 
164 /*
165  * mpipe support thread for request failures when mpipe_alloc_callback()
166  * is called.
167  */
168 static void
169 mpipe_thread(void *arg)
170 {
171     malloc_pipe_t mpipe = arg;
172     struct mpipe_callback *mcb;
173 
174     lwkt_gettoken(&mpipe->token);
175     while ((mpipe->mpflags & MPF_EXITING) == 0) {
176 	while (mpipe->free_count &&
177 	       (mcb = STAILQ_FIRST(&mpipe->queue)) != NULL) {
178 		STAILQ_REMOVE(&mpipe->queue, mcb, mpipe_callback, entry);
179 		mcb->func(mcb->arg1, mcb->arg2);
180 		kfree(mcb, M_MPIPEARY);
181 	}
182 	mpipe->mpflags |= MPF_QUEUEWAIT;
183 	tsleep(&mpipe->queue, 0, "wait", 0);
184     }
185     mpipe->thread = NULL;
186     wakeup(mpipe);
187     lwkt_reltoken(&mpipe->token);
188 }
189 
190 
191 /*
192  * Allocate an entry (inline suppot routine).  The allocation is guarenteed
193  * to return non-NULL up to the nominal count after which it may return NULL.
194  * Note that the implementation is defined to be allowed to block for short
195  * periods of time.
196  *
197  * Use mpipe_alloc_callback() for non-blocking operation with a callback
198  * Use mpipe_alloc_nowait() for non-blocking operation without a callback
199  * Use mpipe_alloc_waitok() for blocking operation & guarenteed non-NULL
200  */
201 static __inline
202 void *
203 _mpipe_alloc_locked(malloc_pipe_t mpipe, int mfailed)
204 {
205     void *buf;
206     int n;
207 
208     if ((n = mpipe->free_count) != 0) {
209 	/*
210 	 * Use a free entry if it exists.
211 	 */
212 	--n;
213 	buf = mpipe->array[n];
214 	mpipe->array[n] = NULL;	/* sanity check, not absolutely needed */
215 	mpipe->free_count = n;
216     } else if (mpipe->total_count >= mpipe->max_count || mfailed) {
217 	/*
218 	 * Return NULL if we have hit our limit
219 	 */
220 	buf = NULL;
221     } else {
222 	/*
223 	 * Otherwise try to malloc() non-blocking.
224 	 */
225 	buf = kmalloc(mpipe->bytes, mpipe->type, M_NOWAIT | mpipe->mflags);
226 	if (buf) {
227 	    ++mpipe->total_count;
228 	    if (mpipe->construct)
229 	        mpipe->construct(buf, mpipe->priv);
230 	}
231     }
232     return(buf);
233 }
234 
235 /*
236  * Nominal non-blocking mpipe allocation
237  */
238 void *
239 mpipe_alloc_nowait(malloc_pipe_t mpipe)
240 {
241     void *buf;
242 
243     lwkt_gettoken(&mpipe->token);
244     buf = _mpipe_alloc_locked(mpipe, 0);
245     lwkt_reltoken(&mpipe->token);
246 
247     return(buf);
248 }
249 
250 /*
251  * non-blocking mpipe allocation with callback for retry.
252  *
253  * If NULL is returned func(arg) is queued and will be called back when
254  * space is likely (but not necessarily) available.
255  *
256  * If non-NULL is returned func(arg) is ignored.
257  */
258 void *
259 mpipe_alloc_callback(malloc_pipe_t mpipe, void (*func)(void *arg1, void *arg2),
260 		     void *arg1, void *arg2)
261 {
262     struct mpipe_callback *mcb;
263     void *buf;
264 
265     lwkt_gettoken(&mpipe->token);
266     buf = _mpipe_alloc_locked(mpipe, 0);
267     if (buf == NULL) {
268 	mcb = kmalloc(sizeof(*mcb), M_MPIPEARY, M_INTWAIT);
269 	buf = _mpipe_alloc_locked(mpipe, 0);
270 	if (buf == NULL) {
271 	    mcb->func = func;
272 	    mcb->arg1 = arg1;
273 	    mcb->arg2 = arg2;
274 	    STAILQ_INSERT_TAIL(&mpipe->queue, mcb, entry);
275 	} else {
276 	    kfree(mcb, M_MPIPEARY);
277 	}
278     }
279     lwkt_reltoken(&mpipe->token);
280 
281     return(buf);
282 }
283 
284 /*
285  * This function can be called to nominally wait until resources are
286  * available and mpipe_alloc_nowait() is likely to return non-NULL.
287  *
288  * NOTE: mpipe_alloc_nowait() can still return NULL.
289  */
290 void
291 mpipe_wait(malloc_pipe_t mpipe)
292 {
293     if (mpipe->free_count == 0) {
294 	lwkt_gettoken(&mpipe->token);
295 	while ((mpipe->mpflags & MPF_EXITING) == 0) {
296 	    if (mpipe->free_count)
297 		    break;
298 	    mpipe->mpflags |= MPF_QUEUEWAIT;
299 	    tsleep(&mpipe->queue, 0, "wait", 0);
300 	}
301 	lwkt_reltoken(&mpipe->token);
302     }
303 }
304 
305 /*
306  * Allocate an entry, block until the allocation succeeds.  This may cause
307  * us to block waiting for a prior allocation to be freed.
308  */
309 void *
310 mpipe_alloc_waitok(malloc_pipe_t mpipe)
311 {
312     void *buf;
313     int mfailed;
314 
315     lwkt_gettoken(&mpipe->token);
316     mfailed = 0;
317     while ((buf = _mpipe_alloc_locked(mpipe, mfailed)) == NULL) {
318 	/*
319 	 * Block if we have hit our limit
320 	 */
321 	mpipe->pending = 1;
322 	tsleep(mpipe, 0, "mpipe1", 0);
323 	mfailed = 1;
324     }
325     lwkt_reltoken(&mpipe->token);
326 
327     return(buf);
328 }
329 
330 /*
331  * Free an entry, unblock any waiters.  Allow NULL.
332  */
333 void
334 mpipe_free(malloc_pipe_t mpipe, void *buf)
335 {
336     int n;
337 
338     if (buf == NULL)
339 	return;
340 
341     lwkt_gettoken(&mpipe->token);
342     if ((n = mpipe->free_count) < mpipe->ary_count) {
343 	/*
344 	 * Free slot available in free array (LIFO)
345 	 */
346 	mpipe->array[n] = buf;
347 	++mpipe->free_count;
348 	if ((mpipe->mpflags & (MPF_CACHEDATA|MPF_NOZERO)) == 0)
349 	    bzero(buf, mpipe->bytes);
350 	if (mpipe->mpflags & MPF_QUEUEWAIT) {
351 		mpipe->mpflags &= ~MPF_QUEUEWAIT;
352 		lwkt_reltoken(&mpipe->token);
353 		wakeup(&mpipe->queue);
354 	} else {
355 		lwkt_reltoken(&mpipe->token);
356 	}
357 	/*
358 	 * Wakeup anyone blocked in mpipe_alloc_*().
359 	 */
360 	if (mpipe->pending) {
361 	    mpipe->pending = 0;
362 	    wakeup(mpipe);
363 	}
364     } else {
365 	/*
366 	 * All the free slots are full, free the buffer directly.
367 	 */
368 	--mpipe->total_count;
369 	KKASSERT(mpipe->total_count >= mpipe->free_count);
370 	if (mpipe->deconstruct)
371 	    mpipe->deconstruct(buf, mpipe->priv);
372 	lwkt_reltoken(&mpipe->token);
373 	kfree(buf, mpipe->type);
374     }
375 }
376 
377