xref: /dragonfly/sys/kern/kern_mpipe.c (revision 52a88097)
1 /*
2  * Copyright (c) 2003,2004,2020 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 
35 #include <sys/param.h>
36 #include <sys/systm.h>
37 #include <sys/kernel.h>
38 #include <sys/slaballoc.h>
39 #include <sys/malloc.h>
40 #include <sys/mbuf.h>
41 #include <sys/vmmeter.h>
42 #include <sys/lock.h>
43 #include <sys/thread.h>
44 #include <sys/globaldata.h>
45 #include <sys/mpipe.h>
46 #include <sys/kthread.h>
47 
/*
 * Deferred-retry record used by mpipe_alloc_callback().  When an
 * allocation fails, the (func, arg1, arg2) tuple is queued on the
 * mpipe and later invoked by mpipe_thread() once buffers free up.
 */
struct mpipe_callback {
	STAILQ_ENTRY(mpipe_callback) entry;	/* linkage on mpipe->queue */
	void (*func)(void *arg1, void *arg2);	/* retry callback */
	void *arg1;
	void *arg2;
};
54 
/* Malloc type for the buffer-pointer array and queued callback records */
static MALLOC_DEFINE(M_MPIPEARY, "MPipe Array", "Auxiliary MPIPE structure");

/* Support thread servicing queued mpipe_alloc_callback() retries */
static void mpipe_thread(void *arg);
58 
59 /*
60  * Initialize a malloc pipeline for the specified malloc type and allocation
61  * size.  Create an array to cache up to nom_count buffers and preallocate
62  * them.
63  */
64 void
65 mpipe_init(malloc_pipe_t mpipe, malloc_type_t type, int bytes,
66 	int nnom, int nmax,
67 	int mpflags,
68 	void (*construct)(void *, void *),
69 	void (*deconstruct)(void *, void *),
70 	void *priv)
71 {
72     int n;
73 
74     if (nnom < 1)
75 	nnom = 1;
76     if (nmax < 0)
77 	nmax = 0x7FFF0000;	/* some very large number */
78     if (nmax < nnom)
79 	nmax = nnom;
80     bzero(mpipe, sizeof(struct malloc_pipe));
81     mpipe->type = type;
82     mpipe->bytes = bytes;
83     mpipe->mpflags = mpflags;
84     mpipe->construct = construct;
85     mpipe->deconstruct = deconstruct;
86     mpipe->priv = priv;
87     if ((mpflags & MPF_NOZERO) == 0)
88 	mpipe->mflags |= M_ZERO;
89     if (mpflags & MPF_INT)
90 	mpipe->mflags |= M_USE_RESERVE | M_USE_INTERRUPT_RESERVE;
91     mpipe->ary_count = nnom;
92     mpipe->max_count = nmax;
93     mpipe->array = kmalloc(nnom * sizeof(mpipe->array[0]), M_MPIPEARY,
94 			    M_WAITOK | M_ZERO);
95 
96     while (mpipe->free_count < nnom) {
97 	n = mpipe->free_count;
98 	mpipe->array[n] = kmalloc(bytes, mpipe->type, M_WAITOK | mpipe->mflags);
99 	if (construct)
100 	    construct(mpipe->array[n], priv);
101 	++mpipe->free_count;
102 	++mpipe->total_count;
103     }
104     STAILQ_INIT(&mpipe->queue);
105 
106     lwkt_token_init(&mpipe->token, "mpipe token");
107 
108     /*
109      * Create a support thread for the mpipe queue
110      */
111     if (mpflags & MPF_CALLBACK) {
112 	    kthread_create(mpipe_thread, mpipe, &mpipe->thread,
113 			   "mpipe_%s", type->ks_shortdesc);
114     }
115 }
116 
/*
 * Destroy a previously initialized mpipe.  This routine can also safely be
 * called on an uninitialized mpipe structure if it was zero'd or mpipe_done()
 * was previously called on it.
 *
 * All buffers must have been returned to the pipe before calling this
 * (free_count == total_count is asserted below).
 */
