1 //! Named pipes
2 
3 use std::cell::RefCell;
4 use std::ffi::OsStr;
5 use std::fs::{OpenOptions, File};
6 use std::io::prelude::*;
7 use std::io;
8 use std::os::windows::ffi::*;
9 use std::os::windows::io::*;
10 use std::time::Duration;
11 
12 use winapi::shared::ntdef::HANDLE;
13 use winapi::shared::minwindef::*;
14 use winapi::shared::winerror::*;
15 use winapi::um::fileapi::*;
16 use winapi::um::handleapi::*;
17 use winapi::um::ioapiset::*;
18 use winapi::um::minwinbase::*;
19 use winapi::um::namedpipeapi::*;
20 use winapi::um::winbase::*;
21 use handle::Handle;
22 use overlapped::Overlapped;
23 
/// Readable half of an anonymous pipe.
///
/// Wraps the read-end `Handle` that `anonymous` obtains from `CreatePipe`.
#[derive(Debug)]
pub struct AnonRead(Handle);
27 
/// Writable half of an anonymous pipe.
///
/// Wraps the write-end `Handle` that `anonymous` obtains from `CreatePipe`.
#[derive(Debug)]
pub struct AnonWrite(Handle);
31 
/// A named pipe that can accept connections.
///
/// Wraps the `Handle` created by `NamedPipeBuilder` (or adopted through
/// `FromRawHandle`).
#[derive(Debug)]
pub struct NamedPipe(Handle);
35 
/// A builder structure for creating a new named pipe.
///
/// Each field stores one argument that `with_security_attributes` forwards,
/// in declaration order, to `CreateNamedPipeW`.
#[derive(Debug)]
pub struct NamedPipeBuilder {
    // NUL-terminated UTF-16 encoding of the pipe name.
    name: Vec<u16>,
    // Open-mode bits; toggled by `inbound`, `outbound` and `first`.
    dwOpenMode: DWORD,
    // Pipe-mode bits; `accept_remote` toggles `PIPE_REJECT_REMOTE_CLIENTS` here.
    dwPipeMode: DWORD,
    // Instance limit; set by `max_instances`.
    nMaxInstances: DWORD,
    // Output buffer size in bytes; set by `out_buffer_size`.
    nOutBufferSize: DWORD,
    // Input buffer size in bytes; set by `in_buffer_size`.
    nInBufferSize: DWORD,
    // Initialised to 0 by `new`; no setter is visible in this file.
    nDefaultTimeOut: DWORD,
}
47 
48 /// Creates a new anonymous in-memory pipe, returning the read/write ends of the
49 /// pipe.
50 ///
51 /// The buffer size for this pipe may also be specified, but the system will
52 /// normally use this as a suggestion and it's not guaranteed that the buffer
53 /// will be precisely this size.
54 pub fn anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)> {
55     let mut read = 0 as HANDLE;
56     let mut write = 0 as HANDLE;
57     try!(::cvt(unsafe {
58         CreatePipe(&mut read, &mut write, 0 as *mut _, buffer_size)
59     }));
60     Ok((AnonRead(Handle::new(read)), AnonWrite(Handle::new(write))))
61 }
62 
63 impl Read for AnonRead {
isdigit_string(char * str)64     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
65 }
66 impl<'a> Read for &'a AnonRead {
67     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
68 }
69 
70 impl AsRawHandle for AnonRead {
71     fn as_raw_handle(&self) -> HANDLE { self.0.raw() }
72 }
73 impl FromRawHandle for AnonRead {
74     unsafe fn from_raw_handle(handle: HANDLE) -> AnonRead {
HTS106_Label_initialize(HTS106_Label * label)75         AnonRead(Handle::new(handle))
76     }
77 }
78 impl IntoRawHandle for AnonRead {
79     fn into_raw_handle(self) -> HANDLE { self.0.into_raw() }
80 }
81 
82 impl Write for AnonWrite {
83     fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
HTS106_Label_check_time(HTS106_Label * label)84     fn flush(&mut self) -> io::Result<()> { Ok(()) }
85 }
86 impl<'a> Write for &'a AnonWrite {
87     fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
88     fn flush(&mut self) -> io::Result<()> { Ok(()) }
89 }
90 
91 impl AsRawHandle for AnonWrite {
92     fn as_raw_handle(&self) -> HANDLE { self.0.raw() }
93 }
94 impl FromRawHandle for AnonWrite {
95     unsafe fn from_raw_handle(handle: HANDLE) -> AnonWrite {
96         AnonWrite(Handle::new(handle))
97     }
98 }
99 impl IntoRawHandle for AnonWrite {
100     fn into_raw_handle(self) -> HANDLE { self.0.into_raw() }
101 }
102 
/// A convenience function to connect to a named pipe.
///
/// This function tries to open the pipe server specified by `addr` and, while
/// the open fails with `ERROR_PIPE_BUSY`, uses `NamedPipe::wait` internally to
/// block until an instance becomes available before retrying.
///
/// Each wait is limited to 20 seconds. Any other open error, or an error from
/// the wait itself, is returned to the caller.
pub fn connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File> {
    // Borrow as `&OsStr` once so the non-generic `_connect` does the work.
    _connect(addr.as_ref())
}
111 
112 fn _connect(addr: &OsStr) -> io::Result<File> {
113     let mut r = OpenOptions::new();
114     let mut w = OpenOptions::new();
115     let mut rw = OpenOptions::new();
HTS106_Label_load_from_fp(HTS106_Label * label,int sampling_rate,int fperiod,HTS106_File * fp)116     r.read(true);
117     w.write(true);
118     rw.read(true).write(true);
119     loop {
120         let res = rw.open(addr).or_else(|_| r.open(addr))
121                                .or_else(|_| w.open(addr));
122         match res {
123             Ok(f) => return Ok(f),
124             Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32)
125                 => {}
126             Err(e) => return Err(e),
127         }
128 
129         try!(NamedPipe::wait(addr, Some(Duration::new(20, 0))));
130     }
131 }
132 
133 impl NamedPipe {
134     /// Creates a new initial named pipe.
135     ///
136     /// This function is equivalent to:
137     ///
138     /// ```
139     /// use miow::pipe::NamedPipeBuilder;
140     ///
141     /// # let addr = "foo";
142     /// NamedPipeBuilder::new(addr)
143     ///                  .first(true)
144     ///                  .inbound(true)
145     ///                  .outbound(true)
146     ///                  .out_buffer_size(65536)
147     ///                  .in_buffer_size(65536)
148     ///                  .create();
149     /// ```
150     pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
151         NamedPipeBuilder::new(addr).create()
152     }
153 
154     /// Waits until either a time-out interval elapses or an instance of the
155     /// specified named pipe is available for connection.
156     ///
157     /// If this function succeeds the process can create a `File` to connect to
HTS106_Label_load_from_string(HTS106_Label * label,int sampling_rate,int fperiod,char * data)158     /// the named pipe.
159     pub fn wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>)
160                                  -> io::Result<()> {
161         NamedPipe::_wait(addr.as_ref(), timeout)
162     }
163 
164     fn _wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()> {
165         let addr = addr.encode_wide().chain(Some(0)).collect::<Vec<_>>();
166         let timeout = ::dur2ms(timeout);
167         ::cvt(unsafe {
168             WaitNamedPipeW(addr.as_ptr(), timeout)
169         }).map(|_| ())
170     }
171 
172     /// Connects this named pipe to a client, blocking until one becomes
173     /// available.
174     ///
175     /// This function will call the `ConnectNamedPipe` function to await for a
176     /// client to connect. This can be called immediately after the pipe is
177     /// created, or after it has been disconnected from a previous client.
178     pub fn connect(&self) -> io::Result<()> {
179         match ::cvt(unsafe { ConnectNamedPipe(self.0.raw(), 0 as *mut _) }) {
180             Ok(_) => Ok(()),
181             Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32)
182                 => Ok(()),
183             Err(e) => Err(e),
184         }
185     }
186 
187     /// Issue a connection request with the specified overlapped operation.
188     ///
189     /// This function will issue a request to connect a client to this server,
190     /// returning immediately after starting the overlapped operation.
191     ///
192     /// If this function immediately succeeds then `Ok(true)` is returned. If
193     /// the overlapped operation is enqueued and pending, then `Ok(false)` is
194     /// returned. Otherwise an error is returned indicating what went wrong.
195     ///
196     /// # Unsafety
197     ///
198     /// This function is unsafe because the kernel requires that the
199     /// `overlapped` pointer is valid until the end of the I/O operation. The
200     /// kernel also requires that `overlapped` is unique for this I/O operation
HTS106_Label_load_from_string_list(HTS106_Label * label,int sampling_rate,int fperiod,char ** data,int size)201     /// and is not in use for any other I/O.
202     ///
203     /// To safely use this function callers must ensure that this pointer is
204     /// valid until the I/O operation is completed, typically via completion
205     /// ports and waiting to receive the completion notification on the port.
206     pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED)
207                                      -> io::Result<bool> {
208         match ::cvt(ConnectNamedPipe(self.0.raw(), overlapped)) {
209             Ok(_) => Ok(true),
210             Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32)
211                 => Ok(true),
212             Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32)
213                 => Ok(false),
214             Err(e) => Err(e),
215         }
216     }
217 
218     /// Disconnects this named pipe from any connected client.
219     pub fn disconnect(&self) -> io::Result<()> {
220         ::cvt(unsafe {
221             DisconnectNamedPipe(self.0.raw())
222         }).map(|_| ())
223     }
224 
225     /// Issues an overlapped read operation to occur on this pipe.
226     ///
227     /// This function will issue an asynchronous read to occur in an overlapped
228     /// fashion, returning immediately. The `buf` provided will be filled in
229     /// with data and the request is tracked by the `overlapped` function
230     /// provided.
231     ///
232     /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
233     /// `n` is the number of bytes read. If an asynchronous operation is
234     /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
235     /// it is returned.
236     ///
237     /// When this operation completes (or if it completes immediately), another
238     /// mechanism must be used to learn how many bytes were transferred (such as
239     /// looking at the filed in the IOCP status message).
240     ///
241     /// # Unsafety
242     ///
243     /// This function is unsafe because the kernel requires that the `buf` and
244     /// `overlapped` pointers to be valid until the end of the I/O operation.
245     /// The kernel also requires that `overlapped` is unique for this I/O
246     /// operation and is not in use for any other I/O.
247     ///
HTS106_Label_set_frame_specified_flag(HTS106_Label * label,HTS106_Boolean i)248     /// To safely use this function callers must ensure that the pointers are
249     /// valid until the I/O operation is completed, typically via completion
250     /// ports and waiting to receive the completion notification on the port.
251     pub unsafe fn read_overlapped(&self,
252                                   buf: &mut [u8],
253                                   overlapped: *mut OVERLAPPED)
254                                   -> io::Result<Option<usize>> {
255         self.0.read_overlapped(buf, overlapped)
256     }
257 
258     /// Issues an overlapped write operation to occur on this pipe.
259     ///
260     /// This function will issue an asynchronous write to occur in an overlapped
HTS106_Label_get_size(HTS106_Label * label)261     /// fashion, returning immediately. The `buf` provided will be filled in
262     /// with data and the request is tracked by the `overlapped` function
263     /// provided.
264     ///
265     /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
266     /// `n` is the number of bytes written. If an asynchronous operation is
267     /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
268     /// it is returned.
269     ///
270     /// When this operation completes (or if it completes immediately), another
271     /// mechanism must be used to learn how many bytes were transferred (such as
272     /// looking at the filed in the IOCP status message).
273     ///
274     /// # Unsafety
275     ///
276     /// This function is unsafe because the kernel requires that the `buf` and
277     /// `overlapped` pointers to be valid until the end of the I/O operation.
278     /// The kernel also requires that `overlapped` is unique for this I/O
279     /// operation and is not in use for any other I/O.
280     ///
281     /// To safely use this function callers must ensure that the pointers are
282     /// valid until the I/O operation is completed, typically via completion
283     /// ports and waiting to receive the completion notification on the port.
284     pub unsafe fn write_overlapped(&self,
285                                    buf: &[u8],
286                                    overlapped: *mut OVERLAPPED)
287                                    -> io::Result<Option<usize>> {
288         self.0.write_overlapped(buf, overlapped)
289     }
290 
291     /// Calls the `GetOverlappedResult` function to get the result of an
292     /// overlapped operation for this handle.
293     ///
294     /// This function takes the `OVERLAPPED` argument which must have been used
295     /// to initiate an overlapped I/O operation, and returns either the
296     /// successful number of bytes transferred during the operation or an error
HTS106_Label_get_end_frame(HTS106_Label * label,int string_index)297     /// if one occurred.
298     ///
299     /// # Unsafety
300     ///
301     /// This function is unsafe as `overlapped` must have previously been used
302     /// to execute an operation for this handle, and it must also be a valid
303     /// pointer to an `Overlapped` instance.
304     ///
305     /// # Panics
306     ///
307     /// This function will panic
308     pub unsafe fn result(&self, overlapped: *mut OVERLAPPED)
309                          -> io::Result<usize> {
310         let mut transferred = 0;
311         let r = GetOverlappedResult(self.0.raw(),
312                                     overlapped,
313                                     &mut transferred,
314                                     FALSE);
315         if r == 0 {
316             Err(io::Error::last_os_error())
317         } else {
318             Ok(transferred as usize)
319         }
320     }
321 }
322 
// Per-thread `Overlapped` slot. It starts as `None` and is filled in on first
// use by `with_threadlocal_overlapped`.
thread_local! {
    static NAMED_PIPE_OVERLAPPED: RefCell<Option<Overlapped>> = RefCell::new(None);
}
326 
327 /// Call a function with a threadlocal `Overlapped`.  The function `f` should be
328 /// sure that the event is reset, either manually or by a thread being released.
329 fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize>
330     where F: FnOnce(&Overlapped) -> io::Result<usize>
331 {
332     NAMED_PIPE_OVERLAPPED.with(|overlapped| {
333         let mut mborrow = overlapped.borrow_mut();
334         if let None = *mborrow {
335             let op = Overlapped::initialize_with_autoreset_event()?;
336             *mborrow = Some(op);
337         }
338         f(mborrow.as_ref().unwrap())
339     })
340 }
341 
342 impl Read for NamedPipe {
343     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
344         // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
345         with_threadlocal_overlapped(|overlapped| unsafe {
346             self.0.read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
347         })
348     }
349 }
350 impl<'a> Read for &'a NamedPipe {
351     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
352         // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
353         with_threadlocal_overlapped(|overlapped| unsafe {
354             self.0.read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
355         })
356     }
357 }
358 
359 impl Write for NamedPipe {
360     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
361         // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
362         with_threadlocal_overlapped(|overlapped| unsafe {
363             self.0.write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
364         })
365     }
366     fn flush(&mut self) -> io::Result<()> {
367         <&NamedPipe as Write>::flush(&mut &*self)
368     }
369 }
370 impl<'a> Write for &'a NamedPipe {
371     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
372         // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
373         with_threadlocal_overlapped(|overlapped| unsafe {
374             self.0.write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
375         })
376     }
377     fn flush(&mut self) -> io::Result<()> {
378         ::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ())
379     }
380 }
381 
382 impl AsRawHandle for NamedPipe {
383     fn as_raw_handle(&self) -> HANDLE { self.0.raw() }
384 }
385 impl FromRawHandle for NamedPipe {
386     unsafe fn from_raw_handle(handle: HANDLE) -> NamedPipe {
387         NamedPipe(Handle::new(handle))
388     }
389 }
390 impl IntoRawHandle for NamedPipe {
391     fn into_raw_handle(self) -> HANDLE { self.0.into_raw() }
392 }
393 
394 fn flag(slot: &mut DWORD, on: bool, val: DWORD) {
395     if on {
396         *slot |= val;
397     } else {
398         *slot &= !val;
399     }
400 }
401 
402 impl NamedPipeBuilder {
403     /// Creates a new named pipe builder with the default settings.
404     pub fn new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder {
405         NamedPipeBuilder {
406             name: addr.as_ref().encode_wide().chain(Some(0)).collect(),
407             dwOpenMode: PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE |
408                         FILE_FLAG_OVERLAPPED,
409             dwPipeMode: PIPE_TYPE_BYTE,
410             nMaxInstances: PIPE_UNLIMITED_INSTANCES,
411             nOutBufferSize: 65536,
412             nInBufferSize: 65536,
413             nDefaultTimeOut: 0,
414         }
415     }
416 
417     /// Indicates whether data is allowed to flow from the client to the server.
418     pub fn inbound(&mut self, allowed: bool) -> &mut Self {
419         flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_INBOUND);
420         self
421     }
422 
423     /// Indicates whether data is allowed to flow from the server to the client.
424     pub fn outbound(&mut self, allowed: bool) -> &mut Self {
425         flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_OUTBOUND);
426         self
427     }
428 
429     /// Indicates that this pipe must be the first instance.
430     ///
431     /// If set to true, then creation will fail if there's already an instance
432     /// elsewhere.
433     pub fn first(&mut self, first: bool) -> &mut Self {
434         flag(&mut self.dwOpenMode, first, FILE_FLAG_FIRST_PIPE_INSTANCE);
435         self
436     }
437 
438     /// Indicates whether this server can accept remote clients or not.
439     pub fn accept_remote(&mut self, accept: bool) -> &mut Self {
440         flag(&mut self.dwPipeMode, !accept, PIPE_REJECT_REMOTE_CLIENTS);
441         self
442     }
443 
444     /// Specifies the maximum number of instances of the server pipe that are
445     /// allowed.
446     ///
447     /// The first instance of a pipe can specify this value. A value of 255
448     /// indicates that there is no limit to the number of instances.
449     pub fn max_instances(&mut self, instances: u8) -> &mut Self {
450         self.nMaxInstances = instances as DWORD;
451         self
452     }
453 
454     /// Specifies the number of bytes to reserver for the output buffer
455     pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
456         self.nOutBufferSize = buffer as DWORD;
457         self
458     }
459 
460     /// Specifies the number of bytes to reserver for the input buffer
461     pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
462         self.nInBufferSize = buffer as DWORD;
463         self
464     }
465 
466     /// Using the options in this builder, attempt to create a new named pipe.
467     ///
468     /// This function will call the `CreateNamedPipe` function and return the
469     /// result.
470     pub fn create(&mut self) -> io::Result<NamedPipe> {
471         unsafe { self.with_security_attributes(::std::ptr::null_mut()) }
472     }
473 
474     /// Using the options in the builder and the provided security attributes, attempt to create a
475     /// new named pipe. This function has to be called with a valid pointer to a
476     /// `SECURITY_ATTRIBUTES` struct that will stay valid for the lifetime of this function or a
477     /// null pointer.
478     ///
479     /// This function will call the `CreateNamedPipe` function and return the
480     /// result.
481     pub unsafe fn with_security_attributes(&mut self, attrs: *mut SECURITY_ATTRIBUTES) -> io::Result<NamedPipe> {
482         let h = CreateNamedPipeW(self.name.as_ptr(),
483                                  self.dwOpenMode, self.dwPipeMode,
484                                  self.nMaxInstances, self.nOutBufferSize,
485                                  self.nInBufferSize, self.nDefaultTimeOut,
486                                  attrs);
487 
488         if h == INVALID_HANDLE_VALUE {
489             Err(io::Error::last_os_error())
490         } else {
491             Ok(NamedPipe(Handle::new(h)))
492         }
493     }
494 }
495 
// Tests for anonymous and named pipes. Every named-pipe test pairs a server
// `NamedPipe` on the test thread with a client opened on a spawned thread.
#[cfg(test)]
mod tests {
    use std::fs::{File, OpenOptions};
    use std::io::prelude::*;
    use std::sync::mpsc::channel;
    use std::thread;
    use std::time::Duration;

