1 /**
2  * \file
3  * Lock free queue.
4  *
5  * (C) Copyright 2011 Novell, Inc
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining
8  * a copy of this software and associated documentation files (the
9  * "Software"), to deal in the Software without restriction, including
10  * without limitation the rights to use, copy, modify, merge, publish,
11  * distribute, sublicense, and/or sell copies of the Software, and to
12  * permit persons to whom the Software is furnished to do so, subject to
13  * the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be
16  * included in all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
22  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25  */
26 
27 /*
28  * This is an implementation of a lock-free queue, as described in
29  *
30  * Simple, Fast, and Practical Non-Blocking and Blocking
31  *   Concurrent Queue Algorithms
32  * Maged M. Michael, Michael L. Scott
33  * 1995
34  *
35  * A few slight modifications have been made:
36  *
37  * We use hazard pointers to rule out the ABA problem, instead of the
38  * counter as in the paper.
39  *
40  * Memory management of the queue entries is done by the caller, not
41  * by the queue implementation.  This implies that the dequeue
42  * function must return the queue entry, not just the data.
43  *
44  * Therefore, the dummy entry must never be returned.  We do this by
45  * re-enqueuing a new dummy entry after we dequeue one and then
 * retrying the dequeue.  We need more than one dummy because a
 * dequeued dummy can only be reused after it has been hazardously freed.
48  */
49 
50 #include <stdlib.h>
51 #ifdef HAVE_UNISTD_H
52 #include <unistd.h>
53 #endif
54 
55 #include <mono/utils/mono-membar.h>
56 #include <mono/utils/hazard-pointer.h>
57 #include <mono/utils/atomic.h>
58 
59 #include <mono/utils/lock-free-queue.h>
60 
/*
 * Sentinel values stored in a node's `next` field in place of a real
 * successor pointer.  The code in this file only ever compares against
 * them.
 */
/* Node is poisoned or has just been dequeued; it must be unpoisoned before it can be enqueued. */
#define INVALID_NEXT	((MonoLockFreeQueueNode *volatile)-1)
/* Node is the last node of a queue (nothing has been appended after it yet). */
#define END_MARKER	((MonoLockFreeQueueNode *volatile)-2)
/* Node is initialized, not in a queue, and may be enqueued. */
#define FREE_NEXT	((MonoLockFreeQueueNode *volatile)-3)
64 
65 /*
66  * Initialize a lock-free queue in-place at @q.
67  */
68 void
mono_lock_free_queue_init(MonoLockFreeQueue * q)69 mono_lock_free_queue_init (MonoLockFreeQueue *q)
70 {
71 	int i;
72 	for (i = 0; i < MONO_LOCK_FREE_QUEUE_NUM_DUMMIES; ++i) {
73 		q->dummies [i].node.next = (i == 0) ? END_MARKER : FREE_NEXT;
74 		q->dummies [i].in_use = i == 0 ? 1 : 0;
75 #ifdef QUEUE_DEBUG
76 		q->dummies [i].node.in_queue = i == 0 ? TRUE : FALSE;
77 #endif
78 	}
79 
80 	q->head = q->tail = &q->dummies [0].node;
81 	q->has_dummy = 1;
82 }
83 
84 /*
85  * Initialize @node's state. If @poison is TRUE, @node may not be enqueued to a
86  * queue - @mono_lock_free_queue_node_unpoison must be called first; otherwise,
87  * the node can be enqueued right away.
88  *
89  * The poisoning feature is mainly intended for ensuring correctness in complex
90  * lock-free code that uses the queue. For example, in some code that reuses
91  * nodes, nodes can be poisoned when they're dequeued, and then unpoisoned and
92  * enqueued in their hazard free callback.
93  */
94 void
mono_lock_free_queue_node_init(MonoLockFreeQueueNode * node,gboolean poison)95 mono_lock_free_queue_node_init (MonoLockFreeQueueNode *node, gboolean poison)
96 {
97 	node->next = poison ? INVALID_NEXT : FREE_NEXT;
98 #ifdef QUEUE_DEBUG
99 	node->in_queue = FALSE;
100 #endif
101 }
102 
/*
 * Unpoisons @node so that it may be enqueued.
 *
 * @node must currently be poisoned (its next field must be INVALID_NEXT);
 * this is asserted.  On return its next field is FREE_NEXT, which is the
 * state the enqueue function requires.
 */