void
mpipe_done(malloc_pipe_t mpipe)
{
    void *buf;
    int n;

    KKASSERT(mpipe->free_count == mpipe->total_count);	/* no outstanding mem */

    /*
     * Clean up the kthread.  Setting MPF_EXITING causes mpipe_thread()
     * to break out of its loop; it clears mpipe->thread and then wakes
     * us via wakeup(mpipe).  Poll with a 1-tick tsleep in case the
     * wakeup races our sleep.
     */
    lwkt_gettoken(&mpipe->token);
    mpipe->mpflags |= MPF_EXITING;
    while (mpipe->thread) {
	wakeup(&mpipe->queue);
	tsleep(mpipe, 0, "mpipex", 1);
    }

    /*
     * Clean up the mpipe buffers: deconstruct and kfree() each cached
     * buffer, then release the pointer array itself.
     */
    for (n = mpipe->free_count - 1; n >= 0; --n) {
	buf = mpipe->array[n];
	mpipe->array[n] = NULL;
	KKASSERT(buf != NULL);
	if (mpipe->deconstruct)
	    mpipe->deconstruct(buf, mpipe->priv);
	kfree(buf, mpipe->type);
    }
    mpipe->free_count = 0;
    mpipe->total_count = 0;
    if (mpipe->array) {
	kfree(mpipe->array, M_MPIPEARY);
	mpipe->array = NULL;
    }
    lwkt_reltoken(&mpipe->token);
    lwkt_token_uninit(&mpipe->token);
}
160 
/*
 * mpipe support thread for request failures when mpipe_alloc_callback()
 * is called.
 *
 * Only set MPF_QUEUEWAIT if entries are pending in the queue.  If no entries
 * are pending and a new entry is added, other code will set MPF_QUEUEWAIT
 * for us.
 */
static void
mpipe_thread(void *arg)
{
    malloc_pipe_t mpipe = arg;
    struct mpipe_callback *mcb;

    lwkt_gettoken(&mpipe->token);
    while ((mpipe->mpflags & MPF_EXITING) == 0) {
	/*
	 * Drain queued callbacks while free buffers are available.
	 */
	while (mpipe->free_count &&
	       (mcb = STAILQ_FIRST(&mpipe->queue)) != NULL) {
		STAILQ_REMOVE(&mpipe->queue, mcb, mpipe_callback, entry);
		mcb->func(mcb->arg1, mcb->arg2);
		kfree(mcb, M_MPIPEARY);
	}
	/*
	 * If entries remain we ran out of free buffers; request that
	 * mpipe_free() wake the queue, then sleep until woken.
	 */
	if (STAILQ_FIRST(&mpipe->queue))
		mpipe->mpflags |= MPF_QUEUEWAIT;
	tsleep(&mpipe->queue, 0, "wait", 0);
    }
    /* Acknowledge exit request; mpipe_done() is sleeping on mpipe */
    mpipe->thread = NULL;
    lwkt_reltoken(&mpipe->token);
    wakeup(mpipe);
}
191 
192 
/*
 * Allocate an entry (inline support routine).  The allocation is guaranteed
 * to return non-NULL up to the nominal count, after which it may return NULL.
 * Note that the implementation is defined to be allowed to block for short
 * periods of time.
 *
 * Use mpipe_alloc_callback() for non-blocking operation with a callback
 * Use mpipe_alloc_nowait() for non-blocking operation without a callback
 * Use mpipe_alloc_waitok() for blocking operation & guaranteed non-NULL
 */
203 static __inline
204 void *
205 _mpipe_alloc_locked(malloc_pipe_t mpipe, int mfailed)
206 {
207     void *buf;
208     int n;
209 
210     if ((n = mpipe->free_count) != 0) {
211 	/*
212 	 * Use a free entry if it exists.
213 	 */
214 	--n;
215 	buf = mpipe->array[n];
216 	mpipe->array[n] = NULL;	/* sanity check, not absolutely needed */
217 	mpipe->free_count = n;
218     } else if (mpipe->total_count >= mpipe->max_count || mfailed) {
219 	/*
220 	 * Return NULL if we have hit our limit
221 	 */
222 	buf = NULL;
223     } else {
224 	/*
225 	 * Otherwise try to malloc() non-blocking.
226 	 */
227 	buf = kmalloc(mpipe->bytes, mpipe->type, M_NOWAIT | mpipe->mflags);
228 	if (buf) {
229 	    ++mpipe->total_count;
230 	    if (mpipe->construct)
231 	        mpipe->construct(buf, mpipe->priv);
232 	}
233     }
234     return(buf);
235 }
236 
237 /*
238  * Nominal non-blocking mpipe allocation
239  */
240 void *
241 mpipe_alloc_nowait(malloc_pipe_t mpipe)
242 {
243     void *buf;
244 
245     lwkt_gettoken(&mpipe->token);
246     buf = _mpipe_alloc_locked(mpipe, 0);
247     lwkt_reltoken(&mpipe->token);
248 
249     return(buf);
250 }
251 
/*
 * non-blocking mpipe allocation with callback for retry.
 *
 * If NULL is returned func(arg) is queued and will be called back when
 * space is likely (but not necessarily) available.
 *
 * If non-NULL is returned func(arg) is ignored.
 */
