1 // FIXME: re-implement tests with `async/await` 2 /* 3 #[cfg(feature = "runtime")] 4 use std::collections::HashMap; 5 use std::cmp; 6 use std::io::{self, Read, Write}; 7 #[cfg(feature = "runtime")] 8 use std::sync::{Arc, Mutex}; 9 10 use bytes::Buf; 11 use futures::{Async, Poll}; 12 #[cfg(feature = "runtime")] 13 use futures::Future; 14 use futures::task::{self, Task}; 15 use tokio_io::{AsyncRead, AsyncWrite}; 16 17 #[cfg(feature = "runtime")] 18 use crate::client::connect::{Connect, Connected, Destination}; 19 20 21 22 #[cfg(feature = "runtime")] 23 pub struct Duplex { 24 inner: Arc<Mutex<DuplexInner>>, 25 } 26 27 #[cfg(feature = "runtime")] 28 struct DuplexInner { 29 handle_read_task: Option<Task>, 30 read: AsyncIo<MockCursor>, 31 write: AsyncIo<MockCursor>, 32 } 33 34 #[cfg(feature = "runtime")] 35 impl Duplex { 36 pub(crate) fn channel() -> (Duplex, DuplexHandle) { 37 let mut inner = DuplexInner { 38 handle_read_task: None, 39 read: AsyncIo::new_buf(Vec::new(), 0), 40 write: AsyncIo::new_buf(Vec::new(), std::usize::MAX), 41 }; 42 43 inner.read.park_tasks(true); 44 inner.write.park_tasks(true); 45 46 let inner = Arc::new(Mutex::new(inner)); 47 48 let duplex = Duplex { 49 inner: inner.clone(), 50 }; 51 let handle = DuplexHandle { 52 inner: inner, 53 }; 54 55 (duplex, handle) 56 } 57 } 58 59 #[cfg(feature = "runtime")] 60 impl Read for Duplex { 61 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 62 self.inner.lock().unwrap().read.read(buf) 63 } 64 } 65 66 #[cfg(feature = "runtime")] 67 impl Write for Duplex { 68 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 69 let mut inner = self.inner.lock().unwrap(); 70 let ret = inner.write.write(buf); 71 if let Some(task) = inner.handle_read_task.take() { 72 trace!("waking DuplexHandle read"); 73 task.notify(); 74 } 75 ret 76 } 77 78 fn flush(&mut self) -> io::Result<()> { 79 self.inner.lock().unwrap().write.flush() 80 } 81 } 82 83 #[cfg(feature = "runtime")] 84 impl AsyncRead for Duplex { 85 } 86 87 #[cfg(feature = "runtime")] 88 impl AsyncWrite for Duplex { 89 fn shutdown(&mut self) -> Poll<(), io::Error> { 90 Ok(().into()) 91 } 92 93 fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { 94 let mut inner = self.inner.lock().unwrap(); 95 if let Some(task) = inner.handle_read_task.take() { 96 task.notify(); 97 } 98 inner.write.write_buf(buf) 99 } 100 } 101 102 #[cfg(feature = "runtime")] 103 pub struct DuplexHandle { 104 inner: Arc<Mutex<DuplexInner>>, 105 } 106 107 #[cfg(feature = "runtime")] 108 impl DuplexHandle { 109 pub fn read(&self, buf: &mut [u8]) -> Poll<usize, io::Error> { 110 let mut inner = self.inner.lock().unwrap(); 111 assert!(buf.len() >= inner.write.inner.len()); 112 if inner.write.inner.is_empty() { 113 trace!("DuplexHandle read parking"); 114 inner.handle_read_task = Some(task::current()); 115 return Ok(Async::NotReady); 116 } 117 inner.write.read(buf).map(Async::Ready) 118 } 119 120 pub fn write(&self, bytes: &[u8]) -> Poll<usize, io::Error> { 121 let mut inner = self.inner.lock().unwrap(); 122 assert_eq!(inner.read.inner.pos, 0); 123 assert_eq!(inner.read.inner.vec.len(), 0, "write but read isn't empty"); 124 inner 125 .read 126 .inner 127 .vec 128 .extend(bytes); 129 inner.read.block_in(bytes.len()); 130 Ok(Async::Ready(bytes.len())) 131 } 132 } 133 134 #[cfg(feature = "runtime")] 135 impl Drop for DuplexHandle { 136 fn drop(&mut self) { 137 trace!("mock duplex handle drop"); 138 if !::std::thread::panicking() { 139 let mut inner = self.inner.lock().unwrap(); 140 inner.read.close(); 141 inner.write.close(); 142 } 143 } 144 } 145 146 #[cfg(feature = "runtime")] 147 type BoxedConnectFut = Box<dyn Future<Item=(Duplex, Connected), Error=io::Error> + Send>; 148 149 #[cfg(feature = "runtime")] 150 #[derive(Clone)] 151 pub struct MockConnector { 152 mocks: Arc<Mutex<MockedConnections>>, 153 } 154 155 #[cfg(feature = "runtime")] 156 struct MockedConnections(HashMap<String, Vec<BoxedConnectFut>>); 157 158 #[cfg(feature = "runtime")] 159 impl MockConnector { 160 pub fn new() -> MockConnector { 161 MockConnector { 162 mocks: Arc::new(Mutex::new(MockedConnections(HashMap::new()))), 163 } 164 } 165 166 pub fn mock(&mut self, key: &str) -> DuplexHandle { 167 use futures::future; 168 self.mock_fut(key, future::ok::<_, ()>(())) 169 } 170 171 pub fn mock_fut<F>(&mut self, key: &str, fut: F) -> DuplexHandle 172 where 173 F: Future + Send + 'static, 174 { 175 self.mock_opts(key, Connected::new(), fut) 176 } 177 178 pub fn mock_opts<F>(&mut self, key: &str, connected: Connected, fut: F) -> DuplexHandle 179 where 180 F: Future + Send + 'static, 181 { 182 let key = key.to_owned(); 183 184 let (duplex, handle) = Duplex::channel(); 185 186 let fut = Box::new(fut.then(move |_| { 187 trace!("MockConnector mocked fut ready"); 188 Ok((duplex, connected)) 189 })); 190 self.mocks.lock().unwrap().0.entry(key) 191 .or_insert(Vec::new()) 192 .push(fut); 193 194 handle 195 } 196 } 197 198 #[cfg(feature = "runtime")] 199 impl Connect for MockConnector { 200 type Transport = Duplex; 201 type Error = io::Error; 202 type Future = BoxedConnectFut; 203 204 fn connect(&self, dst: Destination) -> Self::Future { 205 trace!("mock connect: {:?}", dst); 206 let key = format!("{}://{}{}", dst.scheme(), dst.host(), if let Some(port) = dst.port() { 207 format!(":{}", port) 208 } else { 209 "".to_owned() 210 }); 211 let mut mocks = self.mocks.lock().unwrap(); 212 let mocks = mocks.0.get_mut(&key) 213 .expect(&format!("unknown mocks uri: {}", key)); 214 assert!(!mocks.is_empty(), "no additional mocks for {}", key); 215 mocks.remove(0) 216 } 217 } 218 219 220 #[cfg(feature = "runtime")] 221 impl Drop for MockedConnections { 222 fn drop(&mut self) { 223 if !::std::thread::panicking() { 224 for (key, mocks) in self.0.iter() { 225 assert_eq!( 226 mocks.len(), 227 0, 228 "not all mocked connects for {:?} were used", 229 key, 230 ); 231 } 232 } 233 } 234 } 235 */ 236