    use rand::{thread_rng, Rng};

    use super::{anonymous, NamedPipe, NamedPipeBuilder};
    use iocp::CompletionPort;
    use Overlapped;

    // Builds a pipe path under `\\.\pipe\` with a random 30-character name so
    // tests do not collide with each other.
    fn name() -> String {
        let name = thread_rng().gen_ascii_chars().take(30).collect::<String>();
        format!(r"\\.\pipe\{}", name)
    }

    // Bytes written to the write end of an anonymous pipe come out of the
    // read end.
    #[test]
    fn anon() {
        let (mut read, mut write) = t!(anonymous(256));
        assert_eq!(t!(write.write(&[1, 2, 3])), 3);
        let mut b = [0; 10];
        assert_eq!(t!(read.read(&mut b)), 3);
        assert_eq!(&b[..3], &[1, 2, 3]);
    }

    // A second `NamedPipe::new` on the same name fails, while a builder with
    // `first(false)` succeeds.
    #[test]
    fn named_not_first() {
        let name = name();
        let _a = t!(NamedPipe::new(&name));
        assert!(NamedPipe::new(&name).is_err());

        t!(NamedPipeBuilder::new(&name).first(false).create());
    }

    // A blocking `connect` completes once a client opens the pipe.
    #[test]
    fn named_connect() {
        let name = name();
        let a = t!(NamedPipe::new(&name));

        let t = thread::spawn(move || {
            t!(File::open(name));
        });

        t!(a.connect());
        t!(a.disconnect());
        t!(t.join());
    }

