use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::thread;

use cache_padded::CachePadded;

use crate::{PopError, PushError};

// Bits indicating the state of a slot:
// * If a value has been written into the slot, `WRITE` is set.
// * If a value has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of items a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the queue is closed.
const MARK_BIT: usize = 1;

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    const UNINIT: Slot<T> = Slot {
        value: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };

    /// Waits until a value is written into the slot.
    fn wait_write(&self) {
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            thread::yield_now();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        Block {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: [Slot::UNINIT; BLOCK_CAP],
        }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            thread::yield_now();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a queue.
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// An unbounded queue.
pub struct Unbounded<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,
}
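// Illustrative sketch (an addition for exposition, not used by the queue itself):
// how a raw index value decomposes under the constants above. The lowest `SHIFT`
// bits carry `MARK_BIT`; the remaining bits count slots, and the offset inside
// the current block is that count modulo `LAP`. The last offset in a lap
// (`BLOCK_CAP`) is reserved to mean "end of block".
#[allow(dead_code)]
fn decode_index(index: usize) -> (bool, usize, usize) {
    let marked = index & MARK_BIT != 0;
    let lap = (index >> SHIFT) / LAP;
    let offset = (index >> SHIFT) % LAP;
    (marked, lap, offset)
}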
impl<T> Unbounded<T> {
    /// Creates a new unbounded queue.
    pub fn new() -> Unbounded<T> {
        Unbounded {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
        }
    }

    /// Pushes an item into the queue.
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the queue is closed.
            if tail & MARK_BIT != 0 {
                return Err(PushError::Closed(value));
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                thread::yield_now();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first value to be pushed into the queue, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_and_swap(block, new, Ordering::Release)
                    == block
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the value into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.value.get().write(MaybeUninit::new(value));
                    slot.state.fetch_or(WRITE, Ordering::Release);
                    return Ok(());
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }

    /// Pops an item from the queue.
    pub fn pop(&self) -> Result<T, PopError> {
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                thread::yield_now();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                crate::full_fence();
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the queue is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // Check if the queue is closed.
                    if tail & MARK_BIT != 0 {
                        return Err(PopError::Closed);
                    } else {
                        return Err(PopError::Empty);
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first push operation is in progress.
            if block.is_null() {
                thread::yield_now();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    // Read the value.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let value = slot.value.get().read().assume_init();

                    // Destroy the block if we've reached the end, or if another thread wanted to
                    // destroy but couldn't because we were busy reading from the slot.
                    if offset + 1 == BLOCK_CAP {
                        Block::destroy(block, 0);
                    } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset + 1);
                    }

                    return Ok(value);
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                }
            }
        }
    }

    /// Returns the number of items in the queue.
    pub fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }
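    // Worked example for the arithmetic in `len()` above (illustrative only):
    // with `LAP = 32` and `BLOCK_CAP = 31`, every completed lap skips its
    // reserved end-of-block slot. After two full blocks plus 5 more pushes the
    // tail slot count is 2 * 32 + 5 = 69; if 3 items have been popped, the
    // rotated head slot count is 3, so the result is
    // (69 - 3) - 69 / 32 = 66 - 2 = 64, matching 2 * 31 + 5 - 3 stored items.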
    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns `true` if the queue is full.
    pub fn is_full(&self) -> bool {
        false
    }

    /// Closes the queue.
    ///
    /// Returns `true` if this call closed the queue.
    pub fn close(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
        tail & MARK_BIT == 0
    }

    /// Returns `true` if the queue is closed.
    pub fn is_closed(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }
}

impl<T> Drop for Unbounded<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the value in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let value = slot.value.get().read().assume_init();
                    drop(value);
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}
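// A minimal usage sketch for the queue above, written as a test module (an
// addition for illustration, not part of the original file). It relies only
// on the public API defined here: `new`, `push`, `pop`, `len`, `is_empty`,
// and `close`, together with the `PopError`/`PushError` variants already
// referenced above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn push_pop_close() {
        let q = Unbounded::new();
        assert!(q.is_empty());

        // Push a couple of items and observe the length.
        assert!(q.push(1).is_ok());
        assert!(q.push(2).is_ok());
        assert_eq!(q.len(), 2);

        // Items come back out in FIFO order; popping an empty queue reports `Empty`.
        assert!(matches!(q.pop(), Ok(1)));
        assert!(matches!(q.pop(), Ok(2)));
        assert!(matches!(q.pop(), Err(PopError::Empty)));

        // The first call to `close` returns `true`; afterwards pushes are
        // rejected and popping an empty queue reports `Closed`.
        assert!(q.close());
        assert!(q.is_closed());
        assert!(matches!(q.push(3), Err(PushError::Closed(3))));
        assert!(matches!(q.pop(), Err(PopError::Closed)));
    }
}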