1 //! Zero-capacity channel. 2 //! 3 //! This kind of channel is also known as *rendezvous* channel. 4 5 use std::cell::UnsafeCell; 6 use std::marker::PhantomData; 7 use std::sync::atomic::{AtomicBool, Ordering}; 8 use std::time::Instant; 9 10 use crossbeam_utils::Backoff; 11 12 use crate::context::Context; 13 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; 14 use crate::select::{Operation, SelectHandle, Selected, Token}; 15 use crate::utils::Spinlock; 16 use crate::waker::Waker; 17 18 /// A pointer to a packet. 19 pub(crate) type ZeroToken = usize; 20 21 /// A slot for passing one message from a sender to a receiver. 22 struct Packet<T> { 23 /// Equals `true` if the packet is allocated on the stack. 24 on_stack: bool, 25 26 /// Equals `true` once the packet is ready for reading or writing. 27 ready: AtomicBool, 28 29 /// The message. 30 msg: UnsafeCell<Option<T>>, 31 } 32 33 impl<T> Packet<T> { 34 /// Creates an empty packet on the stack. empty_on_stack() -> Packet<T>35 fn empty_on_stack() -> Packet<T> { 36 Packet { 37 on_stack: true, 38 ready: AtomicBool::new(false), 39 msg: UnsafeCell::new(None), 40 } 41 } 42 43 /// Creates an empty packet on the heap. empty_on_heap() -> Box<Packet<T>>44 fn empty_on_heap() -> Box<Packet<T>> { 45 Box::new(Packet { 46 on_stack: false, 47 ready: AtomicBool::new(false), 48 msg: UnsafeCell::new(None), 49 }) 50 } 51 52 /// Creates a packet on the stack, containing a message. message_on_stack(msg: T) -> Packet<T>53 fn message_on_stack(msg: T) -> Packet<T> { 54 Packet { 55 on_stack: true, 56 ready: AtomicBool::new(false), 57 msg: UnsafeCell::new(Some(msg)), 58 } 59 } 60 61 /// Waits until the packet becomes ready for reading or writing. wait_ready(&self)62 fn wait_ready(&self) { 63 let backoff = Backoff::new(); 64 while !self.ready.load(Ordering::Acquire) { 65 backoff.snooze(); 66 } 67 } 68 } 69 70 /// Inner representation of a zero-capacity channel. 71 struct Inner { 72 /// Senders waiting to pair up with a receive operation. 73 senders: Waker, 74 75 /// Receivers waiting to pair up with a send operation. 76 receivers: Waker, 77 78 /// Equals `true` when the channel is disconnected. 79 is_disconnected: bool, 80 } 81 82 /// Zero-capacity channel. 83 pub(crate) struct Channel<T> { 84 /// Inner representation of the channel. 85 inner: Spinlock<Inner>, 86 87 /// Indicates that dropping a `Channel<T>` may drop values of type `T`. 88 _marker: PhantomData<T>, 89 } 90 91 impl<T> Channel<T> { 92 /// Constructs a new zero-capacity channel. new() -> Self93 pub(crate) fn new() -> Self { 94 Channel { 95 inner: Spinlock::new(Inner { 96 senders: Waker::new(), 97 receivers: Waker::new(), 98 is_disconnected: false, 99 }), 100 _marker: PhantomData, 101 } 102 } 103 104 /// Returns a receiver handle to the channel. receiver(&self) -> Receiver<'_, T>105 pub(crate) fn receiver(&self) -> Receiver<'_, T> { 106 Receiver(self) 107 } 108 109 /// Returns a sender handle to the channel. sender(&self) -> Sender<'_, T>110 pub(crate) fn sender(&self) -> Sender<'_, T> { 111 Sender(self) 112 } 113 114 /// Attempts to reserve a slot for sending a message. start_send(&self, token: &mut Token) -> bool115 fn start_send(&self, token: &mut Token) -> bool { 116 let mut inner = self.inner.lock(); 117 118 // If there's a waiting receiver, pair up with it. 119 if let Some(operation) = inner.receivers.try_select() { 120 token.zero = operation.packet; 121 true 122 } else if inner.is_disconnected { 123 token.zero = 0; 124 true 125 } else { 126 false 127 } 128 } 129 130 /// Writes a message into the packet. write(&self, token: &mut Token, msg: T) -> Result<(), T>131 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { 132 // If there is no packet, the channel is disconnected. 133 if token.zero == 0 { 134 return Err(msg); 135 } 136 137 let packet = &*(token.zero as *const Packet<T>); 138 packet.msg.get().write(Some(msg)); 139 packet.ready.store(true, Ordering::Release); 140 Ok(()) 141 } 142 143 /// Attempts to pair up with a sender. start_recv(&self, token: &mut Token) -> bool144 fn start_recv(&self, token: &mut Token) -> bool { 145 let mut inner = self.inner.lock(); 146 147 // If there's a waiting sender, pair up with it. 148 if let Some(operation) = inner.senders.try_select() { 149 token.zero = operation.packet; 150 true 151 } else if inner.is_disconnected { 152 token.zero = 0; 153 true 154 } else { 155 false 156 } 157 } 158 159 /// Reads a message from the packet. read(&self, token: &mut Token) -> Result<T, ()>160 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { 161 // If there is no packet, the channel is disconnected. 162 if token.zero == 0 { 163 return Err(()); 164 } 165 166 let packet = &*(token.zero as *const Packet<T>); 167 168 if packet.on_stack { 169 // The message has been in the packet from the beginning, so there is no need to wait 170 // for it. However, after reading the message, we need to set `ready` to `true` in 171 // order to signal that the packet can be destroyed. 172 let msg = packet.msg.get().replace(None).unwrap(); 173 packet.ready.store(true, Ordering::Release); 174 Ok(msg) 175 } else { 176 // Wait until the message becomes available, then read it and destroy the 177 // heap-allocated packet. 178 packet.wait_ready(); 179 let msg = packet.msg.get().replace(None).unwrap(); 180 drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>)); 181 Ok(msg) 182 } 183 } 184 185 /// Attempts to send a message into the channel. try_send(&self, msg: T) -> Result<(), TrySendError<T>>186 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { 187 let token = &mut Token::default(); 188 let mut inner = self.inner.lock(); 189 190 // If there's a waiting receiver, pair up with it. 191 if let Some(operation) = inner.receivers.try_select() { 192 token.zero = operation.packet; 193 drop(inner); 194 unsafe { 195 self.write(token, msg).ok().unwrap(); 196 } 197 Ok(()) 198 } else if inner.is_disconnected { 199 Err(TrySendError::Disconnected(msg)) 200 } else { 201 Err(TrySendError::Full(msg)) 202 } 203 } 204 205 /// Sends a message into the channel. send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>206 pub(crate) fn send( 207 &self, 208 msg: T, 209 deadline: Option<Instant>, 210 ) -> Result<(), SendTimeoutError<T>> { 211 let token = &mut Token::default(); 212 let mut inner = self.inner.lock(); 213 214 // If there's a waiting receiver, pair up with it. 215 if let Some(operation) = inner.receivers.try_select() { 216 token.zero = operation.packet; 217 drop(inner); 218 unsafe { 219 self.write(token, msg).ok().unwrap(); 220 } 221 return Ok(()); 222 } 223 224 if inner.is_disconnected { 225 return Err(SendTimeoutError::Disconnected(msg)); 226 } 227 228 Context::with(|cx| { 229 // Prepare for blocking until a receiver wakes us up. 230 let oper = Operation::hook(token); 231 let packet = Packet::<T>::message_on_stack(msg); 232 inner 233 .senders 234 .register_with_packet(oper, &packet as *const Packet<T> as usize, cx); 235 inner.receivers.notify(); 236 drop(inner); 237 238 // Block the current thread. 239 let sel = cx.wait_until(deadline); 240 241 match sel { 242 Selected::Waiting => unreachable!(), 243 Selected::Aborted => { 244 self.inner.lock().senders.unregister(oper).unwrap(); 245 let msg = unsafe { packet.msg.get().replace(None).unwrap() }; 246 Err(SendTimeoutError::Timeout(msg)) 247 } 248 Selected::Disconnected => { 249 self.inner.lock().senders.unregister(oper).unwrap(); 250 let msg = unsafe { packet.msg.get().replace(None).unwrap() }; 251 Err(SendTimeoutError::Disconnected(msg)) 252 } 253 Selected::Operation(_) => { 254 // Wait until the message is read, then drop the packet. 255 packet.wait_ready(); 256 Ok(()) 257 } 258 } 259 }) 260 } 261 262 /// Attempts to receive a message without blocking. try_recv(&self) -> Result<T, TryRecvError>263 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { 264 let token = &mut Token::default(); 265 let mut inner = self.inner.lock(); 266 267 // If there's a waiting sender, pair up with it. 268 if let Some(operation) = inner.senders.try_select() { 269 token.zero = operation.packet; 270 drop(inner); 271 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } 272 } else if inner.is_disconnected { 273 Err(TryRecvError::Disconnected) 274 } else { 275 Err(TryRecvError::Empty) 276 } 277 } 278 279 /// Receives a message from the channel. recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>280 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { 281 let token = &mut Token::default(); 282 let mut inner = self.inner.lock(); 283 284 // If there's a waiting sender, pair up with it. 285 if let Some(operation) = inner.senders.try_select() { 286 token.zero = operation.packet; 287 drop(inner); 288 unsafe { 289 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); 290 } 291 } 292 293 if inner.is_disconnected { 294 return Err(RecvTimeoutError::Disconnected); 295 } 296 297 Context::with(|cx| { 298 // Prepare for blocking until a sender wakes us up. 299 let oper = Operation::hook(token); 300 let packet = Packet::<T>::empty_on_stack(); 301 inner 302 .receivers 303 .register_with_packet(oper, &packet as *const Packet<T> as usize, cx); 304 inner.senders.notify(); 305 drop(inner); 306 307 // Block the current thread. 308 let sel = cx.wait_until(deadline); 309 310 match sel { 311 Selected::Waiting => unreachable!(), 312 Selected::Aborted => { 313 self.inner.lock().receivers.unregister(oper).unwrap(); 314 Err(RecvTimeoutError::Timeout) 315 } 316 Selected::Disconnected => { 317 self.inner.lock().receivers.unregister(oper).unwrap(); 318 Err(RecvTimeoutError::Disconnected) 319 } 320 Selected::Operation(_) => { 321 // Wait until the message is provided, then read it. 322 packet.wait_ready(); 323 unsafe { Ok(packet.msg.get().replace(None).unwrap()) } 324 } 325 } 326 }) 327 } 328 329 /// Disconnects the channel and wakes up all blocked senders and receivers. 330 /// 331 /// Returns `true` if this call disconnected the channel. disconnect(&self) -> bool332 pub(crate) fn disconnect(&self) -> bool { 333 let mut inner = self.inner.lock(); 334 335 if !inner.is_disconnected { 336 inner.is_disconnected = true; 337 inner.senders.disconnect(); 338 inner.receivers.disconnect(); 339 true 340 } else { 341 false 342 } 343 } 344 345 /// Returns the current number of messages inside the channel. len(&self) -> usize346 pub(crate) fn len(&self) -> usize { 347 0 348 } 349 350 /// Returns the capacity of the channel. 351 #[allow(clippy::unnecessary_wraps)] // This is intentional. capacity(&self) -> Option<usize>352 pub(crate) fn capacity(&self) -> Option<usize> { 353 Some(0) 354 } 355 356 /// Returns `true` if the channel is empty. is_empty(&self) -> bool357 pub(crate) fn is_empty(&self) -> bool { 358 true 359 } 360 361 /// Returns `true` if the channel is full. is_full(&self) -> bool362 pub(crate) fn is_full(&self) -> bool { 363 true 364 } 365 } 366 367 /// Receiver handle to a channel. 368 pub(crate) struct Receiver<'a, T>(&'a Channel<T>); 369 370 /// Sender handle to a channel. 371 pub(crate) struct Sender<'a, T>(&'a Channel<T>); 372 373 impl<T> SelectHandle for Receiver<'_, T> { try_select(&self, token: &mut Token) -> bool374 fn try_select(&self, token: &mut Token) -> bool { 375 self.0.start_recv(token) 376 } 377 deadline(&self) -> Option<Instant>378 fn deadline(&self) -> Option<Instant> { 379 None 380 } 381 register(&self, oper: Operation, cx: &Context) -> bool382 fn register(&self, oper: Operation, cx: &Context) -> bool { 383 let packet = Box::into_raw(Packet::<T>::empty_on_heap()); 384 385 let mut inner = self.0.inner.lock(); 386 inner 387 .receivers 388 .register_with_packet(oper, packet as usize, cx); 389 inner.senders.notify(); 390 inner.senders.can_select() || inner.is_disconnected 391 } 392 unregister(&self, oper: Operation)393 fn unregister(&self, oper: Operation) { 394 if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) { 395 unsafe { 396 drop(Box::from_raw(operation.packet as *mut Packet<T>)); 397 } 398 } 399 } 400 accept(&self, token: &mut Token, cx: &Context) -> bool401 fn accept(&self, token: &mut Token, cx: &Context) -> bool { 402 token.zero = cx.wait_packet(); 403 true 404 } 405 is_ready(&self) -> bool406 fn is_ready(&self) -> bool { 407 let inner = self.0.inner.lock(); 408 inner.senders.can_select() || inner.is_disconnected 409 } 410 watch(&self, oper: Operation, cx: &Context) -> bool411 fn watch(&self, oper: Operation, cx: &Context) -> bool { 412 let mut inner = self.0.inner.lock(); 413 inner.receivers.watch(oper, cx); 414 inner.senders.can_select() || inner.is_disconnected 415 } 416 unwatch(&self, oper: Operation)417 fn unwatch(&self, oper: Operation) { 418 let mut inner = self.0.inner.lock(); 419 inner.receivers.unwatch(oper); 420 } 421 } 422 423 impl<T> SelectHandle for Sender<'_, T> { try_select(&self, token: &mut Token) -> bool424 fn try_select(&self, token: &mut Token) -> bool { 425 self.0.start_send(token) 426 } 427 deadline(&self) -> Option<Instant>428 fn deadline(&self) -> Option<Instant> { 429 None 430 } 431 register(&self, oper: Operation, cx: &Context) -> bool432 fn register(&self, oper: Operation, cx: &Context) -> bool { 433 let packet = Box::into_raw(Packet::<T>::empty_on_heap()); 434 435 let mut inner = self.0.inner.lock(); 436 inner 437 .senders 438 .register_with_packet(oper, packet as usize, cx); 439 inner.receivers.notify(); 440 inner.receivers.can_select() || inner.is_disconnected 441 } 442 unregister(&self, oper: Operation)443 fn unregister(&self, oper: Operation) { 444 if let Some(operation) = self.0.inner.lock().senders.unregister(oper) { 445 unsafe { 446 drop(Box::from_raw(operation.packet as *mut Packet<T>)); 447 } 448 } 449 } 450 accept(&self, token: &mut Token, cx: &Context) -> bool451 fn accept(&self, token: &mut Token, cx: &Context) -> bool { 452 token.zero = cx.wait_packet(); 453 true 454 } 455 is_ready(&self) -> bool456 fn is_ready(&self) -> bool { 457 let inner = self.0.inner.lock(); 458 inner.receivers.can_select() || inner.is_disconnected 459 } 460 watch(&self, oper: Operation, cx: &Context) -> bool461 fn watch(&self, oper: Operation, cx: &Context) -> bool { 462 let mut inner = self.0.inner.lock(); 463 inner.senders.watch(oper, cx); 464 inner.receivers.can_select() || inner.is_disconnected 465 } 466 unwatch(&self, oper: Operation)467 fn unwatch(&self, oper: Operation) { 468 let mut inner = self.0.inner.lock(); 469 inner.senders.unwatch(oper); 470 } 471 } 472