    // `wait` succeeds while an instance is available and fails with a short
    // timeout once the only instance has been taken by this client.
    #[test]
    fn named_wait() {
        let name = name();
        let a = t!(NamedPipe::new(&name));

        let (tx, rx) = channel();
        let t = thread::spawn(move || {
            t!(NamedPipe::wait(&name, None));
            t!(File::open(&name));
            assert!(NamedPipe::wait(&name, Some(Duration::from_millis(1))).is_err());
            t!(tx.send(()));
        });

        t!(a.connect());
        // Keep the client connected until it has finished its second wait.
        t!(rx.recv());
        t!(a.disconnect());
        t!(t.join());
    }

    // An overlapped connect is reported through the completion port with the
    // registered token and the same `OVERLAPPED` pointer.
    #[test]
    fn named_connect_overlapped() {
        let name = name();
        let a = t!(NamedPipe::new(&name));

        let t = thread::spawn(move || {
            t!(File::open(name));
        });

        let cp = t!(CompletionPort::new(1));
        t!(cp.add_handle(2, &a));

        let over = Overlapped::zero();
        unsafe {
            t!(a.connect_overlapped(over.raw()));
        }

        let status = t!(cp.get(None));
        assert_eq!(status.bytes_transferred(), 0);
        assert_eq!(status.token(), 2);
        assert_eq!(status.overlapped(), over.raw());
        t!(t.join());
    }

