1 //! Thread-local context used in select. 2 3 use std::cell::Cell; 4 use std::sync::atomic::{AtomicUsize, Ordering}; 5 use std::sync::Arc; 6 use std::thread::{self, Thread, ThreadId}; 7 use std::time::Instant; 8 9 use crossbeam_utils::Backoff; 10 11 use crate::select::Selected; 12 13 /// Thread-local context used in select. 14 #[derive(Debug, Clone)] 15 pub struct Context { 16 inner: Arc<Inner>, 17 } 18 19 /// Inner representation of `Context`. 20 #[derive(Debug)] 21 struct Inner { 22 /// Selected operation. 23 select: AtomicUsize, 24 25 /// A slot into which another thread may store a pointer to its `Packet`. 26 packet: AtomicUsize, 27 28 /// Thread handle. 29 thread: Thread, 30 31 /// Thread id. 32 thread_id: ThreadId, 33 } 34 35 impl Context { 36 /// Creates a new context for the duration of the closure. 37 #[inline] with<F, R>(f: F) -> R where F: FnOnce(&Context) -> R,38 pub fn with<F, R>(f: F) -> R 39 where 40 F: FnOnce(&Context) -> R, 41 { 42 thread_local! { 43 /// Cached thread-local context. 44 static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new())); 45 } 46 47 let mut f = Some(f); 48 let mut f = move |cx: &Context| -> R { 49 let f = f.take().unwrap(); 50 f(cx) 51 }; 52 53 CONTEXT 54 .try_with(|cell| match cell.take() { 55 None => f(&Context::new()), 56 Some(cx) => { 57 cx.reset(); 58 let res = f(&cx); 59 cell.set(Some(cx)); 60 res 61 } 62 }) 63 .unwrap_or_else(|_| f(&Context::new())) 64 } 65 66 /// Creates a new `Context`. 67 #[cold] new() -> Context68 fn new() -> Context { 69 Context { 70 inner: Arc::new(Inner { 71 select: AtomicUsize::new(Selected::Waiting.into()), 72 packet: AtomicUsize::new(0), 73 thread: thread::current(), 74 thread_id: thread::current().id(), 75 }), 76 } 77 } 78 79 /// Resets `select` and `packet`. 80 #[inline] reset(&self)81 fn reset(&self) { 82 self.inner 83 .select 84 .store(Selected::Waiting.into(), Ordering::Release); 85 self.inner.packet.store(0, Ordering::Release); 86 } 87 88 /// Attempts to select an operation. 89 /// 90 /// On failure, the previously selected operation is returned. 91 #[inline] try_select(&self, select: Selected) -> Result<(), Selected>92 pub fn try_select(&self, select: Selected) -> Result<(), Selected> { 93 self.inner 94 .select 95 .compare_exchange( 96 Selected::Waiting.into(), 97 select.into(), 98 Ordering::AcqRel, 99 Ordering::Acquire, 100 ) 101 .map(|_| ()) 102 .map_err(|e| e.into()) 103 } 104 105 /// Returns the selected operation. 106 #[inline] selected(&self) -> Selected107 pub fn selected(&self) -> Selected { 108 Selected::from(self.inner.select.load(Ordering::Acquire)) 109 } 110 111 /// Stores a packet. 112 /// 113 /// This method must be called after `try_select` succeeds and there is a packet to provide. 114 #[inline] store_packet(&self, packet: usize)115 pub fn store_packet(&self, packet: usize) { 116 if packet != 0 { 117 self.inner.packet.store(packet, Ordering::Release); 118 } 119 } 120 121 /// Waits until a packet is provided and returns it. 122 #[inline] wait_packet(&self) -> usize123 pub fn wait_packet(&self) -> usize { 124 let backoff = Backoff::new(); 125 loop { 126 let packet = self.inner.packet.load(Ordering::Acquire); 127 if packet != 0 { 128 return packet; 129 } 130 backoff.snooze(); 131 } 132 } 133 134 /// Waits until an operation is selected and returns it. 135 /// 136 /// If the deadline is reached, `Selected::Aborted` will be selected. 137 #[inline] wait_until(&self, deadline: Option<Instant>) -> Selected138 pub fn wait_until(&self, deadline: Option<Instant>) -> Selected { 139 // Spin for a short time, waiting until an operation is selected. 140 let backoff = Backoff::new(); 141 loop { 142 let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); 143 if sel != Selected::Waiting { 144 return sel; 145 } 146 147 if backoff.is_completed() { 148 break; 149 } else { 150 backoff.snooze(); 151 } 152 } 153 154 loop { 155 // Check whether an operation has been selected. 156 let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); 157 if sel != Selected::Waiting { 158 return sel; 159 } 160 161 // If there's a deadline, park the current thread until the deadline is reached. 162 if let Some(end) = deadline { 163 let now = Instant::now(); 164 165 if now < end { 166 thread::park_timeout(end - now); 167 } else { 168 // The deadline has been reached. Try aborting select. 169 return match self.try_select(Selected::Aborted) { 170 Ok(()) => Selected::Aborted, 171 Err(s) => s, 172 }; 173 } 174 } else { 175 thread::park(); 176 } 177 } 178 } 179 180 /// Unparks the thread this context belongs to. 181 #[inline] unpark(&self)182 pub fn unpark(&self) { 183 self.inner.thread.unpark(); 184 } 185 186 /// Returns the id of the thread this context belongs to. 187 #[inline] thread_id(&self) -> ThreadId188 pub fn thread_id(&self) -> ThreadId { 189 self.inner.thread_id 190 } 191 } 192