void
mono_lock_free_queue_node_unpoison (MonoLockFreeQueueNode *node)
{
	g_assert (node->next == INVALID_NEXT);
#ifdef QUEUE_DEBUG
	g_assert (!node->in_queue);
#endif
	node->next = FREE_NEXT;
}
115 
/*
 * Enqueue @node to @q. @node must have been initialized by a prior call to
 * @mono_lock_free_queue_node_init, and must not be in a poisoned state
 * (its next field must be FREE_NEXT; this is asserted).
 *
 * The function loops until it has linked @node after the current last
 * node, then makes one attempt to move q->tail to @node.  Hazard pointer
 * slot 0 of the calling thread is used while the tail is inspected and is
 * cleared before returning.
 */
void
mono_lock_free_queue_enqueue (MonoLockFreeQueue *q, MonoLockFreeQueueNode *node)
{
	MonoThreadHazardPointers *hp = mono_hazard_pointer_get ();
	MonoLockFreeQueueNode *tail;

#ifdef QUEUE_DEBUG
	g_assert (!node->in_queue);
	node->in_queue = TRUE;
	mono_memory_write_barrier ();
#endif

	g_assert (node->next == FREE_NEXT);
	/* The node is going to be the last one in the queue. */
	node->next = END_MARKER;
	for (;;) {
		MonoLockFreeQueueNode *next;

		/* Load the tail through the hazard pointer helper (slot 0) before dereferencing it. */
		tail = (MonoLockFreeQueueNode *) mono_get_hazardous_pointer ((gpointer volatile*)&q->tail, hp, 0);
		mono_memory_read_barrier ();
		/*
		 * We never dereference next so we don't need a
		 * hazardous load.
		 */
		next = tail->next;
		mono_memory_read_barrier ();

		/* Are tail and next consistent? */
		if (tail == q->tail) {
			g_assert (next != INVALID_NEXT && next != FREE_NEXT);
			g_assert (next != tail);

			if (next == END_MARKER) {
				/*
				 * Here we require that nodes that
				 * have been dequeued don't have
				 * next==END_MARKER.  If they did, we
				 * might append to a node that isn't
				 * in the queue anymore here.
				 */
				/* Link our node after tail; leave the loop with the hazard pointer still set. */
				if (mono_atomic_cas_ptr ((gpointer volatile*)&tail->next, node, END_MARKER) == END_MARKER)
					break;
			} else {
				/* Try to advance tail */
				/* q->tail is lagging behind; the result is ignored and we retry either way. */
				mono_atomic_cas_ptr ((gpointer volatile*)&q->tail, next, tail);
			}
		}

		/* This attempt did not link the node: drop the hazard pointer and start over. */
		mono_memory_write_barrier ();
		mono_hazard_pointer_clear (hp, 0);
	}

	/* Try to advance tail */
	/* The result is ignored: the tail-advancing CASes above and in the dequeue path can catch up if this fails. */
	mono_atomic_cas_ptr ((gpointer volatile*)&q->tail, node, tail);

	mono_memory_write_barrier ();
	mono_hazard_pointer_clear (hp, 0);
}
177 
/*
 * Hazard free callback for a dummy entry that has been dequeued (it is
 * passed to mono_thread_hazardous_try_free by the dequeue function).
 * @_dummy points to the MonoLockFreeQueueDummy being released.
 *
 * Resets the dummy's node to the enqueueable state and then marks the
 * dummy as no longer in use so that get_dummy can hand it out again.
 */