    // Round trip through the `Read`/`Write` impls: the client writes three
    // bytes and reads three back; the server does the mirror image.
    #[test]
    fn named_read_write() {
        let name = name();
        let mut a = t!(NamedPipe::new(&name));

        let t = thread::spawn(move || {
            let mut f = t!(OpenOptions::new().read(true).write(true).open(name));
            t!(f.write_all(&[1, 2, 3]));
            let mut b = [0; 10];
            assert_eq!(t!(f.read(&mut b)), 3);
            assert_eq!(&b[..3], &[1, 2, 3]);
        });

        t!(a.connect());
        let mut b = [0; 10];
        assert_eq!(t!(a.read(&mut b)), 3);
        assert_eq!(&b[..3], &[1, 2, 3]);
        t!(a.write_all(&[1, 2, 3]));
        t!(a.flush());
        t!(a.disconnect());
        t!(t.join());
    }

    // Repeats the round trip several times on the same thread.
    #[test]
    fn named_read_write_multi() {
        for _ in 0..5 {
            named_read_write()
        }
    }

    // Two different pipes are served one after the other from the same
    // thread.
    #[test]
    fn named_read_write_multi_same_thread() {
        let name1 = name();
        let mut a1 = t!(NamedPipe::new(&name1));
        let name2 = name();
        let mut a2 = t!(NamedPipe::new(&name2));

        let t = thread::spawn(move || {
            let mut f = t!(OpenOptions::new().read(true).write(true).open(name1));
            t!(f.write_all(&[1, 2, 3]));
            let mut b = [0; 10];
            assert_eq!(t!(f.read(&mut b)), 3);
            assert_eq!(&b[..3], &[1, 2, 3]);

            let mut f = t!(OpenOptions::new().read(true).write(true).open(name2));
            t!(f.write_all(&[1, 2, 3]));
            let mut b = [0; 10];
            assert_eq!(t!(f.read(&mut b)), 3);
            assert_eq!(&b[..3], &[1, 2, 3]);
        });

        t!(a1.connect());
        let mut b = [0; 10];
        assert_eq!(t!(a1.read(&mut b)), 3);
        assert_eq!(&b[..3], &[1, 2, 3]);
        t!(a1.write_all(&[1, 2, 3]));
        t!(a1.flush());
        t!(a1.disconnect());

        t!(a2.connect());
        let mut b = [0; 10];
        assert_eq!(t!(a2.read(&mut b)), 3);
        assert_eq!(&b[..3], &[1, 2, 3]);
        t!(a2.write_all(&[1, 2, 3]));
        t!(a2.flush());
        t!(a2.disconnect());

        t!(t.join());
    }

