use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::thread;

use cache_padded::CachePadded;

use crate::{PopError, PushError};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of items a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the queue is closed.
const MARK_BIT: usize = 1;
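//
// A worked example of this encoding, using the constants above: a queue index `i` is stored
// as `i << SHIFT`, leaving bit 0 free for `MARK_BIT`. The raw value 91 therefore decodes as
// mark bit = 1 and queue index 45; with `LAP = 32`, index 45 lies in lap 45 / 32 = 1 at
// offset 45 % 32 = 13, i.e. slot 13 of the second block (offset `BLOCK_CAP = 31` holds no
// item and only signals "move to the next block").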

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    const UNINIT: Slot<T> = Slot {
        value: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };

    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            thread::yield_now();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        Block {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: [Slot::UNINIT; BLOCK_CAP],
        }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            thread::yield_now();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
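        //
        // Destruction is cooperative: if a `pop` is still reading slot `i`, the loop below sets
        // `DESTROY` on it and returns; that reader will notice `DESTROY` when it later sets
        // `READ` and will resume destruction from slot `i + 1` (see `pop`).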
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded queue.
pub struct Unbounded<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,
}

impl<T> Unbounded<T> {
    /// Creates a new unbounded queue.
    pub fn new() -> Unbounded<T> {
        Unbounded {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
        }
    }

    /// Pushes an item into the queue.
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the queue is closed.
            if tail & MARK_BIT != 0 {
                return Err(PushError::Closed(value));
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                thread::yield_now();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first value to be pushed into the queue, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
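                        // Bump the tail index past the reserved end-of-block slot so that other
                        // `push` calls spinning at `offset == BLOCK_CAP` can move into the new block.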
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);
                    return Ok(());
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }

    /// Pops an item from the queue.
    pub fn pop(&self) -> Result<T, PopError> {
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                thread::yield_now();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
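                // Issue a full fence so the relaxed load of the tail index below cannot be
                // reordered before the head load above; otherwise the emptiness check could
                // observe an inconsistent pair of indices.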
                crate::full_fence();
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // Check if the queue is closed.
                    if tail & MARK_BIT != 0 {
                        return Err(PopError::Closed);
                    } else {
                        return Err(PopError::Empty);
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in `new_head`.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first push operation is in progress.
            if block.is_null() {
                thread::yield_now();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
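                        // Keep `MARK_BIT` set in the new head index if the next block already has
                        // a successor, i.e. it is not the last block in the list.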
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Ok(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                }
            }
        }
    }

    /// Returns the number of items in the queue.
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
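                // For example, with head = 5 and tail = 40 after the adjustments above, one
                // block-end position (offset 31) lies between them, so the queue holds
                // 40 - 5 - 40 / 32 = 34 items.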
                return tail - head - tail / LAP;
            }
        }
    }

    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns `true` if the queue is full.
    pub fn is_full(&self) -> bool {
        false
    }

    /// Closes the queue.
    ///
    /// Returns `true` if this call closed the queue.
    pub fn close(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
        tail & MARK_BIT == 0
    }

    /// Returns `true` if the queue is closed.
    pub fn is_closed(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }
}

impl<T> Drop for Unbounded<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let value = slot.value.get().read().assume_init();
                    drop(value);
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}
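
// A minimal smoke-test sketch for the queue above; it relies only on the methods defined in
// this module and treats the exact `PushError`/`PopError` variants as opaque.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn push_pop_close() {
        let q = Unbounded::new();
        assert!(q.is_empty());

        // Push a couple of items and observe the length.
        assert!(q.push(1).is_ok());
        assert!(q.push(2).is_ok());
        assert_eq!(q.len(), 2);

        // Items come back out in FIFO order.
        assert_eq!(q.pop().ok(), Some(1));
        assert_eq!(q.pop().ok(), Some(2));
        assert!(q.pop().is_err());

        // Closing succeeds exactly once; pushes fail afterwards.
        assert!(q.close());
        assert!(!q.close());
        assert!(q.is_closed());
        assert!(q.push(3).is_err());
    }
}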