static void
free_dummy (gpointer _dummy)
{
	MonoLockFreeQueueDummy *dummy = (MonoLockFreeQueueDummy *) _dummy;
	/* The node was set to INVALID_NEXT when it was dequeued; make it FREE_NEXT again. */
	mono_lock_free_queue_node_unpoison (&dummy->node);
	g_assert (dummy->in_use);
	/* NOTE(review): presumably orders the node reset before the in_use release below - confirm. */
	mono_memory_write_barrier ();
	dummy->in_use = 0;
}
187 
188 static MonoLockFreeQueueDummy*
get_dummy(MonoLockFreeQueue * q)189 get_dummy (MonoLockFreeQueue *q)
190 {
191 	int i;
192 	for (i = 0; i < MONO_LOCK_FREE_QUEUE_NUM_DUMMIES; ++i) {
193 		MonoLockFreeQueueDummy *dummy = &q->dummies [i];
194 
195 		if (dummy->in_use)
196 			continue;
197 
198 		if (mono_atomic_cas_i32 (&dummy->in_use, 1, 0) == 0)
199 			return dummy;
200 	}
201 	return NULL;
202 }
203 
204 static gboolean
is_dummy(MonoLockFreeQueue * q,MonoLockFreeQueueNode * n)205 is_dummy (MonoLockFreeQueue *q, MonoLockFreeQueueNode *n)
206 {
207 	return n >= &q->dummies [0].node && n <= &q->dummies [MONO_LOCK_FREE_QUEUE_NUM_DUMMIES-1].node;
208 }
209 
210 static gboolean
try_reenqueue_dummy(MonoLockFreeQueue * q)211 try_reenqueue_dummy (MonoLockFreeQueue *q)
212 {
213 	MonoLockFreeQueueDummy *dummy;
214 
215 	if (q->has_dummy)
216 		return FALSE;
217 
218 	dummy = get_dummy (q);
219 	if (!dummy)
220 		return FALSE;
221 
222 	if (mono_atomic_cas_i32 (&q->has_dummy, 1, 0) != 0) {
223 		dummy->in_use = 0;
224 		return FALSE;
225 	}
226 
227 	mono_lock_free_queue_enqueue (q, &dummy->node);
228 
229 	return TRUE;
230 }
231 
232 /*
233  * Dequeues a node from @q. Returns NULL if no nodes are available. The returned
234  * node is hazardous and must be freed with @mono_thread_hazardous_try_free or
235  * @mono_thread_hazardous_queue_free - it must not be freed directly.
236  */
237 MonoLockFreeQueueNode*
mono_lock_free_queue_dequeue(MonoLockFreeQueue * q)238 mono_lock_free_queue_dequeue (MonoLockFreeQueue *q)
239 {
240 	MonoThreadHazardPointers *hp = mono_hazard_pointer_get ();
241 	MonoLockFreeQueueNode *head;
242 
243  retry:
244 	for (;;) {
245 		MonoLockFreeQueueNode *tail, *next;
246 
247 		head = (MonoLockFreeQueueNode *) mono_get_hazardous_pointer ((gpointer volatile*)&q->head, hp, 0);
248 		tail = (MonoLockFreeQueueNode*)q->tail;
249 		mono_memory_read_barrier ();
250 		next = head->next;
251 		mono_memory_read_barrier ();
252 
253 		/* Are head, tail and next consistent? */
254 		if (head == q->head) {
255 			g_assert (next != INVALID_NEXT && next != FREE_NEXT);
256 			g_assert (next != head);
257 
258 			/* Is queue empty or tail behind? */
259 			if (head == tail) {
260 				if (next == END_MARKER) {
261 					/* Queue is empty */
262 					mono_hazard_pointer_clear (hp, 0);
263 
264 					/*
265 					 * We only continue if we
266 					 * reenqueue the dummy
267 					 * ourselves, so as not to
268 					 * wait for threads that might
269 					 * not actually run.
270 					 */
271 					if (!is_dummy (q, head) && try_reenqueue_dummy (q))
272 						continue;
273 
274 					return NULL;
275 				}
276 
277 				/* Try to advance tail */
278 				mono_atomic_cas_ptr ((gpointer volatile*)&q->tail, next, tail);
279 			} else {
280 				g_assert (next != END_MARKER);
281 				/* Try to dequeue head */
282 				if (mono_atomic_cas_ptr ((gpointer volatile*)&q->head, next, head) == head)
283 					break;
284 			}
285 		}
286 
287 		mono_memory_write_barrier ();
288 		mono_hazard_pointer_clear (hp, 0);
289 	}
290 
291 	/*
292 	 * The head is dequeued now, so we know it's this thread's
293 	 * responsibility to free it - no other thread can.
294 	 */
295 	mono_memory_write_barrier ();
296 	mono_hazard_pointer_clear (hp, 0);
297 
298 	g_assert (head->next);
299 	/*
300 	 * Setting next here isn't necessary for correctness, but we
301 	 * do it to make sure that we catch dereferencing next in a
302 	 * node that's not in the queue anymore.
303 	 */
304 	head->next = INVALID_NEXT;
305 #if QUEUE_DEBUG
306 	g_assert (head->in_queue);
307 	head->in_queue = FALSE;
308 	mono_memory_write_barrier ();
309 #endif
310 
311 	if (is_dummy (q, head)) {
312 		g_assert (q->has_dummy);
313 		q->has_dummy = 0;
314 		mono_memory_write_barrier ();
315 		mono_thread_hazardous_try_free (head, free_dummy);
316 		if (try_reenqueue_dummy (q))
317 			goto retry;
318 		return NULL;
319 	}
320 
321 	/* The caller must hazardously free the node. */
322 	return head;
323 }
324