void *
mpipe_alloc_callback(malloc_pipe_t mpipe, void (*func)(void *arg1, void *arg2),
		     void *arg1, void *arg2)
{
    struct mpipe_callback *mcb;
    void *buf;

    lwkt_gettoken(&mpipe->token);
    buf = _mpipe_alloc_locked(mpipe, 0);
    if (buf == NULL) {
	/*
	 * kmalloc(M_INTWAIT) can block, so retry the pipe allocation
	 * afterward; a buffer may have been freed while we slept.
	 */
	mcb = kmalloc(sizeof(*mcb), M_MPIPEARY, M_INTWAIT);
	buf = _mpipe_alloc_locked(mpipe, 0);
	if (buf == NULL) {
	    mcb->func = func;
	    mcb->arg1 = arg1;
	    mcb->arg2 = arg2;
	    STAILQ_INSERT_TAIL(&mpipe->queue, mcb, entry);
	    mpipe->mpflags |= MPF_QUEUEWAIT;	/* for mpipe_thread() */
	} else {
	    /* retry succeeded, discard the unused callback record */
	    kfree(mcb, M_MPIPEARY);
	}
    }
    lwkt_reltoken(&mpipe->token);

    return(buf);
}
286 
287 /*
288  * This function can be called to nominally wait until resources are
289  * available and mpipe_alloc_nowait() is likely to return non-NULL.
290  *
291  * NOTE: mpipe_alloc_nowait() can still return NULL.
292  */
293 void
294 mpipe_wait(malloc_pipe_t mpipe)
295 {
296     if (mpipe->free_count == 0) {
297 	lwkt_gettoken(&mpipe->token);
298 	while ((mpipe->mpflags & MPF_EXITING) == 0) {
299 	    if (mpipe->free_count)
300 		    break;
301 	    mpipe->mpflags |= MPF_QUEUEWAIT;
302 	    tsleep(&mpipe->queue, 0, "wait", 0);
303 	}
304 	lwkt_reltoken(&mpipe->token);
305     }
306 }
307 
308 /*
309  * Allocate an entry, block until the allocation succeeds.  This may cause
310  * us to block waiting for a prior allocation to be freed.
311  */
312 void *
313 mpipe_alloc_waitok(malloc_pipe_t mpipe)
314 {
315     void *buf;
316     int mfailed;
317 
318     lwkt_gettoken(&mpipe->token);
319     mfailed = 0;
320     while ((buf = _mpipe_alloc_locked(mpipe, mfailed)) == NULL) {
321 	/*
322 	 * Block if we have hit our limit
323 	 */
324 	mpipe->pending = 1;
325 	tsleep(mpipe, 0, "mpipe1", 0);
326 	mfailed = 1;
327     }
328     lwkt_reltoken(&mpipe->token);
329 
330     return(buf);
331 }
332 
/*
 * Free an entry, unblock any waiters.  Allow NULL.
 */
void
mpipe_free(malloc_pipe_t mpipe, void *buf)
{
    int n;

    if (buf == NULL)
	return;

    lwkt_gettoken(&mpipe->token);
    if ((n = mpipe->free_count) < mpipe->ary_count) {
	/*
	 * Free slot available in free array (LIFO)
	 */
	mpipe->array[n] = buf;
	++mpipe->free_count;
	/* Re-zero the buffer unless the pipe caches data in it */
	if ((mpipe->mpflags & (MPF_CACHEDATA|MPF_NOZERO)) == 0)
	    bzero(buf, mpipe->bytes);
	/*
	 * Release the token before waking mpipe_thread()/mpipe_wait()
	 * sleepers, if any requested a wakeup via MPF_QUEUEWAIT.
	 */
	if (mpipe->mpflags & MPF_QUEUEWAIT) {
		mpipe->mpflags &= ~MPF_QUEUEWAIT;
		lwkt_reltoken(&mpipe->token);
		wakeup(&mpipe->queue);
	} else {
		lwkt_reltoken(&mpipe->token);
	}
	/*
	 * Wakeup anyone blocked in mpipe_alloc_*().
	 *
	 * NOTE(review): pending is tested after the token has been
	 * released; presumably benign because waiters re-test under
	 * the token before sleeping again — worth confirming.
	 */
	if (mpipe->pending) {
	    mpipe->pending = 0;
	    wakeup(mpipe);
	}
    } else {
	/*
	 * All the free slots are full, free the buffer directly.
	 */
	--mpipe->total_count;
	KKASSERT(mpipe->total_count >= mpipe->free_count);
	if (mpipe->deconstruct)
	    mpipe->deconstruct(buf, mpipe->priv);
	lwkt_reltoken(&mpipe->token);
	kfree(buf, mpipe->type);
    }
}
379 
380