1 //! Bounded channel based on a preallocated array. 2 //! 3 //! This flavor has a fixed, positive capacity. 4 //! 5 //! The implementation is based on Dmitry Vyukov's bounded MPMC queue. 6 //! 7 //! Source: 8 //! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue> 9 //! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub> 10 11 use std::cell::UnsafeCell; 12 use std::marker::PhantomData; 13 use std::mem::{self, MaybeUninit}; 14 use std::ptr; 15 use std::sync::atomic::{self, AtomicUsize, Ordering}; 16 use std::time::Instant; 17 18 use crossbeam_utils::{Backoff, CachePadded}; 19 20 use crate::context::Context; 21 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; 22 use crate::select::{Operation, SelectHandle, Selected, Token}; 23 use crate::waker::SyncWaker; 24 25 /// A slot in a channel. 26 struct Slot<T> { 27 /// The current stamp. 28 stamp: AtomicUsize, 29 30 /// The message in this slot. 31 msg: UnsafeCell<MaybeUninit<T>>, 32 } 33 34 /// The token type for the array flavor. 35 #[derive(Debug)] 36 pub struct ArrayToken { 37 /// Slot to read from or write to. 38 slot: *const u8, 39 40 /// Stamp to store into the slot after reading or writing. 41 stamp: usize, 42 } 43 44 impl Default for ArrayToken { 45 #[inline] default() -> Self46 fn default() -> Self { 47 ArrayToken { 48 slot: ptr::null(), 49 stamp: 0, 50 } 51 } 52 } 53 54 /// Bounded channel based on a preallocated array. 55 pub(crate) struct Channel<T> { 56 /// The head of the channel. 57 /// 58 /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but 59 /// packed into a single `usize`. The lower bits represent the index, while the upper bits 60 /// represent the lap. The mark bit in the head is always zero. 61 /// 62 /// Messages are popped from the head of the channel. 63 head: CachePadded<AtomicUsize>, 64 65 /// The tail of the channel. 66 /// 67 /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but 68 /// packed into a single `usize`. The lower bits represent the index, while the upper bits 69 /// represent the lap. The mark bit indicates that the channel is disconnected. 70 /// 71 /// Messages are pushed into the tail of the channel. 72 tail: CachePadded<AtomicUsize>, 73 74 /// The buffer holding slots. 75 buffer: *mut Slot<T>, 76 77 /// The channel capacity. 78 cap: usize, 79 80 /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`. 81 one_lap: usize, 82 83 /// If this bit is set in the tail, that means the channel is disconnected. 84 mark_bit: usize, 85 86 /// Senders waiting while the channel is full. 87 senders: SyncWaker, 88 89 /// Receivers waiting while the channel is empty and not disconnected. 90 receivers: SyncWaker, 91 92 /// Indicates that dropping a `Channel<T>` may drop values of type `T`. 93 _marker: PhantomData<T>, 94 } 95 96 impl<T> Channel<T> { 97 /// Creates a bounded channel of capacity `cap`. with_capacity(cap: usize) -> Self98 pub(crate) fn with_capacity(cap: usize) -> Self { 99 assert!(cap > 0, "capacity must be positive"); 100 101 // Compute constants `mark_bit` and `one_lap`. 102 let mark_bit = (cap + 1).next_power_of_two(); 103 let one_lap = mark_bit * 2; 104 105 // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`. 106 let head = 0; 107 // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`. 108 let tail = 0; 109 110 // Allocate a buffer of `cap` slots initialized 111 // with stamps. 112 let buffer = { 113 let mut boxed: Box<[Slot<T>]> = (0..cap) 114 .map(|i| { 115 // Set the stamp to `{ lap: 0, mark: 0, index: i }`. 116 Slot { 117 stamp: AtomicUsize::new(i), 118 msg: UnsafeCell::new(MaybeUninit::uninit()), 119 } 120 }) 121 .collect(); 122 let ptr = boxed.as_mut_ptr(); 123 mem::forget(boxed); 124 ptr 125 }; 126 127 Channel { 128 buffer, 129 cap, 130 one_lap, 131 mark_bit, 132 head: CachePadded::new(AtomicUsize::new(head)), 133 tail: CachePadded::new(AtomicUsize::new(tail)), 134 senders: SyncWaker::new(), 135 receivers: SyncWaker::new(), 136 _marker: PhantomData, 137 } 138 } 139 140 /// Returns a receiver handle to the channel. receiver(&self) -> Receiver<'_, T>141 pub(crate) fn receiver(&self) -> Receiver<'_, T> { 142 Receiver(self) 143 } 144 145 /// Returns a sender handle to the channel. sender(&self) -> Sender<'_, T>146 pub(crate) fn sender(&self) -> Sender<'_, T> { 147 Sender(self) 148 } 149 150 /// Attempts to reserve a slot for sending a message. start_send(&self, token: &mut Token) -> bool151 fn start_send(&self, token: &mut Token) -> bool { 152 let backoff = Backoff::new(); 153 let mut tail = self.tail.load(Ordering::Relaxed); 154 155 loop { 156 // Check if the channel is disconnected. 157 if tail & self.mark_bit != 0 { 158 token.array.slot = ptr::null(); 159 token.array.stamp = 0; 160 return true; 161 } 162 163 // Deconstruct the tail. 164 let index = tail & (self.mark_bit - 1); 165 let lap = tail & !(self.one_lap - 1); 166 167 // Inspect the corresponding slot. 168 let slot = unsafe { &*self.buffer.add(index) }; 169 let stamp = slot.stamp.load(Ordering::Acquire); 170 171 // If the tail and the stamp match, we may attempt to push. 172 if tail == stamp { 173 let new_tail = if index + 1 < self.cap { 174 // Same lap, incremented index. 175 // Set to `{ lap: lap, mark: 0, index: index + 1 }`. 176 tail + 1 177 } else { 178 // One lap forward, index wraps around to zero. 179 // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. 180 lap.wrapping_add(self.one_lap) 181 }; 182 183 // Try moving the tail. 184 match self.tail.compare_exchange_weak( 185 tail, 186 new_tail, 187 Ordering::SeqCst, 188 Ordering::Relaxed, 189 ) { 190 Ok(_) => { 191 // Prepare the token for the follow-up call to `write`. 192 token.array.slot = slot as *const Slot<T> as *const u8; 193 token.array.stamp = tail + 1; 194 return true; 195 } 196 Err(t) => { 197 tail = t; 198 backoff.spin(); 199 } 200 } 201 } else if stamp.wrapping_add(self.one_lap) == tail + 1 { 202 atomic::fence(Ordering::SeqCst); 203 let head = self.head.load(Ordering::Relaxed); 204 205 // If the head lags one lap behind the tail as well... 206 if head.wrapping_add(self.one_lap) == tail { 207 // ...then the channel is full. 208 return false; 209 } 210 211 backoff.spin(); 212 tail = self.tail.load(Ordering::Relaxed); 213 } else { 214 // Snooze because we need to wait for the stamp to get updated. 215 backoff.snooze(); 216 tail = self.tail.load(Ordering::Relaxed); 217 } 218 } 219 } 220 221 /// Writes a message into the channel. write(&self, token: &mut Token, msg: T) -> Result<(), T>222 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { 223 // If there is no slot, the channel is disconnected. 224 if token.array.slot.is_null() { 225 return Err(msg); 226 } 227 228 let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>); 229 230 // Write the message into the slot and update the stamp. 231 slot.msg.get().write(MaybeUninit::new(msg)); 232 slot.stamp.store(token.array.stamp, Ordering::Release); 233 234 // Wake a sleeping receiver. 235 self.receivers.notify(); 236 Ok(()) 237 } 238 239 /// Attempts to reserve a slot for receiving a message. start_recv(&self, token: &mut Token) -> bool240 fn start_recv(&self, token: &mut Token) -> bool { 241 let backoff = Backoff::new(); 242 let mut head = self.head.load(Ordering::Relaxed); 243 244 loop { 245 // Deconstruct the head. 246 let index = head & (self.mark_bit - 1); 247 let lap = head & !(self.one_lap - 1); 248 249 // Inspect the corresponding slot. 250 let slot = unsafe { &*self.buffer.add(index) }; 251 let stamp = slot.stamp.load(Ordering::Acquire); 252 253 // If the the stamp is ahead of the head by 1, we may attempt to pop. 254 if head + 1 == stamp { 255 let new = if index + 1 < self.cap { 256 // Same lap, incremented index. 257 // Set to `{ lap: lap, mark: 0, index: index + 1 }`. 258 head + 1 259 } else { 260 // One lap forward, index wraps around to zero. 261 // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. 262 lap.wrapping_add(self.one_lap) 263 }; 264 265 // Try moving the head. 266 match self.head.compare_exchange_weak( 267 head, 268 new, 269 Ordering::SeqCst, 270 Ordering::Relaxed, 271 ) { 272 Ok(_) => { 273 // Prepare the token for the follow-up call to `read`. 274 token.array.slot = slot as *const Slot<T> as *const u8; 275 token.array.stamp = head.wrapping_add(self.one_lap); 276 return true; 277 } 278 Err(h) => { 279 head = h; 280 backoff.spin(); 281 } 282 } 283 } else if stamp == head { 284 atomic::fence(Ordering::SeqCst); 285 let tail = self.tail.load(Ordering::Relaxed); 286 287 // If the tail equals the head, that means the channel is empty. 288 if (tail & !self.mark_bit) == head { 289 // If the channel is disconnected... 290 if tail & self.mark_bit != 0 { 291 // ...then receive an error. 292 token.array.slot = ptr::null(); 293 token.array.stamp = 0; 294 return true; 295 } else { 296 // Otherwise, the receive operation is not ready. 297 return false; 298 } 299 } 300 301 backoff.spin(); 302 head = self.head.load(Ordering::Relaxed); 303 } else { 304 // Snooze because we need to wait for the stamp to get updated. 305 backoff.snooze(); 306 head = self.head.load(Ordering::Relaxed); 307 } 308 } 309 } 310 311 /// Reads a message from the channel. read(&self, token: &mut Token) -> Result<T, ()>312 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { 313 if token.array.slot.is_null() { 314 // The channel is disconnected. 315 return Err(()); 316 } 317 318 let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>); 319 320 // Read the message from the slot and update the stamp. 321 let msg = slot.msg.get().read().assume_init(); 322 slot.stamp.store(token.array.stamp, Ordering::Release); 323 324 // Wake a sleeping sender. 325 self.senders.notify(); 326 Ok(msg) 327 } 328 329 /// Attempts to send a message into the channel. try_send(&self, msg: T) -> Result<(), TrySendError<T>>330 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { 331 let token = &mut Token::default(); 332 if self.start_send(token) { 333 unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) } 334 } else { 335 Err(TrySendError::Full(msg)) 336 } 337 } 338 339 /// Sends a message into the channel. send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>340 pub(crate) fn send( 341 &self, 342 msg: T, 343 deadline: Option<Instant>, 344 ) -> Result<(), SendTimeoutError<T>> { 345 let token = &mut Token::default(); 346 loop { 347 // Try sending a message several times. 348 let backoff = Backoff::new(); 349 loop { 350 if self.start_send(token) { 351 let res = unsafe { self.write(token, msg) }; 352 return res.map_err(SendTimeoutError::Disconnected); 353 } 354 355 if backoff.is_completed() { 356 break; 357 } else { 358 backoff.snooze(); 359 } 360 } 361 362 if let Some(d) = deadline { 363 if Instant::now() >= d { 364 return Err(SendTimeoutError::Timeout(msg)); 365 } 366 } 367 368 Context::with(|cx| { 369 // Prepare for blocking until a receiver wakes us up. 370 let oper = Operation::hook(token); 371 self.senders.register(oper, cx); 372 373 // Has the channel become ready just now? 374 if !self.is_full() || self.is_disconnected() { 375 let _ = cx.try_select(Selected::Aborted); 376 } 377 378 // Block the current thread. 379 let sel = cx.wait_until(deadline); 380 381 match sel { 382 Selected::Waiting => unreachable!(), 383 Selected::Aborted | Selected::Disconnected => { 384 self.senders.unregister(oper).unwrap(); 385 } 386 Selected::Operation(_) => {} 387 } 388 }); 389 } 390 } 391 392 /// Attempts to receive a message without blocking. try_recv(&self) -> Result<T, TryRecvError>393 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { 394 let token = &mut Token::default(); 395 396 if self.start_recv(token) { 397 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } 398 } else { 399 Err(TryRecvError::Empty) 400 } 401 } 402 403 /// Receives a message from the channel. recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>404 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { 405 let token = &mut Token::default(); 406 loop { 407 // Try receiving a message several times. 408 let backoff = Backoff::new(); 409 loop { 410 if self.start_recv(token) { 411 let res = unsafe { self.read(token) }; 412 return res.map_err(|_| RecvTimeoutError::Disconnected); 413 } 414 415 if backoff.is_completed() { 416 break; 417 } else { 418 backoff.snooze(); 419 } 420 } 421 422 if let Some(d) = deadline { 423 if Instant::now() >= d { 424 return Err(RecvTimeoutError::Timeout); 425 } 426 } 427 428 Context::with(|cx| { 429 // Prepare for blocking until a sender wakes us up. 430 let oper = Operation::hook(token); 431 self.receivers.register(oper, cx); 432 433 // Has the channel become ready just now? 434 if !self.is_empty() || self.is_disconnected() { 435 let _ = cx.try_select(Selected::Aborted); 436 } 437 438 // Block the current thread. 439 let sel = cx.wait_until(deadline); 440 441 match sel { 442 Selected::Waiting => unreachable!(), 443 Selected::Aborted | Selected::Disconnected => { 444 self.receivers.unregister(oper).unwrap(); 445 // If the channel was disconnected, we still have to check for remaining 446 // messages. 447 } 448 Selected::Operation(_) => {} 449 } 450 }); 451 } 452 } 453 454 /// Returns the current number of messages inside the channel. len(&self) -> usize455 pub(crate) fn len(&self) -> usize { 456 loop { 457 // Load the tail, then load the head. 458 let tail = self.tail.load(Ordering::SeqCst); 459 let head = self.head.load(Ordering::SeqCst); 460 461 // If the tail didn't change, we've got consistent values to work with. 462 if self.tail.load(Ordering::SeqCst) == tail { 463 let hix = head & (self.mark_bit - 1); 464 let tix = tail & (self.mark_bit - 1); 465 466 return if hix < tix { 467 tix - hix 468 } else if hix > tix { 469 self.cap - hix + tix 470 } else if (tail & !self.mark_bit) == head { 471 0 472 } else { 473 self.cap 474 }; 475 } 476 } 477 } 478 479 /// Returns the capacity of the channel. 480 #[allow(clippy::unnecessary_wraps)] // This is intentional. capacity(&self) -> Option<usize>481 pub(crate) fn capacity(&self) -> Option<usize> { 482 Some(self.cap) 483 } 484 485 /// Disconnects the channel and wakes up all blocked senders and receivers. 486 /// 487 /// Returns `true` if this call disconnected the channel. disconnect(&self) -> bool488 pub(crate) fn disconnect(&self) -> bool { 489 let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); 490 491 if tail & self.mark_bit == 0 { 492 self.senders.disconnect(); 493 self.receivers.disconnect(); 494 true 495 } else { 496 false 497 } 498 } 499 500 /// Returns `true` if the channel is disconnected. is_disconnected(&self) -> bool501 pub(crate) fn is_disconnected(&self) -> bool { 502 self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 503 } 504 505 /// Returns `true` if the channel is empty. is_empty(&self) -> bool506 pub(crate) fn is_empty(&self) -> bool { 507 let head = self.head.load(Ordering::SeqCst); 508 let tail = self.tail.load(Ordering::SeqCst); 509 510 // Is the tail equal to the head? 511 // 512 // Note: If the head changes just before we load the tail, that means there was a moment 513 // when the channel was not empty, so it is safe to just return `false`. 514 (tail & !self.mark_bit) == head 515 } 516 517 /// Returns `true` if the channel is full. is_full(&self) -> bool518 pub(crate) fn is_full(&self) -> bool { 519 let tail = self.tail.load(Ordering::SeqCst); 520 let head = self.head.load(Ordering::SeqCst); 521 522 // Is the head lagging one lap behind tail? 523 // 524 // Note: If the tail changes just before we load the head, that means there was a moment 525 // when the channel was not full, so it is safe to just return `false`. 526 head.wrapping_add(self.one_lap) == tail & !self.mark_bit 527 } 528 } 529 530 impl<T> Drop for Channel<T> { drop(&mut self)531 fn drop(&mut self) { 532 // Get the index of the head. 533 let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1); 534 535 // Loop over all slots that hold a message and drop them. 536 for i in 0..self.len() { 537 // Compute the index of the next slot holding a message. 538 let index = if hix + i < self.cap { 539 hix + i 540 } else { 541 hix + i - self.cap 542 }; 543 544 unsafe { 545 let p = { 546 let slot = &mut *self.buffer.add(index); 547 let msg = &mut *slot.msg.get(); 548 msg.as_mut_ptr() 549 }; 550 p.drop_in_place(); 551 } 552 } 553 554 // Finally, deallocate the buffer, but don't run any destructors. 555 unsafe { 556 // Create a slice from the buffer to make 557 // a fat pointer. Then, use Box::from_raw 558 // to deallocate it. 559 let ptr = std::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>]; 560 Box::from_raw(ptr); 561 } 562 } 563 } 564 565 /// Receiver handle to a channel. 566 pub(crate) struct Receiver<'a, T>(&'a Channel<T>); 567 568 /// Sender handle to a channel. 569 pub(crate) struct Sender<'a, T>(&'a Channel<T>); 570 571 impl<T> SelectHandle for Receiver<'_, T> { try_select(&self, token: &mut Token) -> bool572 fn try_select(&self, token: &mut Token) -> bool { 573 self.0.start_recv(token) 574 } 575 deadline(&self) -> Option<Instant>576 fn deadline(&self) -> Option<Instant> { 577 None 578 } 579 register(&self, oper: Operation, cx: &Context) -> bool580 fn register(&self, oper: Operation, cx: &Context) -> bool { 581 self.0.receivers.register(oper, cx); 582 self.is_ready() 583 } 584 unregister(&self, oper: Operation)585 fn unregister(&self, oper: Operation) { 586 self.0.receivers.unregister(oper); 587 } 588 accept(&self, token: &mut Token, _cx: &Context) -> bool589 fn accept(&self, token: &mut Token, _cx: &Context) -> bool { 590 self.try_select(token) 591 } 592 is_ready(&self) -> bool593 fn is_ready(&self) -> bool { 594 !self.0.is_empty() || self.0.is_disconnected() 595 } 596 watch(&self, oper: Operation, cx: &Context) -> bool597 fn watch(&self, oper: Operation, cx: &Context) -> bool { 598 self.0.receivers.watch(oper, cx); 599 self.is_ready() 600 } 601 unwatch(&self, oper: Operation)602 fn unwatch(&self, oper: Operation) { 603 self.0.receivers.unwatch(oper); 604 } 605 } 606 607 impl<T> SelectHandle for Sender<'_, T> { try_select(&self, token: &mut Token) -> bool608 fn try_select(&self, token: &mut Token) -> bool { 609 self.0.start_send(token) 610 } 611 deadline(&self) -> Option<Instant>612 fn deadline(&self) -> Option<Instant> { 613 None 614 } 615 register(&self, oper: Operation, cx: &Context) -> bool616 fn register(&self, oper: Operation, cx: &Context) -> bool { 617 self.0.senders.register(oper, cx); 618 self.is_ready() 619 } 620 unregister(&self, oper: Operation)621 fn unregister(&self, oper: Operation) { 622 self.0.senders.unregister(oper); 623 } 624 accept(&self, token: &mut Token, _cx: &Context) -> bool625 fn accept(&self, token: &mut Token, _cx: &Context) -> bool { 626 self.try_select(token) 627 } 628 is_ready(&self) -> bool629 fn is_ready(&self) -> bool { 630 !self.0.is_full() || self.0.is_disconnected() 631 } 632 watch(&self, oper: Operation, cx: &Context) -> bool633 fn watch(&self, oper: Operation, cx: &Context) -> bool { 634 self.0.senders.watch(oper, cx); 635 self.is_ready() 636 } 637 unwatch(&self, oper: Operation)638 fn unwatch(&self, oper: Operation) { 639 self.0.senders.unwatch(oper); 640 } 641 } 642