// FIXME: re-implement tests with `async/await`; until then, everything below is disabled inside one block comment.
2 /*
3 #[cfg(feature = "runtime")]
4 use std::collections::HashMap;
5 use std::cmp;
6 use std::io::{self, Read, Write};
7 #[cfg(feature = "runtime")]
8 use std::sync::{Arc, Mutex};
9 
10 use bytes::Buf;
11 use futures::{Async, Poll};
12 #[cfg(feature = "runtime")]
13 use futures::Future;
14 use futures::task::{self, Task};
15 use tokio_io::{AsyncRead, AsyncWrite};
16 
17 #[cfg(feature = "runtime")]
18 use crate::client::connect::{Connect, Connected, Destination};
19 
20 
21 
/// In-memory mock transport; this is the `Transport` type that
/// `MockConnector` resolves to.
///
/// Its state is shared, behind a mutex, with the `DuplexHandle` returned by
/// the same `Duplex::channel()` call.
#[cfg(feature = "runtime")]
pub struct Duplex {
    inner: Arc<Mutex<DuplexInner>>,
}
26 
/// State shared between a `Duplex` and its `DuplexHandle`.
#[cfg(feature = "runtime")]
struct DuplexInner {
    // Task parked by `DuplexHandle::read` while no data was available;
    // taken and notified when the `Duplex` side writes.
    handle_read_task: Option<Task>,
    // Bytes the `Duplex` reads; filled by `DuplexHandle::write`.
    read: AsyncIo<MockCursor>,
    // Bytes the `Duplex` writes; consumed by `DuplexHandle::read`.
    write: AsyncIo<MockCursor>,
}
33 
#[cfg(feature = "runtime")]
impl Duplex {
    /// Creates a connected pair: the `Duplex` transport and the
    /// `DuplexHandle` that drives it from the other side.
    ///
    /// Both ends point at the same `DuplexInner`. Each direction starts with
    /// an empty buffer and task parking enabled.
    pub(crate) fn channel() -> (Duplex, DuplexHandle) {
        // NOTE(review): the second `new_buf` argument is presumably the number
        // of bytes allowed through before the side blocks (0 for reads,
        // unlimited for writes) — confirm against `AsyncIo`.
        let mut state = DuplexInner {
            handle_read_task: None,
            read: AsyncIo::new_buf(Vec::new(), 0),
            write: AsyncIo::new_buf(Vec::new(), std::usize::MAX),
        };
        state.read.park_tasks(true);
        state.write.park_tasks(true);

        let shared = Arc::new(Mutex::new(state));
        let transport = Duplex {
            inner: Arc::clone(&shared),
        };
        let handle = DuplexHandle { inner: shared };

        (transport, handle)
    }
}
58 
#[cfg(feature = "runtime")]
impl Read for Duplex {
    /// Reads from the shared read side, i.e. the bytes supplied through
    /// `DuplexHandle::write`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut state = self.inner.lock().unwrap();
        state.read.read(buf)
    }
}
65 
#[cfg(feature = "runtime")]
impl Write for Duplex {
    /// Writes `buf` into the shared write side, then wakes the task (if any)
    /// that `DuplexHandle::read` parked while waiting for data.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut state = self.inner.lock().unwrap();
        let outcome = state.write.write(buf);
        // `take()` clears the slot, so a parked task is notified only once.
        if let Some(parked) = state.handle_read_task.take() {
            trace!("waking DuplexHandle read");
            parked.notify();
        }
        outcome
    }

    /// Flushes the shared write side.
    fn flush(&mut self) -> io::Result<()> {
        let mut state = self.inner.lock().unwrap();
        state.write.flush()
    }
}
82 
// No methods are overridden; the trait's provided defaults are used.
#[cfg(feature = "runtime")]
impl AsyncRead for Duplex {}
86 
#[cfg(feature = "runtime")]
impl AsyncWrite for Duplex {
    /// Nothing to shut down for the mock; completes immediately.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        Ok(Async::Ready(()))
    }

    /// Wakes any task parked by `DuplexHandle::read`, then forwards `buf` to
    /// the shared write side.
    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
        let mut state = self.inner.lock().unwrap();
        // The notification is sent before the write, but under the same lock
        // that `DuplexHandle::read` acquires.
        if let Some(parked) = state.handle_read_task.take() {
            parked.notify();
        }
        state.write.write_buf(buf)
    }
}
101 
/// Test-side end of a `Duplex`: reads what the transport wrote and supplies
/// the bytes the transport will read.
///
/// Dropping the handle closes both directions of the shared state.
#[cfg(feature = "runtime")]
pub struct DuplexHandle {
    inner: Arc<Mutex<DuplexInner>>,
}
106 
#[cfg(feature = "runtime")]
impl DuplexHandle {
    /// Reads the bytes the `Duplex` side has written.
    ///
    /// When the write side is empty, the current task is stored so that the
    /// next `Duplex` write can wake it, and `NotReady` is returned.
    ///
    /// # Panics
    ///
    /// Panics if `buf.len()` is smaller than `write.inner.len()`.
    pub fn read(&self, buf: &mut [u8]) -> Poll<usize, io::Error> {
        let mut state = self.inner.lock().unwrap();
        assert!(buf.len() >= state.write.inner.len());
        if !state.write.inner.is_empty() {
            return state.write.read(buf).map(Async::Ready);
        }
        trace!("DuplexHandle read parking");
        state.handle_read_task = Some(task::current());
        Ok(Async::NotReady)
    }

    /// Supplies `bytes` for the `Duplex` side to read and always reports the
    /// full length as written.
    ///
    /// # Panics
    ///
    /// Panics if the read side's cursor position is not `0` or its buffer is
    /// not empty.
    pub fn write(&self, bytes: &[u8]) -> Poll<usize, io::Error> {
        let mut state = self.inner.lock().unwrap();
        assert_eq!(state.read.inner.pos, 0);
        assert_eq!(state.read.inner.vec.len(), 0, "write but read isn't empty");
        let count = bytes.len();
        state.read.inner.vec.extend(bytes);
        state.read.block_in(count);
        Ok(Async::Ready(count))
    }
}
133 
#[cfg(feature = "runtime")]
impl Drop for DuplexHandle {
    /// Closes both directions of the shared state.
    ///
    /// Does nothing beyond logging while the thread is already panicking, so
    /// the `lock().unwrap()` below is not reached during unwinding.
    fn drop(&mut self) {
        trace!("mock duplex handle drop");
        if ::std::thread::panicking() {
            return;
        }
        let mut state = self.inner.lock().unwrap();
        state.read.close();
        state.write.close();
    }
}
145 
/// Boxed, sendable connect future that resolves to a `Duplex` plus its
/// `Connected` metadata, or fails with an `io::Error`.
#[cfg(feature = "runtime")]
type BoxedConnectFut = Box<dyn Future<Item=(Duplex, Connected), Error=io::Error> + Send>;
148 
/// `Connect` implementation that hands out pre-registered mock connections.
///
/// Clones share one registry, so a mock registered through any clone is
/// visible to all of them.
#[cfg(feature = "runtime")]
#[derive(Clone)]
pub struct MockConnector {
    mocks: Arc<Mutex<MockedConnections>>,
}
154 
/// Pending mock connections, keyed by `scheme://host[:port]`.
///
/// Each key maps to a list of connect futures that `connect` consumes from
/// the front; dropping the registry asserts that every list is empty.
#[cfg(feature = "runtime")]
struct MockedConnections(HashMap<String, Vec<BoxedConnectFut>>);
157 
#[cfg(feature = "runtime")]
impl MockConnector {
    /// Creates a connector with no mocked connections registered.
    pub fn new() -> MockConnector {
        let mocks = Arc::new(Mutex::new(MockedConnections(HashMap::new())));
        MockConnector { mocks }
    }

    /// Registers a mock connection for `key` that is ready immediately.
    pub fn mock(&mut self, key: &str) -> DuplexHandle {
        self.mock_fut(key, futures::future::ok::<_, ()>(()))
    }

    /// Registers a mock connection for `key` that becomes ready once `fut`
    /// completes, using default `Connected` metadata.
    pub fn mock_fut<F>(&mut self, key: &str, fut: F) -> DuplexHandle
    where
        F: Future + Send + 'static,
    {
        let connected = Connected::new();
        self.mock_opts(key, connected, fut)
    }

    /// Registers a mock connection for `key` with explicit `Connected`
    /// metadata and returns the handle that drives its transport.
    ///
    /// The outcome of `fut` is ignored: whether it succeeds or fails, the
    /// connect future resolves to the new `Duplex` and `connected`.
    pub fn mock_opts<F>(&mut self, key: &str, connected: Connected, fut: F) -> DuplexHandle
    where
        F: Future + Send + 'static,
    {
        let (duplex, handle) = Duplex::channel();

        let pending = Box::new(fut.then(move |_| {
            trace!("MockConnector mocked fut ready");
            Ok((duplex, connected))
        }));

        // Appended at the back; `connect` takes entries from the front.
        self.mocks
            .lock()
            .unwrap()
            .0
            .entry(key.to_owned())
            .or_insert_with(Vec::new)
            .push(pending);

        handle
    }
}
197 
#[cfg(feature = "runtime")]
impl Connect for MockConnector {
    type Transport = Duplex;
    type Error = io::Error;
    type Future = BoxedConnectFut;

    /// Returns the oldest unused mock registered for `dst`.
    ///
    /// The lookup key is `scheme://host`, with `:port` appended when the
    /// destination carries a port.
    ///
    /// # Panics
    ///
    /// Panics if no mock was ever registered under that key, or if every
    /// mock registered under it has already been handed out.
    fn connect(&self, dst: Destination) -> Self::Future {
        trace!("mock connect: {:?}", dst);
        let key = match dst.port() {
            Some(port) => format!("{}://{}:{}", dst.scheme(), dst.host(), port),
            None => format!("{}://{}", dst.scheme(), dst.host()),
        };
        let mut registry = self.mocks.lock().unwrap();
        // Build the panic message only on the failure path.
        let queue = registry
            .0
            .get_mut(&key)
            .unwrap_or_else(|| panic!("unknown mocks uri: {}", key));
        assert!(!queue.is_empty(), "no additional mocks for {}", key);
        // Mocks are consumed in registration order.
        queue.remove(0)
    }
}
218 
219 
#[cfg(feature = "runtime")]
impl Drop for MockedConnections {
    /// Asserts that every registered mock connection was consumed.
    ///
    /// The check is skipped while the thread is already panicking, so it
    /// cannot raise a second panic during unwinding.
    fn drop(&mut self) {
        if ::std::thread::panicking() {
            return;
        }
        for (key, remaining) in &self.0 {
            assert_eq!(
                remaining.len(),
                0,
                "not all mocked connects for {:?} were used",
                key,
            );
        }
    }
}
235 */
236