xref: /freebsd/sys/net/mp_ring.c (revision fdafd315)
14c7070dbSScott Long /*-
24c7070dbSScott Long  * Copyright (c) 2014 Chelsio Communications, Inc.
34c7070dbSScott Long  * All rights reserved.
44c7070dbSScott Long  * Written by: Navdeep Parhar <np@FreeBSD.org>
54c7070dbSScott Long  *
64c7070dbSScott Long  * Redistribution and use in source and binary forms, with or without
74c7070dbSScott Long  * modification, are permitted provided that the following conditions
84c7070dbSScott Long  * are met:
94c7070dbSScott Long  * 1. Redistributions of source code must retain the above copyright
104c7070dbSScott Long  *    notice, this list of conditions and the following disclaimer.
114c7070dbSScott Long  * 2. Redistributions in binary form must reproduce the above copyright
124c7070dbSScott Long  *    notice, this list of conditions and the following disclaimer in the
134c7070dbSScott Long  *    documentation and/or other materials provided with the distribution.
144c7070dbSScott Long  *
154c7070dbSScott Long  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
164c7070dbSScott Long  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
174c7070dbSScott Long  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
184c7070dbSScott Long  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
194c7070dbSScott Long  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
204c7070dbSScott Long  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
214c7070dbSScott Long  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
224c7070dbSScott Long  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
234c7070dbSScott Long  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
244c7070dbSScott Long  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
254c7070dbSScott Long  * SUCH DAMAGE.
264c7070dbSScott Long  */
274c7070dbSScott Long 
284c7070dbSScott Long #include <sys/types.h>
294c7070dbSScott Long #include <sys/param.h>
304c7070dbSScott Long #include <sys/systm.h>
314c7070dbSScott Long #include <sys/counter.h>
324c7070dbSScott Long #include <sys/lock.h>
334c7070dbSScott Long #include <sys/mutex.h>
344c7070dbSScott Long #include <sys/malloc.h>
354c7070dbSScott Long #include <machine/cpu.h>
36fc614c29SScott Long #include <net/mp_ring.h>
37fc614c29SScott Long 
/*
 * Complete state of a ring, packed into a single 64-bit word so that it can
 * be read and updated as one unit via 'state'.  The anonymous struct overlays
 * the same storage with the four 16-bit fields.
 */
