1 //! Thread safe communication channel implementing `Evented`
2 
3 #![allow(unused_imports, deprecated, missing_debug_implementations)]
4 
5 use {io, Ready, Poll, PollOpt, Registration, SetReadiness, Token};
6 use event::Evented;
7 use lazycell::{LazyCell, AtomicLazyCell};
8 use std::any::Any;
9 use std::fmt;
10 use std::error;
11 use std::sync::{mpsc, Arc};
12 use std::sync::atomic::{AtomicUsize, Ordering};
13 
14 /// Creates a new asynchronous channel, where the `Receiver` can be registered
15 /// with `Poll`.
channel<T>() -> (Sender<T>, Receiver<T>)16 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
17     let (tx_ctl, rx_ctl) = ctl_pair();
18     let (tx, rx) = mpsc::channel();
19 
20     let tx = Sender {
21         tx,
22         ctl: tx_ctl,
23     };
24 
25     let rx = Receiver {
26         rx,
27         ctl: rx_ctl,
28     };
29 
30     (tx, rx)
31 }
32 
33 /// Creates a new synchronous, bounded channel where the `Receiver` can be
34 /// registered with `Poll`.
sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>)35 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
36     let (tx_ctl, rx_ctl) = ctl_pair();
37     let (tx, rx) = mpsc::sync_channel(bound);
38 
39     let tx = SyncSender {
40         tx,
41         ctl: tx_ctl,
42     };
43 
44     let rx = Receiver {
45         rx,
46         ctl: rx_ctl,
47     };
48 
49     (tx, rx)
50 }
51 
ctl_pair() -> (SenderCtl, ReceiverCtl)52 pub fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
53     let inner = Arc::new(Inner {
54         pending: AtomicUsize::new(0),
55         senders: AtomicUsize::new(1),
56         set_readiness: AtomicLazyCell::new(),
57     });
58 
59     let tx = SenderCtl {
60         inner: inner.clone(),
61     };
62 
63     let rx = ReceiverCtl {
64         registration: LazyCell::new(),
65         inner,
66     };
67 
68     (tx, rx)
69 }
70 
/// Tracks messages sent on a channel in order to update readiness.
///
/// Cloning a `SenderCtl` bumps the shared sender count; dropping the last
/// one calls `inc` one more time (presumably so the receiver is woken to
/// observe the disconnect).
pub struct SenderCtl {
    // State shared with the paired `ReceiverCtl` and with every clone.
    inner: Arc<Inner>,
}
75 
/// Tracks messages received on a channel in order to track readiness.
pub struct ReceiverCtl {
    // Filled by the first successful `register`; empty until then.
    registration: LazyCell<Registration>,
    // State shared with every `SenderCtl` of this channel.
    inner: Arc<Inner>,
}
81 
/// Sending half of a channel created by `channel`.
///
/// Pairs an `mpsc::Sender` with the `SenderCtl` used to record each sent
/// message.
pub struct Sender<T> {
    // Declared before `ctl` so it is dropped first (fields drop in
    // declaration order); `SenderCtl`'s `Drop` runs afterwards.
    tx: mpsc::Sender<T>,
    ctl: SenderCtl,
}
86 
/// Sending half of a bounded channel created by `sync_channel`.
///
/// Pairs an `mpsc::SyncSender` with the `SenderCtl` used to record each
/// sent message.
pub struct SyncSender<T> {
    // Declared before `ctl` so it is dropped first (fields drop in
    // declaration order); `SenderCtl`'s `Drop` runs afterwards.
    tx: mpsc::SyncSender<T>,
    ctl: SenderCtl,
}
91 
/// Receiving half of a channel created by `channel` or `sync_channel`.
///
/// Implements `Evented` by delegating to its `ReceiverCtl`, so it can be
/// registered with `Poll`.
pub struct Receiver<T> {
    // The queue messages are actually read from.
    rx: mpsc::Receiver<T>,
    // Readiness bookkeeping; decremented on every successful `try_recv`.
    ctl: ReceiverCtl,
}
96 
/// Error returned by `Sender::send` and `SyncSender::send`.
pub enum SendError<T> {
    /// The message was queued, but updating readiness afterwards failed.
    Io(io::Error),
    /// The underlying `mpsc` send failed; carries the unsent message back.
    Disconnected(T),
}
101 
/// Error returned by `SyncSender::try_send`.
pub enum TrySendError<T> {
    /// The message was queued, but updating readiness afterwards failed.
    Io(io::Error),
    /// The underlying `mpsc` channel reported `Full`; carries the unsent
    /// message back.
    Full(T),
    /// The underlying `mpsc` channel reported `Disconnected`; carries the
    /// unsent message back.
    Disconnected(T),
}
107 
// State shared (through an `Arc`) between every `SenderCtl` clone and the
// single `ReceiverCtl` of one channel.
struct Inner {
    // The number of outstanding messages for the receiver to read. Also
    // bumped once more when the last `SenderCtl` is dropped.
    pending: AtomicUsize,
    // The number of sender handles
    senders: AtomicUsize,
    // The set readiness handle; filled by `ReceiverCtl::register` and empty
    // before that.
    set_readiness: AtomicLazyCell<SetReadiness>,
}
116 
117 impl<T> Sender<T> {
send(&self, t: T) -> Result<(), SendError<T>>118     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
119         self.tx.send(t)
120             .map_err(SendError::from)
121             .and_then(|_| {
122                 self.ctl.inc()?;
123                 Ok(())
124             })
125     }
126 }
127 
128 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>129     fn clone(&self) -> Sender<T> {
130         Sender {
131             tx: self.tx.clone(),
132             ctl: self.ctl.clone(),
133         }
134     }
135 }
136 
137 impl<T> SyncSender<T> {
send(&self, t: T) -> Result<(), SendError<T>>138     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
139         self.tx.send(t)
140             .map_err(From::from)
141             .and_then(|_| {
142                 self.ctl.inc()?;
143                 Ok(())
144             })
145     }
146 
try_send(&self, t: T) -> Result<(), TrySendError<T>>147     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
148         self.tx.try_send(t)
149             .map_err(From::from)
150             .and_then(|_| {
151                 self.ctl.inc()?;
152                 Ok(())
153             })
154     }
155 }
156 
157 impl<T> Clone for SyncSender<T> {
clone(&self) -> SyncSender<T>158     fn clone(&self) -> SyncSender<T> {
159         SyncSender {
160             tx: self.tx.clone(),
161             ctl: self.ctl.clone(),
162         }
163     }
164 }
165 
166 impl<T> Receiver<T> {
try_recv(&self) -> Result<T, mpsc::TryRecvError>167     pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
168         self.rx.try_recv().and_then(|res| {
169             let _ = self.ctl.dec();
170             Ok(res)
171         })
172     }
173 }
174 
175 impl<T> Evented for Receiver<T> {
register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>176     fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
177         self.ctl.register(poll, token, interest, opts)
178     }
179 
reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>180     fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
181         self.ctl.reregister(poll, token, interest, opts)
182     }
183 
deregister(&self, poll: &Poll) -> io::Result<()>184     fn deregister(&self, poll: &Poll) -> io::Result<()> {
185         self.ctl.deregister(poll)
186     }
187 }
188 
189 /*
190  *
191  * ===== SenderCtl / ReceiverCtl =====
192  *
193  */
194 
195 impl SenderCtl {
196     /// Call to track that a message has been sent
inc(&self) -> io::Result<()>197     pub fn inc(&self) -> io::Result<()> {
198         let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
199 
200         if 0 == cnt {
201             // Toggle readiness to readable
202             if let Some(set_readiness) = self.inner.set_readiness.borrow() {
203                 set_readiness.set_readiness(Ready::readable())?;
204             }
205         }
206 
207         Ok(())
208     }
209 }
210 
211 impl Clone for SenderCtl {
clone(&self) -> SenderCtl212     fn clone(&self) -> SenderCtl {
213         self.inner.senders.fetch_add(1, Ordering::Relaxed);
214         SenderCtl { inner: self.inner.clone() }
215     }
216 }
217 
218 impl Drop for SenderCtl {
drop(&mut self)219     fn drop(&mut self) {
220         if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
221             let _ = self.inc();
222         }
223     }
224 }
225 
226 impl ReceiverCtl {
dec(&self) -> io::Result<()>227     pub fn dec(&self) -> io::Result<()> {
228         let first = self.inner.pending.load(Ordering::Acquire);
229 
230         if first == 1 {
231             // Unset readiness
232             if let Some(set_readiness) = self.inner.set_readiness.borrow() {
233                 set_readiness.set_readiness(Ready::empty())?;
234             }
235         }
236 
237         // Decrement
238         let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
239 
240         if first == 1 && second > 1 {
241             // There are still pending messages. Since readiness was
242             // previously unset, it must be reset here
243             if let Some(set_readiness) = self.inner.set_readiness.borrow() {
244                 set_readiness.set_readiness(Ready::readable())?;
245             }
246         }
247 
248         Ok(())
249     }
250 }
251 
252 impl Evented for ReceiverCtl {
register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>253     fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
254         if self.registration.borrow().is_some() {
255             return Err(io::Error::new(io::ErrorKind::Other, "receiver already registered"));
256         }
257 
258         let (registration, set_readiness) = Registration::new(poll, token, interest, opts);
259 
260 
261         if self.inner.pending.load(Ordering::Relaxed) > 0 {
262             // TODO: Don't drop readiness
263             let _ = set_readiness.set_readiness(Ready::readable());
264         }
265 
266         self.registration.fill(registration).expect("unexpected state encountered");
267         self.inner.set_readiness.fill(set_readiness).expect("unexpected state encountered");
268 
269         Ok(())
270     }
271 
reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>272     fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
273         match self.registration.borrow() {
274             Some(registration) => registration.update(poll, token, interest, opts),
275             None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
276         }
277     }
278 
deregister(&self, poll: &Poll) -> io::Result<()>279     fn deregister(&self, poll: &Poll) -> io::Result<()> {
280         match self.registration.borrow() {
281             Some(registration) => registration.deregister(poll),
282             None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
283         }
284     }
285 }
286 
287 /*
288  *
289  * ===== Error conversions =====
290  *
291  */
292 
293 impl<T> From<mpsc::SendError<T>> for SendError<T> {
from(src: mpsc::SendError<T>) -> SendError<T>294     fn from(src: mpsc::SendError<T>) -> SendError<T> {
295         SendError::Disconnected(src.0)
296     }
297 }
298 
299 impl<T> From<io::Error> for SendError<T> {
from(src: io::Error) -> SendError<T>300     fn from(src: io::Error) -> SendError<T> {
301         SendError::Io(src)
302     }
303 }
304 
305 impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
from(src: mpsc::TrySendError<T>) -> TrySendError<T>306     fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
307         match src {
308             mpsc::TrySendError::Full(v) => TrySendError::Full(v),
309             mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
310         }
311     }
312 }
313 
314 impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
from(src: mpsc::SendError<T>) -> TrySendError<T>315     fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
316         TrySendError::Disconnected(src.0)
317     }
318 }
319 
320 impl<T> From<io::Error> for TrySendError<T> {
from(src: io::Error) -> TrySendError<T>321     fn from(src: io::Error) -> TrySendError<T> {
322         TrySendError::Io(src)
323     }
324 }
325 
326 /*
327  *
328  * ===== Implement Error, Debug and Display for Errors =====
329  *
330  */
331 
332 impl<T: Any> error::Error for SendError<T> {
description(&self) -> &str333     fn description(&self) -> &str {
334         match *self {
335             SendError::Io(ref io_err) => io_err.description(),
336             SendError::Disconnected(..) => "Disconnected",
337         }
338     }
339 }
340 
341 impl<T: Any> error::Error for TrySendError<T> {
description(&self) -> &str342     fn description(&self) -> &str {
343         match *self {
344             TrySendError::Io(ref io_err) => io_err.description(),
345             TrySendError::Full(..) => "Full",
346             TrySendError::Disconnected(..) => "Disconnected",
347         }
348     }
349 }
350 
351 impl<T> fmt::Debug for SendError<T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result352     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
353         format_send_error(self, f)
354     }
355 }
356 
357 impl<T> fmt::Display for SendError<T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result358     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
359         format_send_error(self, f)
360     }
361 }
362 
363 impl<T> fmt::Debug for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result364     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
365         format_try_send_error(self, f)
366     }
367 }
368 
369 impl<T> fmt::Display for TrySendError<T> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result370     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
371         format_try_send_error(self, f)
372     }
373 }
374 
375 #[inline]
format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result376 fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
377     match *e {
378         SendError::Io(ref io_err) => write!(f, "{}", io_err),
379         SendError::Disconnected(..) => write!(f, "Disconnected"),
380     }
381 }
382 
383 #[inline]
format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result384 fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
385     match *e {
386         TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
387         TrySendError::Full(..) => write!(f, "Full"),
388         TrySendError::Disconnected(..) => write!(f, "Disconnected"),
389     }
390 }
391