    // An overlapped read is reported through the completion port with the
    // byte count, the token and the `OVERLAPPED` pointer.
    #[test]
    fn named_read_overlapped() {
        let name = name();
        let a = t!(NamedPipe::new(&name));

        let t = thread::spawn(move || {
            let mut f = t!(File::create(name));
            t!(f.write_all(&[1, 2, 3]));
        });

        let cp = t!(CompletionPort::new(1));
        t!(cp.add_handle(3, &a));
        t!(a.connect());

        let mut b = [0; 10];
        let over = Overlapped::zero();
        unsafe {
            t!(a.read_overlapped(&mut b, over.raw()));
        }
        let status = t!(cp.get(None));
        assert_eq!(status.bytes_transferred(), 3);
        assert_eq!(status.token(), 3);
        assert_eq!(status.overlapped(), over.raw());
        assert_eq!(&b[..3], &[1, 2, 3]);

        t!(t.join());
    }

    // An overlapped write is reported through the completion port; the client
    // side uses the module's `connect` helper and reads the bytes back.
    #[test]
    fn named_write_overlapped() {
        let name = name();
        let a = t!(NamedPipe::new(&name));

        let t = thread::spawn(move || {
            let mut f = t!(super::connect(name));
            let mut b = [0; 10];
            assert_eq!(t!(f.read(&mut b)), 3);
            assert_eq!(&b[..3], &[1, 2, 3])
        });

        let cp = t!(CompletionPort::new(1));
        t!(cp.add_handle(3, &a));
        t!(a.connect());

        let over = Overlapped::zero();
        unsafe {
            t!(a.write_overlapped(&[1, 2, 3], over.raw()));
        }

        let status = t!(cp.get(None));
        assert_eq!(status.bytes_transferred(), 3);
        assert_eq!(status.token(), 3);
        assert_eq!(status.overlapped(), over.raw());

        t!(t.join());
    }
}
717