1 //! Thread safe communication channel implementing `Evented`
2
3 #![allow(unused_imports, deprecated, missing_debug_implementations)]
4
5 use {io, Ready, Poll, PollOpt, Registration, SetReadiness, Token};
6 use event::Evented;
7 use lazycell::{LazyCell, AtomicLazyCell};
8 use std::any::Any;
9 use std::fmt;
10 use std::error;
11 use std::sync::{mpsc, Arc};
12 use std::sync::atomic::{AtomicUsize, Ordering};
13
14 /// Creates a new asynchronous channel, where the `Receiver` can be registered
15 /// with `Poll`.
channel<T>() -> (Sender<T>, Receiver<T>)16 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
17 let (tx_ctl, rx_ctl) = ctl_pair();
18 let (tx, rx) = mpsc::channel();
19
20 let tx = Sender {
21 tx,
22 ctl: tx_ctl,
23 };
24
25 let rx = Receiver {
26 rx,
27 ctl: rx_ctl,
28 };
29
30 (tx, rx)
31 }
32
33 /// Creates a new synchronous, bounded channel where the `Receiver` can be
34 /// registered with `Poll`.
sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>)35 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
36 let (tx_ctl, rx_ctl) = ctl_pair();
37 let (tx, rx) = mpsc::sync_channel(bound);
38
39 let tx = SyncSender {
40 tx,
41 ctl: tx_ctl,
42 };
43
44 let rx = Receiver {
45 rx,
46 ctl: rx_ctl,
47 };
48
49 (tx, rx)
50 }
51
ctl_pair() -> (SenderCtl, ReceiverCtl)52 pub fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
53 let inner = Arc::new(Inner {
54 pending: AtomicUsize::new(0),
55 senders: AtomicUsize::new(1),
56 set_readiness: AtomicLazyCell::new(),
57 });
58
59 let tx = SenderCtl {
60 inner: inner.clone(),
61 };
62
63 let rx = ReceiverCtl {
64 registration: LazyCell::new(),
65 inner,
66 };
67
68 (tx, rx)
69 }
70
/// Tracks messages sent on a channel in order to update readiness.
///
/// Cloning a `SenderCtl` increments the shared sender count; dropping one
/// decrements it.
pub struct SenderCtl {
    // State shared with the paired `ReceiverCtl` and with every clone of this
    // handle.
    inner: Arc<Inner>,
}
75
/// Tracks messages received on a channel in order to track readiness.
pub struct ReceiverCtl {
    // Empty until the first successful `register` call fills it.
    registration: LazyCell<Registration>,
    // State shared with every `SenderCtl` belonging to the same pair.
    inner: Arc<Inner>,
}
81
/// Sending half of a channel created by `channel`.
pub struct Sender<T> {
    // Underlying standard-library sender that carries the messages.
    tx: mpsc::Sender<T>,
    // Control handle that is told about each successful send.
    ctl: SenderCtl,
}
86
/// Sending half of a bounded channel created by `sync_channel`.
pub struct SyncSender<T> {
    // Underlying standard-library bounded sender that carries the messages.
    tx: mpsc::SyncSender<T>,
    // Control handle that is told about each successful send.
    ctl: SenderCtl,
}
91
/// Receiving half of a channel created by `channel` or `sync_channel`.
///
/// Implements `Evented` by delegating to its `ReceiverCtl`.
pub struct Receiver<T> {
    // Underlying standard-library receiver that yields the messages.
    rx: mpsc::Receiver<T>,
    // Control handle that is told about each successful receive and owns the
    // `Poll` registration.
    ctl: ReceiverCtl,
}
96
/// Error returned by `Sender::send` and `SyncSender::send`.
pub enum SendError<T> {
    /// An I/O error, e.g. from updating readiness after the message was sent.
    Io(io::Error),
    /// The underlying `mpsc` send failed; carries the value that was not sent.
    Disconnected(T),
}
101
/// Error returned by `SyncSender::try_send`.
pub enum TrySendError<T> {
    /// An I/O error, e.g. from updating readiness after the message was sent.
    Io(io::Error),
    /// The underlying `mpsc` channel reported `Full`; carries the value that
    /// was not sent.
    Full(T),
    /// The underlying `mpsc` channel reported `Disconnected`; carries the
    /// value that was not sent.
    Disconnected(T),
}
107
// State shared between all `SenderCtl` handles and the `ReceiverCtl` of one
// pair.
struct Inner {
    // The number of outstanding messages for the receiver to read.
    // Incremented by `SenderCtl::inc`, decremented by `ReceiverCtl::dec`.
    pending: AtomicUsize,
    // The number of live sender handles. Starts at one in `ctl_pair`.
    senders: AtomicUsize,
    // The set-readiness handle. Empty until the receiver is registered.
    set_readiness: AtomicLazyCell<SetReadiness>,
}
116
117 impl<T> Sender<T> {
send(&self, t: T) -> Result<(), SendError<T>>118 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
119 self.tx.send(t)
120 .map_err(SendError::from)
121 .and_then(|_| {
122 self.ctl.inc()?;
123 Ok(())
124 })
125 }
126 }
127
128 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>129 fn clone(&self) -> Sender<T> {
130 Sender {
131 tx: self.tx.clone(),
132 ctl: self.ctl.clone(),
133 }
134 }
135 }
136
137 impl<T> SyncSender<T> {
send(&self, t: T) -> Result<(), SendError<T>>138 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
139 self.tx.send(t)
140 .map_err(From::from)
141 .and_then(|_| {
142 self.ctl.inc()?;
143 Ok(())
144 })
145 }
146
try_send(&self, t: T) -> Result<(), TrySendError<T>>147 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
148 self.tx.try_send(t)
149 .map_err(From::from)
150 .and_then(|_| {
151 self.ctl.inc()?;
152 Ok(())
153 })
154 }
155 }
156
157 impl<T> Clone for SyncSender<T> {
clone(&self) -> SyncSender<T>158 fn clone(&self) -> SyncSender<T> {
159 SyncSender {
160 tx: self.tx.clone(),
161 ctl: self.ctl.clone(),
162 }
163 }
164 }
165
166 impl<T> Receiver<T> {
try_recv(&self) -> Result<T, mpsc::TryRecvError>167 pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
168 self.rx.try_recv().and_then(|res| {
169 let _ = self.ctl.dec();
170 Ok(res)
171 })
172 }
173 }
174
175 impl<T> Evented for Receiver<T> {
register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>176 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
177 self.ctl.register(poll, token, interest, opts)
178 }
179
reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>180 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
181 self.ctl.reregister(poll, token, interest, opts)
182 }
183
deregister(&self, poll: &Poll) -> io::Result<()>184 fn deregister(&self, poll: &Poll) -> io::Result<()> {
185 self.ctl.deregister(poll)
186 }
187 }
188
189 /*
190 *
191 * ===== SenderCtl / ReceiverCtl =====
192 *
193 */
194
195 impl SenderCtl {
196 /// Call to track that a message has been sent
inc(&self) -> io::Result<()>197 pub fn inc(&self) -> io::Result<()> {
198 let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
199
200 if 0 == cnt {
201 // Toggle readiness to readable
202 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
203 set_readiness.set_readiness(Ready::readable())?;
204 }
205 }
206
207 Ok(())
208 }
209 }
210
211 impl Clone for SenderCtl {
clone(&self) -> SenderCtl212 fn clone(&self) -> SenderCtl {
213 self.inner.senders.fetch_add(1, Ordering::Relaxed);
214 SenderCtl { inner: self.inner.clone() }
215 }
216 }
217
218 impl Drop for SenderCtl {
drop(&mut self)219 fn drop(&mut self) {
220 if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
221 let _ = self.inc();
222 }
223 }
224 }
225
226 impl ReceiverCtl {
dec(&self) -> io::Result<()>227 pub fn dec(&self) -> io::Result<()> {
228 let first = self.inner.pending.load(Ordering::Acquire);
229
230 if first == 1 {
231 // Unset readiness
232 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
233 set_readiness.set_readiness(Ready::empty())?;
234 }
235 }
236
237 // Decrement
238 let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
239
240 if first == 1 && second > 1 {
241 // There are still pending messages. Since readiness was
242 // previously unset, it must be reset here
243 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
244 set_readiness.set_readiness(Ready::readable())?;
245 }
246 }
247
248 Ok(())
249 }
250 }
251
252 impl Evented for ReceiverCtl {
register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>253 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
254 if self.registration.borrow().is_some() {
255 return Err(io::Error::new(io::ErrorKind::Other, "receiver already registered"));
256 }
257
258 let (registration, set_readiness) = Registration::new(poll, token, interest, opts);
259
260
261 if self.inner.pending.load(Ordering::Relaxed) > 0 {
262 // TODO: Don't drop readiness
263 let _ = set_readiness.set_readiness(Ready::readable());
264 }
265
266 self.registration.fill(registration).expect("unexpected state encountered");
267 self.inner.set_readiness.fill(set_readiness).expect("unexpected state encountered");
268
269 Ok(())
270 }
271
reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>272 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
273 match self.registration.borrow() {
274 Some(registration) => registration.update(poll, token, interest, opts),
275 None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
276 }
277 }
278
deregister(&self, poll: &Poll) -> io::Result<()>279 fn deregister(&self, poll: &Poll) -> io::Result<()> {
280 match self.registration.borrow() {
281 Some(registration) => registration.deregister(poll),
282 None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
283 }
284 }
285 }
286
287 /*
288 *
289 * ===== Error conversions =====
290 *
291 */
292
293 impl<T> From<mpsc::SendError<T>> for SendError<T> {
from(src: mpsc::SendError<T>) -> SendError<T>294 fn from(src: mpsc::SendError<T>) -> SendError<T> {
295 SendError::Disconnected(src.0)
296 }
297 }
298
// Wraps an I/O error in `SendError::Io`; this is what lets `?` be used on
// `io::Result` values inside functions returning `SendError`.
impl<T> From<io::Error> for SendError<T> {
    fn from(src: io::Error) -> SendError<T> {
        SendError::Io(src)
    }
}
304
305 impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
from(src: mpsc::TrySendError<T>) -> TrySendError<T>306 fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
307 match src {
308 mpsc::TrySendError::Full(v) => TrySendError::Full(v),
309 mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
310 }
311 }
312 }
313
314 impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
from(src: mpsc::SendError<T>) -> TrySendError<T>315 fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
316 TrySendError::Disconnected(src.0)
317 }
318 }
319
// Wraps an I/O error in `TrySendError::Io`; this is what lets `?` be used on
// `io::Result` values inside functions returning `TrySendError`.
impl<T> From<io::Error> for TrySendError<T> {
    fn from(src: io::Error) -> TrySendError<T> {
        TrySendError::Io(src)
    }
}
325
326 /*
327 *
328 * ===== Implement Error, Debug and Display for Errors =====
329 *
330 */
331
332 impl<T: Any> error::Error for SendError<T> {
description(&self) -> &str333 fn description(&self) -> &str {
334 match *self {
335 SendError::Io(ref io_err) => io_err.description(),
336 SendError::Disconnected(..) => "Disconnected",
337 }
338 }
339 }
340
341 impl<T: Any> error::Error for TrySendError<T> {
description(&self) -> &str342 fn description(&self) -> &str {
343 match *self {
344 TrySendError::Io(ref io_err) => io_err.description(),
345 TrySendError::Full(..) => "Full",
346 TrySendError::Disconnected(..) => "Disconnected",
347 }
348 }
349 }
350
// `Debug` delegates to `format_send_error`, so its output is the same as
// `Display`'s and never prints the carried value; this is why no `T: Debug`
// bound is required.
impl<T> fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        format_send_error(self, f)
    }
}
356
// `Display` delegates to `format_send_error`: the wrapped I/O error's text for
// `Io`, or the literal "Disconnected".
impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        format_send_error(self, f)
    }
}
362
// `Debug` delegates to `format_try_send_error`, so its output is the same as
// `Display`'s and never prints the carried value; this is why no `T: Debug`
// bound is required.
impl<T> fmt::Debug for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        format_try_send_error(self, f)
    }
}
368
// `Display` delegates to `format_try_send_error`: the wrapped I/O error's text
// for `Io`, or the variant name ("Full" / "Disconnected").
impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        format_try_send_error(self, f)
    }
}
374
375 #[inline]
format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result376 fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
377 match *e {
378 SendError::Io(ref io_err) => write!(f, "{}", io_err),
379 SendError::Disconnected(..) => write!(f, "Disconnected"),
380 }
381 }
382
383 #[inline]
format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result384 fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
385 match *e {
386 TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
387 TrySendError::Full(..) => write!(f, "Full"),
388 TrySendError::Disconnected(..) => write!(f, "Disconnected"),
389 }
390 }
391