1 #![cfg(feature = "full")]
2 #![cfg(all(windows))]
3
4 use std::io;
5 use std::mem;
6 use std::os::windows::io::AsRawHandle;
7 use std::time::Duration;
8 use tokio::io::AsyncWriteExt;
9 use tokio::net::windows::named_pipe::{ClientOptions, PipeMode, ServerOptions};
10 use tokio::time;
11 use winapi::shared::winerror;
12
13 #[tokio::test]
test_named_pipe_client_drop() -> io::Result<()>14 async fn test_named_pipe_client_drop() -> io::Result<()> {
15 const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-client-drop";
16
17 let mut server = ServerOptions::new().create(PIPE_NAME)?;
18
19 assert_eq!(num_instances("test-named-pipe-client-drop")?, 1);
20
21 let client = ClientOptions::new().open(PIPE_NAME)?;
22
23 server.connect().await?;
24 drop(client);
25
26 // instance will be broken because client is gone
27 match server.write_all(b"ping").await {
28 Err(e) if e.raw_os_error() == Some(winerror::ERROR_NO_DATA as i32) => (),
29 x => panic!("{:?}", x),
30 }
31
32 Ok(())
33 }
34
35 #[tokio::test]
test_named_pipe_single_client() -> io::Result<()>36 async fn test_named_pipe_single_client() -> io::Result<()> {
37 use tokio::io::{AsyncBufReadExt as _, BufReader};
38
39 const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-single-client";
40
41 let server = ServerOptions::new().create(PIPE_NAME)?;
42
43 let server = tokio::spawn(async move {
44 // Note: we wait for a client to connect.
45 server.connect().await?;
46
47 let mut server = BufReader::new(server);
48
49 let mut buf = String::new();
50 server.read_line(&mut buf).await?;
51 server.write_all(b"pong\n").await?;
52 Ok::<_, io::Error>(buf)
53 });
54
55 let client = tokio::spawn(async move {
56 let client = ClientOptions::new().open(PIPE_NAME)?;
57
58 let mut client = BufReader::new(client);
59
60 let mut buf = String::new();
61 client.write_all(b"ping\n").await?;
62 client.read_line(&mut buf).await?;
63 Ok::<_, io::Error>(buf)
64 });
65
66 let (server, client) = tokio::try_join!(server, client)?;
67
68 assert_eq!(server?, "ping\n");
69 assert_eq!(client?, "pong\n");
70
71 Ok(())
72 }
73
74 #[tokio::test]
test_named_pipe_multi_client() -> io::Result<()>75 async fn test_named_pipe_multi_client() -> io::Result<()> {
76 use tokio::io::{AsyncBufReadExt as _, BufReader};
77
78 const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client";
79 const N: usize = 10;
80
81 // The first server needs to be constructed early so that clients can
82 // be correctly connected. Otherwise calling .wait will cause the client to
83 // error.
84 let mut server = ServerOptions::new().create(PIPE_NAME)?;
85
86 let server = tokio::spawn(async move {
87 for _ in 0..N {
88 // Wait for client to connect.
89 server.connect().await?;
90 let mut inner = BufReader::new(server);
91
92 // Construct the next server to be connected before sending the one
93 // we already have of onto a task. This ensures that the server
94 // isn't closed (after it's done in the task) before a new one is
95 // available. Otherwise the client might error with
96 // `io::ErrorKind::NotFound`.
97 server = ServerOptions::new().create(PIPE_NAME)?;
98
99 let _ = tokio::spawn(async move {
100 let mut buf = String::new();
101 inner.read_line(&mut buf).await?;
102 inner.write_all(b"pong\n").await?;
103 inner.flush().await?;
104 Ok::<_, io::Error>(())
105 });
106 }
107
108 Ok::<_, io::Error>(())
109 });
110
111 let mut clients = Vec::new();
112
113 for _ in 0..N {
114 clients.push(tokio::spawn(async move {
115 // This showcases a generic connect loop.
116 //
117 // We immediately try to create a client, if it's not found or the
118 // pipe is busy we use the specialized wait function on the client
119 // builder.
120 let client = loop {
121 match ClientOptions::new().open(PIPE_NAME) {
122 Ok(client) => break client,
123 Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
124 Err(e) if e.kind() == io::ErrorKind::NotFound => (),
125 Err(e) => return Err(e),
126 }
127
128 // Wait for a named pipe to become available.
129 time::sleep(Duration::from_millis(10)).await;
130 };
131
132 let mut client = BufReader::new(client);
133
134 let mut buf = String::new();
135 client.write_all(b"ping\n").await?;
136 client.flush().await?;
137 client.read_line(&mut buf).await?;
138 Ok::<_, io::Error>(buf)
139 }));
140 }
141
142 for client in clients {
143 let result = client.await?;
144 assert_eq!(result?, "pong\n");
145 }
146
147 server.await??;
148 Ok(())
149 }
150
151 #[tokio::test]
test_named_pipe_multi_client_ready() -> io::Result<()>152 async fn test_named_pipe_multi_client_ready() -> io::Result<()> {
153 use tokio::io::Interest;
154
155 const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready";
156 const N: usize = 10;
157
158 // The first server needs to be constructed early so that clients can
159 // be correctly connected. Otherwise calling .wait will cause the client to
160 // error.
161 let mut server = ServerOptions::new().create(PIPE_NAME)?;
162
163 let server = tokio::spawn(async move {
164 for _ in 0..N {
165 // Wait for client to connect.
166 server.connect().await?;
167
168 let inner_server = server;
169
170 // Construct the next server to be connected before sending the one
171 // we already have of onto a task. This ensures that the server
172 // isn't closed (after it's done in the task) before a new one is
173 // available. Otherwise the client might error with
174 // `io::ErrorKind::NotFound`.
175 server = ServerOptions::new().create(PIPE_NAME)?;
176
177 let _ = tokio::spawn(async move {
178 let server = inner_server;
179
180 {
181 let mut read_buf = [0u8; 5];
182 let mut read_buf_cursor = 0;
183
184 loop {
185 server.readable().await?;
186
187 let buf = &mut read_buf[read_buf_cursor..];
188
189 match server.try_read(buf) {
190 Ok(n) => {
191 read_buf_cursor += n;
192
193 if read_buf_cursor == read_buf.len() {
194 break;
195 }
196 }
197 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
198 continue;
199 }
200 Err(e) => {
201 return Err(e);
202 }
203 }
204 }
205 };
206
207 {
208 let write_buf = b"pong\n";
209 let mut write_buf_cursor = 0;
210
211 loop {
212 server.writable().await?;
213 let buf = &write_buf[write_buf_cursor..];
214
215 match server.try_write(buf) {
216 Ok(n) => {
217 write_buf_cursor += n;
218
219 if write_buf_cursor == write_buf.len() {
220 break;
221 }
222 }
223 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
224 continue;
225 }
226 Err(e) => {
227 return Err(e);
228 }
229 }
230 }
231 }
232
233 Ok::<_, io::Error>(())
234 });
235 }
236
237 Ok::<_, io::Error>(())
238 });
239
240 let mut clients = Vec::new();
241
242 for _ in 0..N {
243 clients.push(tokio::spawn(async move {
244 // This showcases a generic connect loop.
245 //
246 // We immediately try to create a client, if it's not found or the
247 // pipe is busy we use the specialized wait function on the client
248 // builder.
249 let client = loop {
250 match ClientOptions::new().open(PIPE_NAME) {
251 Ok(client) => break client,
252 Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
253 Err(e) if e.kind() == io::ErrorKind::NotFound => (),
254 Err(e) => return Err(e),
255 }
256
257 // Wait for a named pipe to become available.
258 time::sleep(Duration::from_millis(10)).await;
259 };
260
261 let mut read_buf = [0u8; 5];
262 let mut read_buf_cursor = 0;
263 let write_buf = b"ping\n";
264 let mut write_buf_cursor = 0;
265
266 loop {
267 let mut interest = Interest::READABLE;
268 if write_buf_cursor < write_buf.len() {
269 interest |= Interest::WRITABLE;
270 }
271
272 let ready = client.ready(interest).await?;
273
274 if ready.is_readable() {
275 let buf = &mut read_buf[read_buf_cursor..];
276
277 match client.try_read(buf) {
278 Ok(n) => {
279 read_buf_cursor += n;
280
281 if read_buf_cursor == read_buf.len() {
282 break;
283 }
284 }
285 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
286 continue;
287 }
288 Err(e) => {
289 return Err(e);
290 }
291 }
292 }
293
294 if ready.is_writable() {
295 let buf = &write_buf[write_buf_cursor..];
296
297 if buf.is_empty() {
298 continue;
299 }
300
301 match client.try_write(buf) {
302 Ok(n) => {
303 write_buf_cursor += n;
304 }
305 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
306 continue;
307 }
308 Err(e) => {
309 return Err(e);
310 }
311 }
312 }
313 }
314
315 let buf = String::from_utf8_lossy(&read_buf).into_owned();
316
317 Ok::<_, io::Error>(buf)
318 }));
319 }
320
321 for client in clients {
322 let result = client.await?;
323 assert_eq!(result?, "pong\n");
324 }
325
326 server.await??;
327 Ok(())
328 }
329
330 // This tests what happens when a client tries to disconnect.
331 #[tokio::test]
test_named_pipe_mode_message() -> io::Result<()>332 async fn test_named_pipe_mode_message() -> io::Result<()> {
333 const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-mode-message";
334
335 let server = ServerOptions::new()
336 .pipe_mode(PipeMode::Message)
337 .create(PIPE_NAME)?;
338
339 let _ = ClientOptions::new().open(PIPE_NAME)?;
340 server.connect().await?;
341 Ok(())
342 }
343
num_instances(pipe_name: impl AsRef<str>) -> io::Result<u32>344 fn num_instances(pipe_name: impl AsRef<str>) -> io::Result<u32> {
345 use ntapi::ntioapi;
346 use winapi::shared::ntdef;
347
348 let mut name = pipe_name.as_ref().encode_utf16().collect::<Vec<_>>();
349 let mut name = ntdef::UNICODE_STRING {
350 Length: (name.len() * mem::size_of::<u16>()) as u16,
351 MaximumLength: (name.len() * mem::size_of::<u16>()) as u16,
352 Buffer: name.as_mut_ptr(),
353 };
354 let root = std::fs::File::open(r"\\.\Pipe\")?;
355 let mut io_status_block = unsafe { mem::zeroed() };
356 let mut file_directory_information = [0_u8; 1024];
357
358 let status = unsafe {
359 ntioapi::NtQueryDirectoryFile(
360 root.as_raw_handle(),
361 std::ptr::null_mut(),
362 None,
363 std::ptr::null_mut(),
364 &mut io_status_block,
365 &mut file_directory_information as *mut _ as *mut _,
366 1024,
367 ntioapi::FileDirectoryInformation,
368 0,
369 &mut name,
370 0,
371 )
372 };
373
374 if status as u32 != winerror::NO_ERROR {
375 return Err(io::Error::last_os_error());
376 }
377
378 let info = unsafe {
379 mem::transmute::<_, &ntioapi::FILE_DIRECTORY_INFORMATION>(&file_directory_information)
380 };
381 let raw_name = unsafe {
382 std::slice::from_raw_parts(
383 info.FileName.as_ptr(),
384 info.FileNameLength as usize / mem::size_of::<u16>(),
385 )
386 };
387 let name = String::from_utf16(raw_name).unwrap();
388 let num_instances = unsafe { *info.EndOfFile.QuadPart() };
389
390 assert_eq!(name, pipe_name.as_ref());
391
392 Ok(num_instances as u32)
393 }
394