1 use std::mem;
2 
3 use futures::{Async, Future, Poll, Stream};
4 use futures::future::Shared;
5 use futures::sync::{mpsc, oneshot};
6 
7 use super::Never;
8 
channel() -> (Signal, Watch)9 pub fn channel() -> (Signal, Watch) {
10     let (tx, rx) = oneshot::channel();
11     let (drained_tx, drained_rx) = mpsc::channel(0);
12     (
13         Signal {
14             drained_rx,
15             tx,
16         },
17         Watch {
18             drained_tx,
19             rx: rx.shared(),
20         },
21     )
22 }
23 
24 pub struct Signal {
25     drained_rx: mpsc::Receiver<Never>,
26     tx: oneshot::Sender<()>,
27 }
28 
29 pub struct Draining {
30     drained_rx: mpsc::Receiver<Never>,
31 }
32 
33 #[derive(Clone)]
34 pub struct Watch {
35     drained_tx: mpsc::Sender<Never>,
36     rx: Shared<oneshot::Receiver<()>>,
37 }
38 
39 #[allow(missing_debug_implementations)]
40 pub struct Watching<F, FN> {
41     future: F,
42     state: State<FN>,
43     watch: Watch,
44 }
45 
46 enum State<F> {
47     Watch(F),
48     Draining,
49 }
50 
51 impl Signal {
drain(self) -> Draining52     pub fn drain(self) -> Draining {
53         let _ = self.tx.send(());
54         Draining {
55             drained_rx: self.drained_rx,
56         }
57     }
58 }
59 
60 impl Future for Draining {
61     type Item = ();
62     type Error = ();
63 
poll(&mut self) -> Poll<Self::Item, Self::Error>64     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
65         match try_ready!(self.drained_rx.poll()) {
66             Some(never) => match never {},
67             None => Ok(Async::Ready(())),
68         }
69     }
70 }
71 
72 impl Watch {
watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN> where F: Future, FN: FnOnce(&mut F),73     pub fn watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN>
74     where
75         F: Future,
76         FN: FnOnce(&mut F),
77     {
78         Watching {
79             future,
80             state: State::Watch(on_drain),
81             watch: self,
82         }
83     }
84 }
85 
86 impl<F, FN> Future for Watching<F, FN>
87 where
88     F: Future,
89     FN: FnOnce(&mut F),
90 {
91     type Item = F::Item;
92     type Error = F::Error;
93 
poll(&mut self) -> Poll<Self::Item, Self::Error>94     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
95         loop {
96             match mem::replace(&mut self.state, State::Draining) {
97                 State::Watch(on_drain) => {
98                     match self.watch.rx.poll() {
99                         Ok(Async::Ready(_)) | Err(_) => {
100                             // Drain has been triggered!
101                             on_drain(&mut self.future);
102                         },
103                         Ok(Async::NotReady) => {
104                             self.state = State::Watch(on_drain);
105                             return self.future.poll();
106                         },
107                     }
108                 },
109                 State::Draining => {
110                     return self.future.poll();
111                 },
112             }
113         }
114     }
115 }
116 
117 #[cfg(test)]
118 mod tests {
119     use futures::{future, Async, Future, Poll};
120     use super::*;
121 
122     struct TestMe {
123         draining: bool,
124         finished: bool,
125         poll_cnt: usize,
126     }
127 
128     impl Future for TestMe {
129         type Item = ();
130         type Error = ();
131 
poll(&mut self) -> Poll<Self::Item, Self::Error>132         fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
133             self.poll_cnt += 1;
134             if self.finished {
135                 Ok(Async::Ready(()))
136             } else {
137                 Ok(Async::NotReady)
138             }
139         }
140     }
141 
142     #[test]
watch()143     fn watch() {
144         future::lazy(|| {
145             let (tx, rx) = channel();
146             let fut = TestMe {
147                 draining: false,
148                 finished: false,
149                 poll_cnt: 0,
150             };
151 
152             let mut watch = rx.watch(fut, |fut| {
153                 fut.draining = true;
154             });
155 
156             assert_eq!(watch.future.poll_cnt, 0);
157 
158             // First poll should poll the inner future
159             assert!(watch.poll().unwrap().is_not_ready());
160             assert_eq!(watch.future.poll_cnt, 1);
161 
162             // Second poll should poll the inner future again
163             assert!(watch.poll().unwrap().is_not_ready());
164             assert_eq!(watch.future.poll_cnt, 2);
165 
166             let mut draining = tx.drain();
167             // Drain signaled, but needs another poll to be noticed.
168             assert!(!watch.future.draining);
169             assert_eq!(watch.future.poll_cnt, 2);
170 
171             // Now, poll after drain has been signaled.
172             assert!(watch.poll().unwrap().is_not_ready());
173             assert_eq!(watch.future.poll_cnt, 3);
174             assert!(watch.future.draining);
175 
176             // Draining is not ready until watcher completes
177             assert!(draining.poll().unwrap().is_not_ready());
178 
179             // Finishing up the watch future
180             watch.future.finished = true;
181             assert!(watch.poll().unwrap().is_ready());
182             assert_eq!(watch.future.poll_cnt, 4);
183             drop(watch);
184 
185             assert!(draining.poll().unwrap().is_ready());
186 
187             Ok::<_, ()>(())
188         }).wait().unwrap();
189     }
190 
191     #[test]
watch_clones()192     fn watch_clones() {
193         future::lazy(|| {
194             let (tx, rx) = channel();
195 
196             let fut1 = TestMe {
197                 draining: false,
198                 finished: false,
199                 poll_cnt: 0,
200             };
201             let fut2 = TestMe {
202                 draining: false,
203                 finished: false,
204                 poll_cnt: 0,
205             };
206 
207             let watch1 = rx.clone().watch(fut1, |fut| {
208                 fut.draining = true;
209             });
210             let watch2 = rx.watch(fut2, |fut| {
211                 fut.draining = true;
212             });
213 
214             let mut draining = tx.drain();
215 
216             // Still 2 outstanding watchers
217             assert!(draining.poll().unwrap().is_not_ready());
218 
219             // drop 1 for whatever reason
220             drop(watch1);
221 
222             // Still not ready, 1 other watcher still pending
223             assert!(draining.poll().unwrap().is_not_ready());
224 
225             drop(watch2);
226 
227             // Now all watchers are gone, draining is complete
228             assert!(draining.poll().unwrap().is_ready());
229 
230             Ok::<_, ()>(())
231         }).wait().unwrap();
232     }
233 }
234 
235