/*-
 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
 *
 * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $FreeBSD$
 *
 */

#ifndef	_SYS_BUF_RING_H_
#define	_SYS_BUF_RING_H_

#include <machine/cpu.h>

#ifdef DEBUG_BUFRING
#include <sys/lock.h>
#include <sys/mutex.h>
#endif

struct buf_ring {
	volatile uint32_t	br_prod_head;
	volatile uint32_t	br_prod_tail;
	int			br_prod_size;
	int			br_prod_mask;
	uint64_t		br_drops;
	volatile uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
	volatile uint32_t	br_cons_tail;
	int			br_cons_size;
	int			br_cons_mask;
#ifdef DEBUG_BUFRING
	struct mtx		*br_lock;
#endif
	void			*br_ring[0] __aligned(CACHE_LINE_SIZE);
};

/*
 * multi-producer safe lock-free ring buffer enqueue
 *
 */
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
	uint32_t prod_head, prod_next, cons_tail;
#ifdef DEBUG_BUFRING
	int i;

	/*
	 * Note: It is possible to encounter an mbuf that was removed
	 * via drbr_peek() and then re-added via drbr_putback(), which
	 * can trigger a spurious panic here.
	 */
	for (i = br->br_cons_head; i != br->br_prod_head;
	     i = ((i + 1) & br->br_cons_mask))
		if (br->br_ring[i] == buf)
			panic("buf=%p already enqueued at %d prod=%d cons=%d",
			    buf, i, br->br_prod_tail, br->br_cons_tail);
#endif
	critical_enter();
	do {
		prod_head = br->br_prod_head;
		prod_next = (prod_head + 1) & br->br_prod_mask;
		cons_tail = br->br_cons_tail;

		if (prod_next == cons_tail) {
			rmb();
			if (prod_head == br->br_prod_head &&
			    cons_tail == br->br_cons_tail) {
				br->br_drops++;
				critical_exit();
				return (ENOBUFS);
			}
			continue;
		}
	} while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
	if (br->br_ring[prod_head] != NULL)
		panic("dangling value in enqueue");
#endif
	br->br_ring[prod_head] = buf;

	/*
	 * If there are other enqueues in progress
	 * that preceded us, we need to wait for them
	 * to complete
	 */
	while (br->br_prod_tail != prod_head)
		cpu_spinwait();
	atomic_store_rel_int(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
}
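
/*
 * Usage sketch (illustrative only; xx_transmit(), xx_softc and the br
 * member are hypothetical names): a multi-producer transmit path might
 * hand mbufs to the ring like this, freeing the mbuf itself when the
 * ring is full and buf_ring_enqueue() returns ENOBUFS:
 *
 *	static int
 *	xx_transmit(struct xx_softc *sc, struct mbuf *m)
 *	{
 *		int error;
 *
 *		error = buf_ring_enqueue(sc->br, m);
 *		if (error == ENOBUFS)
 *			m_freem(m);
 *		return (error);
 *	}
 */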

/*
 * multi-consumer safe dequeue
 *
 */
static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
	void *buf;

	critical_enter();
	do {
		cons_head = br->br_cons_head;
		cons_next = (cons_head + 1) & br->br_cons_mask;

		if (cons_head == br->br_prod_tail) {
			critical_exit();
			return (NULL);
		}
	} while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));

	buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
#endif
	/*
	 * If there are other dequeues in progress
	 * that preceded us, we need to wait for them
	 * to complete
	 */
	while (br->br_cons_tail != cons_head)
		cpu_spinwait();

	atomic_store_rel_int(&br->br_cons_tail, cons_next);
	critical_exit();

	return (buf);
}
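
/*
 * Usage sketch (illustrative; process_item() is a hypothetical
 * consumer function): any number of threads may drain the ring
 * concurrently with buf_ring_dequeue_mc(), treating a NULL return
 * as "ring empty":
 *
 *	void *item;
 *
 *	while ((item = buf_ring_dequeue_mc(br)) != NULL)
 *		process_item(item);
 */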

/*
 * single-consumer dequeue
 * use where dequeue is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void *
buf_ring_dequeue_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
#ifdef PREFETCH_DEFINED
	uint32_t cons_next_next;
#endif
	uint32_t prod_tail;
	void *buf;

	/*
	 * This is a workaround to allow using buf_ring on ARM and ARM64.
	 * ARM64TODO: Fix buf_ring in a generic way.
	 * REMARKS: It is suspected that br_cons_head does not require
	 *   a load_acq operation, but this change was extensively tested
	 *   and confirmed to work. To be reviewed once again in
	 *   FreeBSD-12.
	 *
	 * This prevents the following situation:
	 *
	 * Core(0) - buf_ring_enqueue()                 Core(1) - buf_ring_dequeue_sc()
	 * -------------------------------------------  -------------------------------
	 *
	 *                                              cons_head = br->br_cons_head;
	 * atomic_cmpset_acq_32(&br->br_prod_head, ...);
	 *                                              buf = br->br_ring[cons_head];   <see <1>>
	 * br->br_ring[prod_head] = buf;
	 * atomic_store_rel_32(&br->br_prod_tail, ...);
	 *                                              prod_tail = br->br_prod_tail;
	 *                                              if (cons_head == prod_tail)
	 *                                                      return (NULL);
	 *                                              <condition is false and the code uses the invalid (old) buf>
	 *
	 * <1> The load (on core 1) from br->br_ring[cons_head] can be
	 *     reordered (speculatively read) by the CPU.
	 */
#if defined(__arm__) || defined(__aarch64__)
	cons_head = atomic_load_acq_32(&br->br_cons_head);
#else
	cons_head = br->br_cons_head;
#endif
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);

	cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
	cons_next_next = (cons_head + 2) & br->br_cons_mask;
#endif

	if (cons_head == prod_tail)
		return (NULL);

#ifdef PREFETCH_DEFINED
	if (cons_next != prod_tail) {
		prefetch(br->br_ring[cons_next]);
		if (cons_next_next != prod_tail)
			prefetch(br->br_ring[cons_next_next]);
	}
#endif
	br->br_cons_head = cons_next;
	buf = br->br_ring[cons_head];

#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
	if (br->br_cons_tail != cons_head)
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    br->br_cons_tail, cons_head);
#endif
	br->br_cons_tail = cons_next;
	return (buf);
}
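
/*
 * Usage sketch (illustrative; the txq structure and its lock are
 * hypothetical): a driver draining its transmit ring while holding
 * the tx queue lock might do:
 *
 *	struct mbuf *m;
 *
 *	mtx_lock(&txq->lock);
 *	while ((m = buf_ring_dequeue_sc(txq->br)) != NULL)
 *		m_freem(m);
 *	mtx_unlock(&txq->lock);
 */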

/*
 * single-consumer advance after a peek
 * use where it is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void
buf_ring_advance_sc(struct buf_ring *br)
{
	uint32_t cons_head, cons_next;
	uint32_t prod_tail;

	cons_head = br->br_cons_head;
	prod_tail = br->br_prod_tail;

	cons_next = (cons_head + 1) & br->br_cons_mask;
	if (cons_head == prod_tail)
		return;
	br->br_cons_head = cons_next;
#ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
#endif
	br->br_cons_tail = cons_next;
}

/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring. The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what's in the ring buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other operation that will have
 * changed it. As an optimization we always put it
 * back (since jhb says the store is probably cheaper);
 * if we ever have to do a multi-queue version we will
 * need the compare and an atomic.
 */
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
{
	KASSERT(br->br_cons_head != br->br_prod_tail,
		("Buf-Ring has none in putback"));
	br->br_ring[br->br_cons_head] = new;
}
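
/*
 * Usage sketch of the peek/putback pattern described above
 * (illustrative; xx_encap() and the txq layout are hypothetical;
 * xx_encap() is assumed to pass any surviving, possibly modified,
 * mbuf chain back through its pointer argument on failure):
 *
 *	struct mbuf *m;
 *
 *	mtx_lock(&txq->lock);
 *	while ((m = buf_ring_peek(txq->br)) != NULL) {
 *		if (xx_encap(sc, &m) != 0) {
 *			if (m != NULL)
 *				buf_ring_putback_sc(txq->br, m);
 *			break;
 *		}
 *		buf_ring_advance_sc(txq->br);
 *	}
 *	mtx_unlock(&txq->lock);
 */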

/*
 * return a pointer to the first entry in the ring
 * without modifying it, or NULL if the ring is empty;
 * race-prone if not protected by a lock
 */
static __inline void *
buf_ring_peek(struct buf_ring *br)
{

#ifdef DEBUG_BUFRING
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif
	/*
	 * I believe it is safe to not have a memory barrier
	 * here because we control cons, and tail is at worst
	 * a lagging indicator, so in the worst case we might
	 * return NULL immediately after a buffer has been enqueued.
	 */
	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

	return (br->br_ring[br->br_cons_head]);
}

static __inline void *
buf_ring_peek_clear_sc(struct buf_ring *br)
{
#ifdef DEBUG_BUFRING
	void *ret;

	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
#endif

	if (br->br_cons_head == br->br_prod_tail)
		return (NULL);

#if defined(__arm__) || defined(__aarch64__)
	/*
	 * The barrier is required on ARM and ARM64 to ensure that
	 * br->br_ring[br->br_cons_head] will not be fetched before the
	 * above condition is checked.
	 * Without the barrier, it is possible that the buffer will be
	 * fetched before the enqueue puts the mbuf into br; the enqueue
	 * then updates the array and br_prod_tail, the conditional check
	 * becomes true, and we return the previously fetched (and
	 * invalid) buffer.
	 */
	atomic_thread_fence_acq();
#endif

#ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
	ret = br->br_ring[br->br_cons_head];
	br->br_ring[br->br_cons_head] = NULL;
	return (ret);
#else
	return (br->br_ring[br->br_cons_head]);
#endif
}

static __inline int
buf_ring_full(struct buf_ring *br)
{

	return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}

static __inline int
buf_ring_empty(struct buf_ring *br)
{

	return (br->br_cons_head == br->br_prod_tail);
}

static __inline int
buf_ring_count(struct buf_ring *br)
{

	return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
	    & br->br_prod_mask);
}
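
/*
 * Worked example of the count computation above (assuming a ring of
 * size 8, so br_prod_mask == 7): with br_prod_tail == 2 and
 * br_cons_tail == 6 the ring holds (8 + 2 - 6) & 7 == 4 entries;
 * adding the ring size before masking handles the index wrap-around.
 */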

struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
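
/*
 * Allocation sketch (illustrative; M_XXDRV is a hypothetical malloc
 * type): count must be a power of two, since the producer and consumer
 * indices are wrapped with masks of count - 1, and the mutex is only
 * recorded for the DEBUG_BUFRING assertions above:
 *
 *	struct buf_ring *br;
 *
 *	br = buf_ring_alloc(4096, M_XXDRV, M_WAITOK, &sc->tx_lock);
 *	...
 *	buf_ring_free(br, M_XXDRV);
 */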



#endif