1 #![cfg(feature = "full")]
2 #![cfg(all(windows))]
3 
4 use std::io;
5 use std::mem;
6 use std::os::windows::io::AsRawHandle;
7 use std::time::Duration;
8 use tokio::io::AsyncWriteExt;
9 use tokio::net::windows::named_pipe::{ClientOptions, PipeMode, ServerOptions};
10 use tokio::time;
11 use winapi::shared::winerror;
12 
13 #[tokio::test]
test_named_pipe_client_drop() -> io::Result<()>14 async fn test_named_pipe_client_drop() -> io::Result<()> {
15     const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-client-drop";
16 
17     let mut server = ServerOptions::new().create(PIPE_NAME)?;
18 
19     assert_eq!(num_instances("test-named-pipe-client-drop")?, 1);
20 
21     let client = ClientOptions::new().open(PIPE_NAME)?;
22 
23     server.connect().await?;
24     drop(client);
25 
26     // instance will be broken because client is gone
27     match server.write_all(b"ping").await {
28         Err(e) if e.raw_os_error() == Some(winerror::ERROR_NO_DATA as i32) => (),
29         x => panic!("{:?}", x),
30     }
31 
32     Ok(())
33 }
34 
35 #[tokio::test]
test_named_pipe_single_client() -> io::Result<()>36 async fn test_named_pipe_single_client() -> io::Result<()> {
37     use tokio::io::{AsyncBufReadExt as _, BufReader};
38 
39     const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-single-client";
40 
41     let server = ServerOptions::new().create(PIPE_NAME)?;
42 
43     let server = tokio::spawn(async move {
44         // Note: we wait for a client to connect.
45         server.connect().await?;
46 
47         let mut server = BufReader::new(server);
48 
49         let mut buf = String::new();
50         server.read_line(&mut buf).await?;
51         server.write_all(b"pong\n").await?;
52         Ok::<_, io::Error>(buf)
53     });
54 
55     let client = tokio::spawn(async move {
56         let client = ClientOptions::new().open(PIPE_NAME)?;
57 
58         let mut client = BufReader::new(client);
59 
60         let mut buf = String::new();
61         client.write_all(b"ping\n").await?;
62         client.read_line(&mut buf).await?;
63         Ok::<_, io::Error>(buf)
64     });
65 
66     let (server, client) = tokio::try_join!(server, client)?;
67 
68     assert_eq!(server?, "ping\n");
69     assert_eq!(client?, "pong\n");
70 
71     Ok(())
72 }
73 
74 #[tokio::test]
test_named_pipe_multi_client() -> io::Result<()>75 async fn test_named_pipe_multi_client() -> io::Result<()> {
76     use tokio::io::{AsyncBufReadExt as _, BufReader};
77 
78     const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client";
79     const N: usize = 10;
80 
81     // The first server needs to be constructed early so that clients can
82     // be correctly connected. Otherwise calling .wait will cause the client to
83     // error.
84     let mut server = ServerOptions::new().create(PIPE_NAME)?;
85 
86     let server = tokio::spawn(async move {
87         for _ in 0..N {
88             // Wait for client to connect.
89             server.connect().await?;
90             let mut inner = BufReader::new(server);
91 
92             // Construct the next server to be connected before sending the one
93             // we already have of onto a task. This ensures that the server
94             // isn't closed (after it's done in the task) before a new one is
95             // available. Otherwise the client might error with
96             // `io::ErrorKind::NotFound`.
97             server = ServerOptions::new().create(PIPE_NAME)?;
98 
99             let _ = tokio::spawn(async move {
100                 let mut buf = String::new();
101                 inner.read_line(&mut buf).await?;
102                 inner.write_all(b"pong\n").await?;
103                 inner.flush().await?;
104                 Ok::<_, io::Error>(())
105             });
106         }
107 
108         Ok::<_, io::Error>(())
109     });
110 
111     let mut clients = Vec::new();
112 
113     for _ in 0..N {
114         clients.push(tokio::spawn(async move {
115             // This showcases a generic connect loop.
116             //
117             // We immediately try to create a client, if it's not found or the
118             // pipe is busy we use the specialized wait function on the client
119             // builder.
120             let client = loop {
121                 match ClientOptions::new().open(PIPE_NAME) {
122                     Ok(client) => break client,
123                     Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
124                     Err(e) if e.kind() == io::ErrorKind::NotFound => (),
125                     Err(e) => return Err(e),
126                 }
127 
128                 // Wait for a named pipe to become available.
129                 time::sleep(Duration::from_millis(10)).await;
130             };
131 
132             let mut client = BufReader::new(client);
133 
134             let mut buf = String::new();
135             client.write_all(b"ping\n").await?;
136             client.flush().await?;
137             client.read_line(&mut buf).await?;
138             Ok::<_, io::Error>(buf)
139         }));
140     }
141 
142     for client in clients {
143         let result = client.await?;
144         assert_eq!(result?, "pong\n");
145     }
146 
147     server.await??;
148     Ok(())
149 }
150 
151 #[tokio::test]
test_named_pipe_multi_client_ready() -> io::Result<()>152 async fn test_named_pipe_multi_client_ready() -> io::Result<()> {
153     use tokio::io::Interest;
154 
155     const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready";
156     const N: usize = 10;
157 
158     // The first server needs to be constructed early so that clients can
159     // be correctly connected. Otherwise calling .wait will cause the client to
160     // error.
161     let mut server = ServerOptions::new().create(PIPE_NAME)?;
162 
163     let server = tokio::spawn(async move {
164         for _ in 0..N {
165             // Wait for client to connect.
166             server.connect().await?;
167 
168             let inner_server = server;
169 
170             // Construct the next server to be connected before sending the one
171             // we already have of onto a task. This ensures that the server
172             // isn't closed (after it's done in the task) before a new one is
173             // available. Otherwise the client might error with
174             // `io::ErrorKind::NotFound`.
175             server = ServerOptions::new().create(PIPE_NAME)?;
176 
177             let _ = tokio::spawn(async move {
178                 let server = inner_server;
179 
180                 {
181                     let mut read_buf = [0u8; 5];
182                     let mut read_buf_cursor = 0;
183 
184                     loop {
185                         server.readable().await?;
186 
187                         let buf = &mut read_buf[read_buf_cursor..];
188 
189                         match server.try_read(buf) {
190                             Ok(n) => {
191                                 read_buf_cursor += n;
192 
193                                 if read_buf_cursor == read_buf.len() {
194                                     break;
195                                 }
196                             }
197                             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
198                                 continue;
199                             }
200                             Err(e) => {
201                                 return Err(e);
202                             }
203                         }
204                     }
205                 };
206 
207                 {
208                     let write_buf = b"pong\n";
209                     let mut write_buf_cursor = 0;
210 
211                     loop {
212                         server.writable().await?;
213                         let buf = &write_buf[write_buf_cursor..];
214 
215                         match server.try_write(buf) {
216                             Ok(n) => {
217                                 write_buf_cursor += n;
218 
219                                 if write_buf_cursor == write_buf.len() {
220                                     break;
221                                 }
222                             }
223                             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
224                                 continue;
225                             }
226                             Err(e) => {
227                                 return Err(e);
228                             }
229                         }
230                     }
231                 }
232 
233                 Ok::<_, io::Error>(())
234             });
235         }
236 
237         Ok::<_, io::Error>(())
238     });
239 
240     let mut clients = Vec::new();
241 
242     for _ in 0..N {
243         clients.push(tokio::spawn(async move {
244             // This showcases a generic connect loop.
245             //
246             // We immediately try to create a client, if it's not found or the
247             // pipe is busy we use the specialized wait function on the client
248             // builder.
249             let client = loop {
250                 match ClientOptions::new().open(PIPE_NAME) {
251                     Ok(client) => break client,
252                     Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
253                     Err(e) if e.kind() == io::ErrorKind::NotFound => (),
254                     Err(e) => return Err(e),
255                 }
256 
257                 // Wait for a named pipe to become available.
258                 time::sleep(Duration::from_millis(10)).await;
259             };
260 
261             let mut read_buf = [0u8; 5];
262             let mut read_buf_cursor = 0;
263             let write_buf = b"ping\n";
264             let mut write_buf_cursor = 0;
265 
266             loop {
267                 let mut interest = Interest::READABLE;
268                 if write_buf_cursor < write_buf.len() {
269                     interest |= Interest::WRITABLE;
270                 }
271 
272                 let ready = client.ready(interest).await?;
273 
274                 if ready.is_readable() {
275                     let buf = &mut read_buf[read_buf_cursor..];
276 
277                     match client.try_read(buf) {
278                         Ok(n) => {
279                             read_buf_cursor += n;
280 
281                             if read_buf_cursor == read_buf.len() {
282                                 break;
283                             }
284                         }
285                         Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
286                             continue;
287                         }
288                         Err(e) => {
289                             return Err(e);
290                         }
291                     }
292                 }
293 
294                 if ready.is_writable() {
295                     let buf = &write_buf[write_buf_cursor..];
296 
297                     if buf.is_empty() {
298                         continue;
299                     }
300 
301                     match client.try_write(buf) {
302                         Ok(n) => {
303                             write_buf_cursor += n;
304                         }
305                         Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
306                             continue;
307                         }
308                         Err(e) => {
309                             return Err(e);
310                         }
311                     }
312                 }
313             }
314 
315             let buf = String::from_utf8_lossy(&read_buf).into_owned();
316 
317             Ok::<_, io::Error>(buf)
318         }));
319     }
320 
321     for client in clients {
322         let result = client.await?;
323         assert_eq!(result?, "pong\n");
324     }
325 
326     server.await??;
327     Ok(())
328 }
329 
330 // This tests what happens when a client tries to disconnect.
331 #[tokio::test]
test_named_pipe_mode_message() -> io::Result<()>332 async fn test_named_pipe_mode_message() -> io::Result<()> {
333     const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-mode-message";
334 
335     let server = ServerOptions::new()
336         .pipe_mode(PipeMode::Message)
337         .create(PIPE_NAME)?;
338 
339     let _ = ClientOptions::new().open(PIPE_NAME)?;
340     server.connect().await?;
341     Ok(())
342 }
343 
num_instances(pipe_name: impl AsRef<str>) -> io::Result<u32>344 fn num_instances(pipe_name: impl AsRef<str>) -> io::Result<u32> {
345     use ntapi::ntioapi;
346     use winapi::shared::ntdef;
347 
348     let mut name = pipe_name.as_ref().encode_utf16().collect::<Vec<_>>();
349     let mut name = ntdef::UNICODE_STRING {
350         Length: (name.len() * mem::size_of::<u16>()) as u16,
351         MaximumLength: (name.len() * mem::size_of::<u16>()) as u16,
352         Buffer: name.as_mut_ptr(),
353     };
354     let root = std::fs::File::open(r"\\.\Pipe\")?;
355     let mut io_status_block = unsafe { mem::zeroed() };
356     let mut file_directory_information = [0_u8; 1024];
357 
358     let status = unsafe {
359         ntioapi::NtQueryDirectoryFile(
360             root.as_raw_handle(),
361             std::ptr::null_mut(),
362             None,
363             std::ptr::null_mut(),
364             &mut io_status_block,
365             &mut file_directory_information as *mut _ as *mut _,
366             1024,
367             ntioapi::FileDirectoryInformation,
368             0,
369             &mut name,
370             0,
371         )
372     };
373 
374     if status as u32 != winerror::NO_ERROR {
375         return Err(io::Error::last_os_error());
376     }
377 
378     let info = unsafe {
379         mem::transmute::<_, &ntioapi::FILE_DIRECTORY_INFORMATION>(&file_directory_information)
380     };
381     let raw_name = unsafe {
382         std::slice::from_raw_parts(
383             info.FileName.as_ptr(),
384             info.FileNameLength as usize / mem::size_of::<u16>(),
385         )
386     };
387     let name = String::from_utf16(raw_name).unwrap();
388     let num_instances = unsafe { *info.EndOfFile.QuadPart() };
389 
390     assert_eq!(name, pipe_name.as_ref());
391 
392     Ok(num_instances as u32)
393 }
394