1 //! Thread-local context used in select.
2 
3 use std::cell::Cell;
4 use std::sync::atomic::{AtomicUsize, Ordering};
5 use std::sync::Arc;
6 use std::thread::{self, Thread, ThreadId};
7 use std::time::Instant;
8 
9 use crossbeam_utils::Backoff;
10 
11 use crate::select::Selected;
12 
13 /// Thread-local context used in select.
14 #[derive(Debug, Clone)]
15 pub struct Context {
16     inner: Arc<Inner>,
17 }
18 
19 /// Inner representation of `Context`.
20 #[derive(Debug)]
21 struct Inner {
22     /// Selected operation.
23     select: AtomicUsize,
24 
25     /// A slot into which another thread may store a pointer to its `Packet`.
26     packet: AtomicUsize,
27 
28     /// Thread handle.
29     thread: Thread,
30 
31     /// Thread id.
32     thread_id: ThreadId,
33 }
34 
35 impl Context {
36     /// Creates a new context for the duration of the closure.
37     #[inline]
with<F, R>(f: F) -> R where F: FnOnce(&Context) -> R,38     pub fn with<F, R>(f: F) -> R
39     where
40         F: FnOnce(&Context) -> R,
41     {
42         thread_local! {
43             /// Cached thread-local context.
44             static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
45         }
46 
47         let mut f = Some(f);
48         let mut f = move |cx: &Context| -> R {
49             let f = f.take().unwrap();
50             f(cx)
51         };
52 
53         CONTEXT
54             .try_with(|cell| match cell.take() {
55                 None => f(&Context::new()),
56                 Some(cx) => {
57                     cx.reset();
58                     let res = f(&cx);
59                     cell.set(Some(cx));
60                     res
61                 }
62             })
63             .unwrap_or_else(|_| f(&Context::new()))
64     }
65 
66     /// Creates a new `Context`.
67     #[cold]
new() -> Context68     fn new() -> Context {
69         Context {
70             inner: Arc::new(Inner {
71                 select: AtomicUsize::new(Selected::Waiting.into()),
72                 packet: AtomicUsize::new(0),
73                 thread: thread::current(),
74                 thread_id: thread::current().id(),
75             }),
76         }
77     }
78 
79     /// Resets `select` and `packet`.
80     #[inline]
reset(&self)81     fn reset(&self) {
82         self.inner
83             .select
84             .store(Selected::Waiting.into(), Ordering::Release);
85         self.inner.packet.store(0, Ordering::Release);
86     }
87 
88     /// Attempts to select an operation.
89     ///
90     /// On failure, the previously selected operation is returned.
91     #[inline]
try_select(&self, select: Selected) -> Result<(), Selected>92     pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
93         self.inner
94             .select
95             .compare_exchange(
96                 Selected::Waiting.into(),
97                 select.into(),
98                 Ordering::AcqRel,
99                 Ordering::Acquire,
100             )
101             .map(|_| ())
102             .map_err(|e| e.into())
103     }
104 
105     /// Returns the selected operation.
106     #[inline]
selected(&self) -> Selected107     pub fn selected(&self) -> Selected {
108         Selected::from(self.inner.select.load(Ordering::Acquire))
109     }
110 
111     /// Stores a packet.
112     ///
113     /// This method must be called after `try_select` succeeds and there is a packet to provide.
114     #[inline]
store_packet(&self, packet: usize)115     pub fn store_packet(&self, packet: usize) {
116         if packet != 0 {
117             self.inner.packet.store(packet, Ordering::Release);
118         }
119     }
120 
121     /// Waits until a packet is provided and returns it.
122     #[inline]
wait_packet(&self) -> usize123     pub fn wait_packet(&self) -> usize {
124         let backoff = Backoff::new();
125         loop {
126             let packet = self.inner.packet.load(Ordering::Acquire);
127             if packet != 0 {
128                 return packet;
129             }
130             backoff.snooze();
131         }
132     }
133 
134     /// Waits until an operation is selected and returns it.
135     ///
136     /// If the deadline is reached, `Selected::Aborted` will be selected.
137     #[inline]
wait_until(&self, deadline: Option<Instant>) -> Selected138     pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
139         // Spin for a short time, waiting until an operation is selected.
140         let backoff = Backoff::new();
141         loop {
142             let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
143             if sel != Selected::Waiting {
144                 return sel;
145             }
146 
147             if backoff.is_completed() {
148                 break;
149             } else {
150                 backoff.snooze();
151             }
152         }
153 
154         loop {
155             // Check whether an operation has been selected.
156             let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
157             if sel != Selected::Waiting {
158                 return sel;
159             }
160 
161             // If there's a deadline, park the current thread until the deadline is reached.
162             if let Some(end) = deadline {
163                 let now = Instant::now();
164 
165                 if now < end {
166                     thread::park_timeout(end - now);
167                 } else {
168                     // The deadline has been reached. Try aborting select.
169                     return match self.try_select(Selected::Aborted) {
170                         Ok(()) => Selected::Aborted,
171                         Err(s) => s,
172                     };
173                 }
174             } else {
175                 thread::park();
176             }
177         }
178     }
179 
180     /// Unparks the thread this context belongs to.
181     #[inline]
unpark(&self)182     pub fn unpark(&self) {
183         self.inner.thread.unpark();
184     }
185 
186     /// Returns the id of the thread this context belongs to.
187     #[inline]
thread_id(&self) -> ThreadId188     pub fn thread_id(&self) -> ThreadId {
189         self.inner.thread_id
190     }
191 }
192