1 use std::mem;
2
3 use futures::{Async, Future, Poll, Stream};
4 use futures::future::Shared;
5 use futures::sync::{mpsc, oneshot};
6
7 use super::Never;
8
channel() -> (Signal, Watch)9 pub fn channel() -> (Signal, Watch) {
10 let (tx, rx) = oneshot::channel();
11 let (drained_tx, drained_rx) = mpsc::channel(0);
12 (
13 Signal {
14 drained_rx,
15 tx,
16 },
17 Watch {
18 drained_tx,
19 rx: rx.shared(),
20 },
21 )
22 }
23
24 pub struct Signal {
25 drained_rx: mpsc::Receiver<Never>,
26 tx: oneshot::Sender<()>,
27 }
28
29 pub struct Draining {
30 drained_rx: mpsc::Receiver<Never>,
31 }
32
33 #[derive(Clone)]
34 pub struct Watch {
35 drained_tx: mpsc::Sender<Never>,
36 rx: Shared<oneshot::Receiver<()>>,
37 }
38
39 #[allow(missing_debug_implementations)]
40 pub struct Watching<F, FN> {
41 future: F,
42 state: State<FN>,
43 watch: Watch,
44 }
45
46 enum State<F> {
47 Watch(F),
48 Draining,
49 }
50
51 impl Signal {
drain(self) -> Draining52 pub fn drain(self) -> Draining {
53 let _ = self.tx.send(());
54 Draining {
55 drained_rx: self.drained_rx,
56 }
57 }
58 }
59
60 impl Future for Draining {
61 type Item = ();
62 type Error = ();
63
poll(&mut self) -> Poll<Self::Item, Self::Error>64 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
65 match try_ready!(self.drained_rx.poll()) {
66 Some(never) => match never {},
67 None => Ok(Async::Ready(())),
68 }
69 }
70 }
71
72 impl Watch {
watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN> where F: Future, FN: FnOnce(&mut F),73 pub fn watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN>
74 where
75 F: Future,
76 FN: FnOnce(&mut F),
77 {
78 Watching {
79 future,
80 state: State::Watch(on_drain),
81 watch: self,
82 }
83 }
84 }
85
86 impl<F, FN> Future for Watching<F, FN>
87 where
88 F: Future,
89 FN: FnOnce(&mut F),
90 {
91 type Item = F::Item;
92 type Error = F::Error;
93
poll(&mut self) -> Poll<Self::Item, Self::Error>94 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
95 loop {
96 match mem::replace(&mut self.state, State::Draining) {
97 State::Watch(on_drain) => {
98 match self.watch.rx.poll() {
99 Ok(Async::Ready(_)) | Err(_) => {
100 // Drain has been triggered!
101 on_drain(&mut self.future);
102 },
103 Ok(Async::NotReady) => {
104 self.state = State::Watch(on_drain);
105 return self.future.poll();
106 },
107 }
108 },
109 State::Draining => {
110 return self.future.poll();
111 },
112 }
113 }
114 }
115 }
116
117 #[cfg(test)]
118 mod tests {
119 use futures::{future, Async, Future, Poll};
120 use super::*;
121
122 struct TestMe {
123 draining: bool,
124 finished: bool,
125 poll_cnt: usize,
126 }
127
128 impl Future for TestMe {
129 type Item = ();
130 type Error = ();
131
poll(&mut self) -> Poll<Self::Item, Self::Error>132 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
133 self.poll_cnt += 1;
134 if self.finished {
135 Ok(Async::Ready(()))
136 } else {
137 Ok(Async::NotReady)
138 }
139 }
140 }
141
142 #[test]
watch()143 fn watch() {
144 future::lazy(|| {
145 let (tx, rx) = channel();
146 let fut = TestMe {
147 draining: false,
148 finished: false,
149 poll_cnt: 0,
150 };
151
152 let mut watch = rx.watch(fut, |fut| {
153 fut.draining = true;
154 });
155
156 assert_eq!(watch.future.poll_cnt, 0);
157
158 // First poll should poll the inner future
159 assert!(watch.poll().unwrap().is_not_ready());
160 assert_eq!(watch.future.poll_cnt, 1);
161
162 // Second poll should poll the inner future again
163 assert!(watch.poll().unwrap().is_not_ready());
164 assert_eq!(watch.future.poll_cnt, 2);
165
166 let mut draining = tx.drain();
167 // Drain signaled, but needs another poll to be noticed.
168 assert!(!watch.future.draining);
169 assert_eq!(watch.future.poll_cnt, 2);
170
171 // Now, poll after drain has been signaled.
172 assert!(watch.poll().unwrap().is_not_ready());
173 assert_eq!(watch.future.poll_cnt, 3);
174 assert!(watch.future.draining);
175
176 // Draining is not ready until watcher completes
177 assert!(draining.poll().unwrap().is_not_ready());
178
179 // Finishing up the watch future
180 watch.future.finished = true;
181 assert!(watch.poll().unwrap().is_ready());
182 assert_eq!(watch.future.poll_cnt, 4);
183 drop(watch);
184
185 assert!(draining.poll().unwrap().is_ready());
186
187 Ok::<_, ()>(())
188 }).wait().unwrap();
189 }
190
191 #[test]
watch_clones()192 fn watch_clones() {
193 future::lazy(|| {
194 let (tx, rx) = channel();
195
196 let fut1 = TestMe {
197 draining: false,
198 finished: false,
199 poll_cnt: 0,
200 };
201 let fut2 = TestMe {
202 draining: false,
203 finished: false,
204 poll_cnt: 0,
205 };
206
207 let watch1 = rx.clone().watch(fut1, |fut| {
208 fut.draining = true;
209 });
210 let watch2 = rx.watch(fut2, |fut| {
211 fut.draining = true;
212 });
213
214 let mut draining = tx.drain();
215
216 // Still 2 outstanding watchers
217 assert!(draining.poll().unwrap().is_not_ready());
218
219 // drop 1 for whatever reason
220 drop(watch1);
221
222 // Still not ready, 1 other watcher still pending
223 assert!(draining.poll().unwrap().is_not_ready());
224
225 drop(watch2);
226
227 // Now all watchers are gone, draining is complete
228 assert!(draining.poll().unwrap().is_ready());
229
230 Ok::<_, ()>(())
231 }).wait().unwrap();
232 }
233 }
234
235