union ring_state {
	struct {
		uint16_t pidx_head;	/* end of the area reserved by producers */
		uint16_t pidx_tail;	/* end of the area filled in by producers */
		uint16_t cidx;		/* next item the consumer will look at */
		uint16_t flags;		/* consumer state: IDLE, BUSY, ... */
	};
	uint64_t state;
};
474c7070dbSScott Long 
/* Consumer states, stored in the 'flags' field of union ring_state. */
enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};
554c7070dbSScott Long 
564c7070dbSScott Long static inline uint16_t
space_available(struct ifmp_ring * r,union ring_state s)574c7070dbSScott Long space_available(struct ifmp_ring *r, union ring_state s)
584c7070dbSScott Long {
594c7070dbSScott Long 	uint16_t x = r->size - 1;
604c7070dbSScott Long 
614c7070dbSScott Long 	if (s.cidx == s.pidx_head)
624c7070dbSScott Long 		return (x);
634c7070dbSScott Long 	else if (s.cidx > s.pidx_head)
644c7070dbSScott Long 		return (s.cidx - s.pidx_head - 1);
654c7070dbSScott Long 	else
664c7070dbSScott Long 		return (x - s.pidx_head + s.cidx);
674c7070dbSScott Long }
684c7070dbSScott Long 
694c7070dbSScott Long static inline uint16_t
increment_idx(struct ifmp_ring * r,uint16_t idx,uint16_t n)704c7070dbSScott Long increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
714c7070dbSScott Long {
724c7070dbSScott Long 	int x = r->size - idx;
734c7070dbSScott Long 
744c7070dbSScott Long 	MPASS(x > 0);
754c7070dbSScott Long 	return (x > n ? idx + n : n - x);
764c7070dbSScott Long }
774c7070dbSScott Long 
784c7070dbSScott Long /* Consumer is about to update the ring's state to s */
794c7070dbSScott Long static inline uint16_t
state_to_flags(union ring_state s,int abdicate)804c7070dbSScott Long state_to_flags(union ring_state s, int abdicate)
814c7070dbSScott Long {
824c7070dbSScott Long 
834c7070dbSScott Long 	if (s.cidx == s.pidx_tail)
844c7070dbSScott Long 		return (IDLE);
854c7070dbSScott Long 	else if (abdicate && s.pidx_tail != s.pidx_head)
864c7070dbSScott Long 		return (ABDICATED);
874c7070dbSScott Long 
884c7070dbSScott Long 	return (BUSY);
894c7070dbSScott Long }
904c7070dbSScott Long 
915881181dSMatt Macy #ifdef MP_RING_NO_64BIT_ATOMICS
924c7070dbSScott Long static void
drain_ring_locked(struct ifmp_ring * r,union ring_state os,uint16_t prev,int budget)934c7070dbSScott Long drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
944c7070dbSScott Long {
954c7070dbSScott Long 	union ring_state ns;
964c7070dbSScott Long 	int n, pending, total;
974c7070dbSScott Long 	uint16_t cidx = os.cidx;
984c7070dbSScott Long 	uint16_t pidx = os.pidx_tail;
994c7070dbSScott Long 
1004c7070dbSScott Long 	MPASS(os.flags == BUSY);
1014c7070dbSScott Long 	MPASS(cidx != pidx);
1024c7070dbSScott Long 
1034c7070dbSScott Long 	if (prev == IDLE)
1044c7070dbSScott Long 		counter_u64_add(r->starts, 1);
1054c7070dbSScott Long 	pending = 0;
1064c7070dbSScott Long 	total = 0;
1074c7070dbSScott Long 
1084c7070dbSScott Long 	while (cidx != pidx) {
1094c7070dbSScott Long 		/* Items from cidx to pidx are available for consumption. */
1104c7070dbSScott Long 		n = r->drain(r, cidx, pidx);
1114c7070dbSScott Long 		if (n == 0) {
1124c7070dbSScott Long 			os.state = ns.state = r->state;
1134c7070dbSScott Long 			ns.cidx = cidx;
1144c7070dbSScott Long 			ns.flags = STALLED;
1154c7070dbSScott Long 			r->state = ns.state;
1164c7070dbSScott Long 			if (prev != STALLED)
1174c7070dbSScott Long 				counter_u64_add(r->stalls, 1);
1184c7070dbSScott Long 			else if (total > 0) {
1194c7070dbSScott Long 				counter_u64_add(r->restarts, 1);
1204c7070dbSScott Long 				counter_u64_add(r->stalls, 1);
1214c7070dbSScott Long 			}
1224c7070dbSScott Long 			break;
1234c7070dbSScott Long 		}
1244c7070dbSScott Long 		cidx = increment_idx(r, cidx, n);
1254c7070dbSScott Long 		pending += n;
1264c7070dbSScott Long 		total += n;
1274c7070dbSScott Long 
1284c7070dbSScott Long 		/*
1294c7070dbSScott Long 		 * We update the cidx only if we've caught up with the pidx, the
1304c7070dbSScott Long 		 * real cidx is getting too far ahead of the one visible to
1314c7070dbSScott Long 		 * everyone else, or we have exceeded our budget.
1324c7070dbSScott Long 		 */
1334c7070dbSScott Long 		if (cidx != pidx && pending < 64 && total < budget)
1344c7070dbSScott Long 			continue;
1354c7070dbSScott Long 
1364c7070dbSScott Long 		os.state = ns.state = r->state;
1374c7070dbSScott Long 		ns.cidx = cidx;
1384c7070dbSScott Long 		ns.flags = state_to_flags(ns, total >= budget);
1394c7070dbSScott Long 		r->state = ns.state;
1404c7070dbSScott Long 
1414c7070dbSScott Long 		if (ns.flags == ABDICATED)
1424c7070dbSScott Long 			counter_u64_add(r->abdications, 1);
1434c7070dbSScott Long 		if (ns.flags != BUSY) {
1444c7070dbSScott Long 			/* Wrong loop exit if we're going to stall. */
1454c7070dbSScott Long 			MPASS(ns.flags != STALLED);
1464c7070dbSScott Long 			if (prev == STALLED) {
1474c7070dbSScott Long 				MPASS(total > 0);
1484c7070dbSScott Long 				counter_u64_add(r->restarts, 1);
1494c7070dbSScott Long 			}
1504c7070dbSScott Long 			break;
1514c7070dbSScott Long 		}
1524c7070dbSScott Long 
1534c7070dbSScott Long 		/*
1544c7070dbSScott Long 		 * The acquire style atomic above guarantees visibility of items
1554c7070dbSScott Long 		 * associated with any pidx change that we notice here.
1564c7070dbSScott Long 		 */
1574c7070dbSScott Long 		pidx = ns.pidx_tail;
1584c7070dbSScott Long 		pending = 0;
1594c7070dbSScott Long 	}
1604c7070dbSScott Long }
1614c7070dbSScott Long #else
1624c7070dbSScott Long /*
1634c7070dbSScott Long  * Caller passes in a state, with a guarantee that there is work to do and that
1644c7070dbSScott Long  * all items up to the pidx_tail in the state are visible.
1654c7070dbSScott Long  */
1664c7070dbSScott Long static void
drain_ring_lockless(struct ifmp_ring * r,union ring_state os,uint16_t prev,int budget)1674c7070dbSScott Long drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
1684c7070dbSScott Long {
1694c7070dbSScott Long 	union ring_state ns;
1704c7070dbSScott Long 	int n, pending, total;
1714c7070dbSScott Long 	uint16_t cidx = os.cidx;
1724c7070dbSScott Long 	uint16_t pidx = os.pidx_tail;
1734c7070dbSScott Long 
1744c7070dbSScott Long 	MPASS(os.flags == BUSY);
1754c7070dbSScott Long 	MPASS(cidx != pidx);
1764c7070dbSScott Long 
1774c7070dbSScott Long 	if (prev == IDLE)
1784c7070dbSScott Long 		counter_u64_add(r->starts, 1);
1794c7070dbSScott Long 	pending = 0;
1804c7070dbSScott Long 	total = 0;
1814c7070dbSScott Long 
1824c7070dbSScott Long 	while (cidx != pidx) {
1834c7070dbSScott Long 		/* Items from cidx to pidx are available for consumption. */
1844c7070dbSScott Long 		n = r->drain(r, cidx, pidx);
1854c7070dbSScott Long 		if (n == 0) {
1864c7070dbSScott Long 			critical_enter();
18714e00107SMarius Strobl 			os.state = r->state;
1884c7070dbSScott Long 			do {
18914e00107SMarius Strobl 				ns.state = os.state;
1904c7070dbSScott Long 				ns.cidx = cidx;
1914c7070dbSScott Long 				ns.flags = STALLED;
19214e00107SMarius Strobl 			} while (atomic_fcmpset_64(&r->state, &os.state,
1934c7070dbSScott Long 			    ns.state) == 0);
1944c7070dbSScott Long 			critical_exit();
1954c7070dbSScott Long 			if (prev != STALLED)
1964c7070dbSScott Long 				counter_u64_add(r->stalls, 1);
1974c7070dbSScott Long 			else if (total > 0) {
1984c7070dbSScott Long 				counter_u64_add(r->restarts, 1);
1994c7070dbSScott Long 				counter_u64_add(r->stalls, 1);
2004c7070dbSScott Long 			}
2014c7070dbSScott Long 			break;
2024c7070dbSScott Long 		}
2034c7070dbSScott Long 		cidx = increment_idx(r, cidx, n);
2044c7070dbSScott Long 		pending += n;
2054c7070dbSScott Long 		total += n;
2064c7070dbSScott Long 
2074c7070dbSScott Long 		/*
2084c7070dbSScott Long 		 * We update the cidx only if we've caught up with the pidx, the
2094c7070dbSScott Long 		 * real cidx is getting too far ahead of the one visible to
2104c7070dbSScott Long 		 * everyone else, or we have exceeded our budget.
2114c7070dbSScott Long 		 */
2124c7070dbSScott Long 		if (cidx != pidx && pending < 64 && total < budget)
2134c7070dbSScott Long 			continue;
2144c7070dbSScott Long 		critical_enter();
21514e00107SMarius Strobl 		os.state = r->state;
216ab2e3f79SStephen Hurd 		do {
21714e00107SMarius Strobl 			ns.state = os.state;
2184c7070dbSScott Long 			ns.cidx = cidx;
2194c7070dbSScott Long 			ns.flags = state_to_flags(ns, total >= budget);
22014e00107SMarius Strobl 		} while (atomic_fcmpset_acq_64(&r->state, &os.state,
22114e00107SMarius Strobl 		    ns.state) == 0);
2224c7070dbSScott Long 		critical_exit();
2234c7070dbSScott Long 
2244c7070dbSScott Long 		if (ns.flags == ABDICATED)
2254c7070dbSScott Long 			counter_u64_add(r->abdications, 1);
2264c7070dbSScott Long 		if (ns.flags != BUSY) {
2274c7070dbSScott Long 			/* Wrong loop exit if we're going to stall. */
2284c7070dbSScott Long 			MPASS(ns.flags != STALLED);
2294c7070dbSScott Long 			if (prev == STALLED) {
2304c7070dbSScott Long 				MPASS(total > 0);
2314c7070dbSScott Long 				counter_u64_add(r->restarts, 1);
2324c7070dbSScott Long 			}
2334c7070dbSScott Long 			break;
2344c7070dbSScott Long 		}
2354c7070dbSScott Long 
2364c7070dbSScott Long 		/*
2374c7070dbSScott Long 		 * The acquire style atomic above guarantees visibility of items
2384c7070dbSScott Long 		 * associated with any pidx change that we notice here.
2394c7070dbSScott Long 		 */
2404c7070dbSScott Long 		pidx = ns.pidx_tail;
2414c7070dbSScott Long 		pending = 0;
2424c7070dbSScott Long 	}
2434c7070dbSScott Long }
2444c7070dbSScott Long #endif
2454c7070dbSScott Long 
2464c7070dbSScott Long int
ifmp_ring_alloc(struct ifmp_ring ** pr,int size,void * cookie,mp_ring_drain_t drain,mp_ring_can_drain_t can_drain,struct malloc_type * mt,int flags)2474c7070dbSScott Long ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
2484c7070dbSScott Long     mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
2494c7070dbSScott Long {
2504c7070dbSScott Long 	struct ifmp_ring *r;
2514c7070dbSScott Long 
2524c7070dbSScott Long 	/* All idx are 16b so size can be 65536 at most */
2534c7070dbSScott Long 	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
2544c7070dbSScott Long 	    can_drain == NULL)
2554c7070dbSScott Long 		return (EINVAL);
2564c7070dbSScott Long 	*pr = NULL;
2574c7070dbSScott Long 	flags &= M_NOWAIT | M_WAITOK;
2584c7070dbSScott Long 	MPASS(flags != 0);
2594c7070dbSScott Long 
2604c7070dbSScott Long 	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
2614c7070dbSScott Long 	if (r == NULL)
2624c7070dbSScott Long 		return (ENOMEM);
2634c7070dbSScott Long 	r->size = size;
2644c7070dbSScott Long 	r->cookie = cookie;
2654c7070dbSScott Long 	r->mt = mt;
2664c7070dbSScott Long 	r->drain = drain;
2674c7070dbSScott Long 	r->can_drain = can_drain;
2684c7070dbSScott Long 	r->enqueues = counter_u64_alloc(flags);
2694c7070dbSScott Long 	r->drops = counter_u64_alloc(flags);
2704c7070dbSScott Long 	r->starts = counter_u64_alloc(flags);
2714c7070dbSScott Long 	r->stalls = counter_u64_alloc(flags);
2724c7070dbSScott Long 	r->restarts = counter_u64_alloc(flags);
2734c7070dbSScott Long 	r->abdications = counter_u64_alloc(flags);
2744c7070dbSScott Long 	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
2754c7070dbSScott Long 	    r->stalls == NULL || r->restarts == NULL ||
2764c7070dbSScott Long 	    r->abdications == NULL) {
2774c7070dbSScott Long 		ifmp_ring_free(r);
2784c7070dbSScott Long 		return (ENOMEM);
2794c7070dbSScott Long 	}
2804c7070dbSScott Long 
2814c7070dbSScott Long 	*pr = r;
2825881181dSMatt Macy #ifdef MP_RING_NO_64BIT_ATOMICS
2834c7070dbSScott Long 	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
2844c7070dbSScott Long #endif
2854c7070dbSScott Long 	return (0);
2864c7070dbSScott Long }
2874c7070dbSScott Long 
2884c7070dbSScott Long void
ifmp_ring_free(struct ifmp_ring * r)2894c7070dbSScott Long ifmp_ring_free(struct ifmp_ring *r)
2904c7070dbSScott Long {
2914c7070dbSScott Long 
2924c7070dbSScott Long 	if (r == NULL)
2934c7070dbSScott Long 		return;
2944c7070dbSScott Long 
2954c7070dbSScott Long 	if (r->enqueues != NULL)
2964c7070dbSScott Long 		counter_u64_free(r->enqueues);
2974c7070dbSScott Long 	if (r->drops != NULL)
2984c7070dbSScott Long 		counter_u64_free(r->drops);
2994c7070dbSScott Long 	if (r->starts != NULL)
3004c7070dbSScott Long 		counter_u64_free(r->starts);
3014c7070dbSScott Long 	if (r->stalls != NULL)
3024c7070dbSScott Long 		counter_u64_free(r->stalls);
3034c7070dbSScott Long 	if (r->restarts != NULL)
3044c7070dbSScott Long 		counter_u64_free(r->restarts);
3054c7070dbSScott Long 	if (r->abdications != NULL)
3064c7070dbSScott Long 		counter_u64_free(r->abdications);
3074c7070dbSScott Long 
3084c7070dbSScott Long 	free(r, r->mt);
3094c7070dbSScott Long }
3104c7070dbSScott Long 
/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns 0 on success, or ENOBUFS if the ring does not have room for all n
 * items (in which case none of them are enqueued).
 */
3165881181dSMatt Macy #ifdef MP_RING_NO_64BIT_ATOMICS
3174c7070dbSScott Long int
ifmp_ring_enqueue(struct ifmp_ring * r,void ** items,int n,int budget,int abdicate)318fe51d4cdSStephen Hurd ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
3194c7070dbSScott Long {
3204c7070dbSScott Long 	union ring_state os, ns;
3214c7070dbSScott Long 	uint16_t pidx_start, pidx_stop;
3224c7070dbSScott Long 	int i;
3234c7070dbSScott Long 
3244c7070dbSScott Long 	MPASS(items != NULL);
3254c7070dbSScott Long 	MPASS(n > 0);
3264c7070dbSScott Long 
3274c7070dbSScott Long 	mtx_lock(&r->lock);
3284c7070dbSScott Long 	/*
3294c7070dbSScott Long 	 * Reserve room for the new items.  Our reservation, if successful, is
3304c7070dbSScott Long 	 * from 'pidx_start' to 'pidx_stop'.
3314c7070dbSScott Long 	 */
3324c7070dbSScott Long 	os.state = r->state;
3334c7070dbSScott Long 	if (n >= space_available(r, os)) {
3344c7070dbSScott Long 		counter_u64_add(r->drops, n);
3354c7070dbSScott Long 		MPASS(os.flags != IDLE);
336e335651eSMatt Macy 		mtx_unlock(&r->lock);
3374c7070dbSScott Long 		if (os.flags == STALLED)
3384c7070dbSScott Long 			ifmp_ring_check_drainage(r, 0);
3394c7070dbSScott Long 		return (ENOBUFS);
3404c7070dbSScott Long 	}
3414c7070dbSScott Long 	ns.state = os.state;
3424c7070dbSScott Long 	ns.pidx_head = increment_idx(r, os.pidx_head, n);
3434c7070dbSScott Long 	r->state = ns.state;
3444c7070dbSScott Long 	pidx_start = os.pidx_head;
3454c7070dbSScott Long 	pidx_stop = ns.pidx_head;
3464c7070dbSScott Long 
3474c7070dbSScott Long 	/*
3484c7070dbSScott Long 	 * Wait for other producers who got in ahead of us to enqueue their
3494c7070dbSScott Long 	 * items, one producer at a time.  It is our turn when the ring's
350efc457e1SPedro F. Giffuni 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
3514c7070dbSScott Long 	 */
3524c7070dbSScott Long 	while (ns.pidx_tail != pidx_start) {
3534c7070dbSScott Long 		cpu_spinwait();
3544c7070dbSScott Long 		ns.state = r->state;
3554c7070dbSScott Long 	}
3564c7070dbSScott Long 
3574c7070dbSScott Long 	/* Now it is our turn to fill up the area we reserved earlier. */
3584c7070dbSScott Long 	i = pidx_start;
3594c7070dbSScott Long 	do {
3604c7070dbSScott Long 		r->items[i] = *items++;
3614c7070dbSScott Long 		if (__predict_false(++i == r->size))
3624c7070dbSScott Long 			i = 0;
3634c7070dbSScott Long 	} while (i != pidx_stop);
3644c7070dbSScott Long 
3654c7070dbSScott Long 	/*
3664c7070dbSScott Long 	 * Update the ring's pidx_tail.  The release style atomic guarantees
3674c7070dbSScott Long 	 * that the items are visible to any thread that sees the updated pidx.
3684c7070dbSScott Long 	 */
3694c7070dbSScott Long 	os.state = ns.state = r->state;
3704c7070dbSScott Long 	ns.pidx_tail = pidx_stop;
371fe51d4cdSStephen Hurd 	if (abdicate) {
372fe51d4cdSStephen Hurd 		if (os.flags == IDLE)
373fe51d4cdSStephen Hurd 			ns.flags = ABDICATED;
37414e00107SMarius Strobl 	} else
3754c7070dbSScott Long 		ns.flags = BUSY;
3764c7070dbSScott Long 	r->state = ns.state;
3774c7070dbSScott Long 	counter_u64_add(r->enqueues, n);
3784c7070dbSScott Long 
379fe51d4cdSStephen Hurd 	if (!abdicate) {
3804c7070dbSScott Long 		/*
3814c7070dbSScott Long 		 * Turn into a consumer if some other thread isn't active as a consumer
3824c7070dbSScott Long 		 * already.
3834c7070dbSScott Long 		 */
3844c7070dbSScott Long 		if (os.flags != BUSY)
3854c7070dbSScott Long 			drain_ring_locked(r, ns, os.flags, budget);
386fe51d4cdSStephen Hurd 	}
3874c7070dbSScott Long 
3884c7070dbSScott Long 	mtx_unlock(&r->lock);
3894c7070dbSScott Long 	return (0);
3904c7070dbSScott Long }
3914c7070dbSScott Long #else
3924c7070dbSScott Long int
ifmp_ring_enqueue(struct ifmp_ring * r,void ** items,int n,int budget,int abdicate)393fe51d4cdSStephen Hurd ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
3944c7070dbSScott Long {
3954c7070dbSScott Long 	union ring_state os, ns;
3964c7070dbSScott Long 	uint16_t pidx_start, pidx_stop;
3974c7070dbSScott Long 	int i;
3984c7070dbSScott Long 
3994c7070dbSScott Long 	MPASS(items != NULL);
4004c7070dbSScott Long 	MPASS(n > 0);
4014c7070dbSScott Long 
4024c7070dbSScott Long 	/*
4034c7070dbSScott Long 	 * Reserve room for the new items.  Our reservation, if successful, is
4044c7070dbSScott Long 	 * from 'pidx_start' to 'pidx_stop'.
4054c7070dbSScott Long 	 */
4064c7070dbSScott Long 	os.state = r->state;
40714e00107SMarius Strobl 	for (;;) {
4084c7070dbSScott Long 		if (n >= space_available(r, os)) {
4094c7070dbSScott Long 			counter_u64_add(r->drops, n);
4104c7070dbSScott Long 			MPASS(os.flags != IDLE);
4114c7070dbSScott Long 			if (os.flags == STALLED)
4124c7070dbSScott Long 				ifmp_ring_check_drainage(r, 0);
4134c7070dbSScott Long 			return (ENOBUFS);
4144c7070dbSScott Long 		}
4154c7070dbSScott Long 		ns.state = os.state;
4164c7070dbSScott Long 		ns.pidx_head = increment_idx(r, os.pidx_head, n);
4174c7070dbSScott Long 		critical_enter();
41814e00107SMarius Strobl 		if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
4194c7070dbSScott Long 			break;
4204c7070dbSScott Long 		critical_exit();
4214c7070dbSScott Long 		cpu_spinwait();
4224c7070dbSScott Long 	}
4234c7070dbSScott Long 	pidx_start = os.pidx_head;
4244c7070dbSScott Long 	pidx_stop = ns.pidx_head;
4254c7070dbSScott Long 
4264c7070dbSScott Long 	/*
4274c7070dbSScott Long 	 * Wait for other producers who got in ahead of us to enqueue their
4284c7070dbSScott Long 	 * items, one producer at a time.  It is our turn when the ring's
429efc457e1SPedro F. Giffuni 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
4304c7070dbSScott Long 	 */
4314c7070dbSScott Long 	while (ns.pidx_tail != pidx_start) {
4324c7070dbSScott Long 		cpu_spinwait();
4334c7070dbSScott Long 		ns.state = r->state;
4344c7070dbSScott Long 	}
4354c7070dbSScott Long 
4364c7070dbSScott Long 	/* Now it is our turn to fill up the area we reserved earlier. */
4374c7070dbSScott Long 	i = pidx_start;
4384c7070dbSScott Long 	do {
4394c7070dbSScott Long 		r->items[i] = *items++;
4404c7070dbSScott Long 		if (__predict_false(++i == r->size))
4414c7070dbSScott Long 			i = 0;
4424c7070dbSScott Long 	} while (i != pidx_stop);
4434c7070dbSScott Long 
4444c7070dbSScott Long 	/*
4454c7070dbSScott Long 	 * Update the ring's pidx_tail.  The release style atomic guarantees
4464c7070dbSScott Long 	 * that the items are visible to any thread that sees the updated pidx.
4474c7070dbSScott Long 	 */
44814e00107SMarius Strobl 	os.state = r->state;
4494c7070dbSScott Long 	do {
45014e00107SMarius Strobl 		ns.state = os.state;
4514c7070dbSScott Long 		ns.pidx_tail = pidx_stop;
452fe51d4cdSStephen Hurd 		if (abdicate) {
4531225d9daSStephen Hurd 			if (os.flags == IDLE)
4541225d9daSStephen Hurd 				ns.flags = ABDICATED;
45514e00107SMarius Strobl 		} else
456fe51d4cdSStephen Hurd 			ns.flags = BUSY;
45714e00107SMarius Strobl 	} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
4584c7070dbSScott Long 	critical_exit();
4594c7070dbSScott Long 	counter_u64_add(r->enqueues, n);
4604c7070dbSScott Long 
461fe51d4cdSStephen Hurd 	if (!abdicate) {
462fe51d4cdSStephen Hurd 		/*
463fe51d4cdSStephen Hurd 		 * Turn into a consumer if some other thread isn't active as a consumer
464fe51d4cdSStephen Hurd 		 * already.
465fe51d4cdSStephen Hurd 		 */
466fe51d4cdSStephen Hurd 		if (os.flags != BUSY)
467fe51d4cdSStephen Hurd 			drain_ring_lockless(r, ns, os.flags, budget);
468fe51d4cdSStephen Hurd 	}
469fe51d4cdSStephen Hurd 
4704c7070dbSScott Long 	return (0);
4714c7070dbSScott Long }
4724c7070dbSScott Long #endif
4734c7070dbSScott Long 
4744c7070dbSScott Long void
ifmp_ring_check_drainage(struct ifmp_ring * r,int budget)4754c7070dbSScott Long ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
4764c7070dbSScott Long {
4774c7070dbSScott Long 	union ring_state os, ns;
4784c7070dbSScott Long 
4794c7070dbSScott Long 	os.state = r->state;
4801225d9daSStephen Hurd 	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
4811225d9daSStephen Hurd 	    os.pidx_head != os.pidx_tail ||			// Require work to be available
4821225d9daSStephen Hurd 	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
4834c7070dbSScott Long 		return;
4844c7070dbSScott Long 
4854c7070dbSScott Long 	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
4864c7070dbSScott Long 	ns.state = os.state;
4874c7070dbSScott Long 	ns.flags = BUSY;
4884c7070dbSScott Long 
4895881181dSMatt Macy #ifdef MP_RING_NO_64BIT_ATOMICS
4904c7070dbSScott Long 	mtx_lock(&r->lock);
4914c7070dbSScott Long 	if (r->state != os.state) {
4924c7070dbSScott Long 		mtx_unlock(&r->lock);
4934c7070dbSScott Long 		return;
4944c7070dbSScott Long 	}
4954c7070dbSScott Long 	r->state = ns.state;
4964c7070dbSScott Long 	drain_ring_locked(r, ns, os.flags, budget);
4974c7070dbSScott Long 	mtx_unlock(&r->lock);
4984c7070dbSScott Long #else
4994c7070dbSScott Long 	/*
5004c7070dbSScott Long 	 * The acquire style atomic guarantees visibility of items associated
5014c7070dbSScott Long 	 * with the pidx that we read here.
5024c7070dbSScott Long 	 */
5034c7070dbSScott Long 	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
5044c7070dbSScott Long 		return;
5054c7070dbSScott Long 
5064c7070dbSScott Long 	drain_ring_lockless(r, ns, os.flags, budget);
5074c7070dbSScott Long #endif
5084c7070dbSScott Long }
5094c7070dbSScott Long 
5104c7070dbSScott Long void
ifmp_ring_reset_stats(struct ifmp_ring * r)5114c7070dbSScott Long ifmp_ring_reset_stats(struct ifmp_ring *r)
5124c7070dbSScott Long {
5134c7070dbSScott Long 
5144c7070dbSScott Long 	counter_u64_zero(r->enqueues);
5154c7070dbSScott Long 	counter_u64_zero(r->drops);
5164c7070dbSScott Long 	counter_u64_zero(r->starts);
5174c7070dbSScott Long 	counter_u64_zero(r->stalls);
5184c7070dbSScott Long 	counter_u64_zero(r->restarts);
5194c7070dbSScott Long 	counter_u64_zero(r->abdications);
5204c7070dbSScott Long }
5214c7070dbSScott Long 
5224c7070dbSScott Long int
ifmp_ring_is_idle(struct ifmp_ring * r)5234c7070dbSScott Long ifmp_ring_is_idle(struct ifmp_ring *r)
5244c7070dbSScott Long {
5254c7070dbSScott Long 	union ring_state s;
5264c7070dbSScott Long 
5274c7070dbSScott Long 	s.state = r->state;
5284c7070dbSScott Long 	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
5294c7070dbSScott Long 	    s.flags == IDLE)
5304c7070dbSScott Long 		return (1);
5314c7070dbSScott Long 
5324c7070dbSScott Long 	return (0);
5334c7070dbSScott Long }
5344c7070dbSScott Long 
5354c7070dbSScott Long int
ifmp_ring_is_stalled(struct ifmp_ring * r)5364c7070dbSScott Long ifmp_ring_is_stalled(struct ifmp_ring *r)
5374c7070dbSScott Long {
5384c7070dbSScott Long 	union ring_state s;
5394c7070dbSScott Long 
5404c7070dbSScott Long 	s.state = r->state;
5414c7070dbSScott Long 	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
5424c7070dbSScott Long 		return (1);
5434c7070dbSScott Long 
5444c7070dbSScott Long 	return (0);
5454c7070dbSScott Long }
546