1 //! Interprocess Communication pipes
2 //!
3 //! A pipe is a section of shared memory that processes use for communication.
4 //! The process that creates a pipe is the _pipe server_. A process that connects
5 //! to a pipe is a _pipe client_. One process writes information to the pipe, then
6 //! the other process reads the information from the pipe. This overview
7 //! describes how to create, manage, and use pipes.
8 //!
9 //! There are two types of pipes: [anonymous pipes](#fn.anonymous.html) and
10 //! [named pipes](#fn.named.html). Anonymous pipes require less overhead than
11 //! named pipes, but offer limited services.
12 //!
13 //! # Anonymous pipes
14 //!
15 //! An anonymous pipe is an unnamed, one-way pipe that typically transfers data
16 //! between a parent process and a child process. Anonymous pipes are always
17 //! local; they cannot be used for communication over a network.
18 //!
19 //! # Named pipes
20 //!
21 //! A *named pipe* is a named, one-way or duplex pipe for communication between
22 //! the pipe server and one or more pipe clients. All instances of a named pipe
23 //! share the same pipe name, but each instance has its own buffers and handles,
24 //! and provides a separate conduit for client/server communication. The use of
25 //! instances enables multiple pipe clients to use the same named pipe
26 //! simultaneously.
27 //!
28 //! Any process can access named pipes, subject to security checks, making named
29 //! pipes an easy form of communication between related or unrelated processes.
30 //!
31 //! Any process can act as both a server and a client, making peer-to-peer
32 //! communication possible. As used here, the term pipe server refers to a
33 //! process that creates a named pipe, and the term pipe client refers to a
34 //! process that connects to an instance of a named pipe.
35 //!
36 //! Named pipes can be used to provide communication between processes on the
37 //! same computer or between processes on different computers across a network.
38 //! If the server service is running, all named pipes are accessible remotely. If
39 //! you intend to use a named pipe locally only, deny access to NT
40 //! AUTHORITY\\NETWORK or switch to local RPC.
41 //!
42 //! # References
43 //!
44 //! - [win32 pipe docs](https://github.com/MicrosoftDocs/win32/blob/docs/desktop-src/ipc/pipes.md)
45 
46 use std::cell::RefCell;
47 use std::ffi::OsStr;
48 use std::fs::{File, OpenOptions};
49 use std::io;
50 use std::io::prelude::*;
51 use std::os::windows::ffi::*;
52 use std::os::windows::io::*;
53 use std::time::Duration;
54 
55 use crate::handle::Handle;
56 use crate::overlapped::Overlapped;
57 use winapi::shared::minwindef::*;
58 use winapi::shared::ntdef::HANDLE;
59 use winapi::shared::winerror::*;
60 use winapi::um::fileapi::*;
61 use winapi::um::handleapi::*;
62 use winapi::um::ioapiset::*;
63 use winapi::um::minwinbase::*;
64 use winapi::um::namedpipeapi::*;
65 use winapi::um::winbase::*;
66 
67 /// Readable half of an anonymous pipe.
68 #[derive(Debug)]
69 pub struct AnonRead(Handle);
70 
71 /// Writable half of an anonymous pipe.
72 #[derive(Debug)]
73 pub struct AnonWrite(Handle);
74 
75 /// A named pipe that can accept connections.
76 #[derive(Debug)]
77 pub struct NamedPipe(Handle);
78 
79 /// A builder structure for creating a new named pipe.
80 #[derive(Debug)]
81 pub struct NamedPipeBuilder {
82     name: Vec<u16>,
83     dwOpenMode: DWORD,
84     dwPipeMode: DWORD,
85     nMaxInstances: DWORD,
86     nOutBufferSize: DWORD,
87     nInBufferSize: DWORD,
88     nDefaultTimeOut: DWORD,
89 }
90 
91 /// Creates a new anonymous in-memory pipe, returning the read/write ends of the
92 /// pipe.
93 ///
94 /// The buffer size for this pipe may also be specified, but the system will
95 /// normally use this as a suggestion and it's not guaranteed that the buffer
96 /// will be precisely this size.
anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)>97 pub fn anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)> {
98     let mut read = 0 as HANDLE;
99     let mut write = 0 as HANDLE;
100     crate::cvt(unsafe { CreatePipe(&mut read, &mut write, 0 as *mut _, buffer_size) })?;
101     Ok((AnonRead(Handle::new(read)), AnonWrite(Handle::new(write))))
102 }
103 
104 impl Read for AnonRead {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>105     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
106         self.0.read(buf)
107     }
108 }
109 impl<'a> Read for &'a AnonRead {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>110     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
111         self.0.read(buf)
112     }
113 }
114 
115 impl AsRawHandle for AnonRead {
as_raw_handle(&self) -> HANDLE116     fn as_raw_handle(&self) -> HANDLE {
117         self.0.raw()
118     }
119 }
120 impl FromRawHandle for AnonRead {
from_raw_handle(handle: HANDLE) -> AnonRead121     unsafe fn from_raw_handle(handle: HANDLE) -> AnonRead {
122         AnonRead(Handle::new(handle))
123     }
124 }
125 impl IntoRawHandle for AnonRead {
into_raw_handle(self) -> HANDLE126     fn into_raw_handle(self) -> HANDLE {
127         self.0.into_raw()
128     }
129 }
130 
131 impl Write for AnonWrite {
write(&mut self, buf: &[u8]) -> io::Result<usize>132     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
133         self.0.write(buf)
134     }
flush(&mut self) -> io::Result<()>135     fn flush(&mut self) -> io::Result<()> {
136         Ok(())
137     }
138 }
139 impl<'a> Write for &'a AnonWrite {
write(&mut self, buf: &[u8]) -> io::Result<usize>140     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
141         self.0.write(buf)
142     }
flush(&mut self) -> io::Result<()>143     fn flush(&mut self) -> io::Result<()> {
144         Ok(())
145     }
146 }
147 
148 impl AsRawHandle for AnonWrite {
as_raw_handle(&self) -> HANDLE149     fn as_raw_handle(&self) -> HANDLE {
150         self.0.raw()
151     }
152 }
153 impl FromRawHandle for AnonWrite {
from_raw_handle(handle: HANDLE) -> AnonWrite154     unsafe fn from_raw_handle(handle: HANDLE) -> AnonWrite {
155         AnonWrite(Handle::new(handle))
156     }
157 }
158 impl IntoRawHandle for AnonWrite {
into_raw_handle(self) -> HANDLE159     fn into_raw_handle(self) -> HANDLE {
160         self.0.into_raw()
161     }
162 }
163 
164 /// A convenience function to connect to a named pipe.
165 ///
166 /// This function will block the calling process until it can connect to the
167 /// pipe server specified by `addr`. This will use `NamedPipe::wait` internally
168 /// to block until it can connect.
connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File>169 pub fn connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File> {
170     _connect(addr.as_ref())
171 }
172 
_connect(addr: &OsStr) -> io::Result<File>173 fn _connect(addr: &OsStr) -> io::Result<File> {
174     let mut r = OpenOptions::new();
175     let mut w = OpenOptions::new();
176     let mut rw = OpenOptions::new();
177     r.read(true);
178     w.write(true);
179     rw.read(true).write(true);
180     loop {
181         let res = rw
182             .open(addr)
183             .or_else(|_| r.open(addr))
184             .or_else(|_| w.open(addr));
185         match res {
186             Ok(f) => return Ok(f),
187             Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {}
188             Err(e) => return Err(e),
189         }
190 
191         NamedPipe::wait(addr, Some(Duration::new(20, 0)))?;
192     }
193 }
194 
195 impl NamedPipe {
196     /// Creates a new initial named pipe.
197     ///
198     /// This function is equivalent to:
199     ///
200     /// ```
201     /// use miow::pipe::NamedPipeBuilder;
202     ///
203     /// # let addr = "foo";
204     /// NamedPipeBuilder::new(addr)
205     ///                  .first(true)
206     ///                  .inbound(true)
207     ///                  .outbound(true)
208     ///                  .out_buffer_size(65536)
209     ///                  .in_buffer_size(65536)
210     ///                  .create();
211     /// ```
new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe>212     pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
213         NamedPipeBuilder::new(addr).create()
214     }
215 
216     /// Waits until either a time-out interval elapses or an instance of the
217     /// specified named pipe is available for connection.
218     ///
219     /// If this function succeeds the process can create a `File` to connect to
220     /// the named pipe.
wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>) -> io::Result<()>221     pub fn wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>) -> io::Result<()> {
222         NamedPipe::_wait(addr.as_ref(), timeout)
223     }
224 
_wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()>225     fn _wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()> {
226         let addr = addr.encode_wide().chain(Some(0)).collect::<Vec<_>>();
227         let timeout = crate::dur2ms(timeout);
228         crate::cvt(unsafe { WaitNamedPipeW(addr.as_ptr(), timeout) }).map(|_| ())
229     }
230 
231     /// Connects this named pipe to a client, blocking until one becomes
232     /// available.
233     ///
234     /// This function will call the `ConnectNamedPipe` function to await for a
235     /// client to connect. This can be called immediately after the pipe is
236     /// created, or after it has been disconnected from a previous client.
connect(&self) -> io::Result<()>237     pub fn connect(&self) -> io::Result<()> {
238         match crate::cvt(unsafe { ConnectNamedPipe(self.0.raw(), 0 as *mut _) }) {
239             Ok(_) => Ok(()),
240             Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(()),
241             Err(e) => Err(e),
242         }
243     }
244 
245     /// Issue a connection request with the specified overlapped operation.
246     ///
247     /// This function will issue a request to connect a client to this server,
248     /// returning immediately after starting the overlapped operation.
249     ///
250     /// If this function immediately succeeds then `Ok(true)` is returned. If
251     /// the overlapped operation is enqueued and pending, then `Ok(false)` is
252     /// returned. Otherwise an error is returned indicating what went wrong.
253     ///
254     /// # Unsafety
255     ///
256     /// This function is unsafe because the kernel requires that the
257     /// `overlapped` pointer is valid until the end of the I/O operation. The
258     /// kernel also requires that `overlapped` is unique for this I/O operation
259     /// and is not in use for any other I/O.
260     ///
261     /// To safely use this function callers must ensure that this pointer is
262     /// valid until the I/O operation is completed, typically via completion
263     /// ports and waiting to receive the completion notification on the port.
connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool>264     pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> {
265         match crate::cvt(ConnectNamedPipe(self.0.raw(), overlapped)) {
266             Ok(_) => Ok(true),
267             Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(true),
268             Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => Ok(false),
269             Err(ref e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => Ok(true),
270             Err(e) => Err(e),
271         }
272     }
273 
274     /// Disconnects this named pipe from any connected client.
disconnect(&self) -> io::Result<()>275     pub fn disconnect(&self) -> io::Result<()> {
276         crate::cvt(unsafe { DisconnectNamedPipe(self.0.raw()) }).map(|_| ())
277     }
278 
279     /// Issues an overlapped read operation to occur on this pipe.
280     ///
281     /// This function will issue an asynchronous read to occur in an overlapped
282     /// fashion, returning immediately. The `buf` provided will be filled in
283     /// with data and the request is tracked by the `overlapped` function
284     /// provided.
285     ///
286     /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
287     /// `n` is the number of bytes read. If an asynchronous operation is
288     /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
289     /// it is returned.
290     ///
291     /// When this operation completes (or if it completes immediately), another
292     /// mechanism must be used to learn how many bytes were transferred (such as
293     /// looking at the filed in the IOCP status message).
294     ///
295     /// # Unsafety
296     ///
297     /// This function is unsafe because the kernel requires that the `buf` and
298     /// `overlapped` pointers to be valid until the end of the I/O operation.
299     /// The kernel also requires that `overlapped` is unique for this I/O
300     /// operation and is not in use for any other I/O.
301     ///
302     /// To safely use this function callers must ensure that the pointers are
303     /// valid until the I/O operation is completed, typically via completion
304     /// ports and waiting to receive the completion notification on the port.
read_overlapped( &self, buf: &mut [u8], overlapped: *mut OVERLAPPED, ) -> io::Result<Option<usize>>305     pub unsafe fn read_overlapped(
306         &self,
307         buf: &mut [u8],
308         overlapped: *mut OVERLAPPED,
309     ) -> io::Result<Option<usize>> {
310         self.0.read_overlapped(buf, overlapped)
311     }
312 
313     /// Issues an overlapped write operation to occur on this pipe.
314     ///
315     /// This function will issue an asynchronous write to occur in an overlapped
316     /// fashion, returning immediately. The `buf` provided will be filled in
317     /// with data and the request is tracked by the `overlapped` function
318     /// provided.
319     ///
320     /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
321     /// `n` is the number of bytes written. If an asynchronous operation is
322     /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
323     /// it is returned.
324     ///
325     /// When this operation completes (or if it completes immediately), another
326     /// mechanism must be used to learn how many bytes were transferred (such as
327     /// looking at the filed in the IOCP status message).
328     ///
329     /// # Unsafety
330     ///
331     /// This function is unsafe because the kernel requires that the `buf` and
332     /// `overlapped` pointers to be valid until the end of the I/O operation.
333     /// The kernel also requires that `overlapped` is unique for this I/O
334     /// operation and is not in use for any other I/O.
335     ///
336     /// To safely use this function callers must ensure that the pointers are
337     /// valid until the I/O operation is completed, typically via completion
338     /// ports and waiting to receive the completion notification on the port.
write_overlapped( &self, buf: &[u8], overlapped: *mut OVERLAPPED, ) -> io::Result<Option<usize>>339     pub unsafe fn write_overlapped(
340         &self,
341         buf: &[u8],
342         overlapped: *mut OVERLAPPED,
343     ) -> io::Result<Option<usize>> {
344         self.0.write_overlapped(buf, overlapped)
345     }
346 
347     /// Calls the `GetOverlappedResult` function to get the result of an
348     /// overlapped operation for this handle.
349     ///
350     /// This function takes the `OVERLAPPED` argument which must have been used
351     /// to initiate an overlapped I/O operation, and returns either the
352     /// successful number of bytes transferred during the operation or an error
353     /// if one occurred.
354     ///
355     /// # Unsafety
356     ///
357     /// This function is unsafe as `overlapped` must have previously been used
358     /// to execute an operation for this handle, and it must also be a valid
359     /// pointer to an `Overlapped` instance.
360     ///
361     /// # Panics
362     ///
363     /// This function will panic
result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize>364     pub unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> {
365         let mut transferred = 0;
366         let r = GetOverlappedResult(self.0.raw(), overlapped, &mut transferred, FALSE);
367         if r == 0 {
368             Err(io::Error::last_os_error())
369         } else {
370             Ok(transferred as usize)
371         }
372     }
373 }
374 
375 thread_local! {
376     static NAMED_PIPE_OVERLAPPED: RefCell<Option<Overlapped>> = RefCell::new(None);
377 }
378 
379 /// Call a function with a threadlocal `Overlapped`.  The function `f` should be
380 /// sure that the event is reset, either manually or by a thread being released.
with_threadlocal_overlapped<F>(f: F) -> io::Result<usize> where F: FnOnce(&Overlapped) -> io::Result<usize>,381 fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize>
382 where
383     F: FnOnce(&Overlapped) -> io::Result<usize>,
384 {
385     NAMED_PIPE_OVERLAPPED.with(|overlapped| {
386         let mut mborrow = overlapped.borrow_mut();
387         if let None = *mborrow {
388             let op = Overlapped::initialize_with_autoreset_event()?;
389             *mborrow = Some(op);
390         }
391         f(mborrow.as_ref().unwrap())
392     })
393 }
394 
395 impl Read for NamedPipe {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>396     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
397         // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
398         with_threadlocal_overlapped(|overlapped| unsafe {
399             self.0
400                 .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
401         })
402     }
403 }
404 impl<'a> Read for &'a NamedPipe {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>405     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
406         // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
407         with_threadlocal_overlapped(|overlapped| unsafe {
408             self.0
409                 .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
410         })
411     }
412 }
413 
414 impl Write for NamedPipe {
write(&mut self, buf: &[u8]) -> io::Result<usize>415     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
416         // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
417         with_threadlocal_overlapped(|overlapped| unsafe {
418             self.0
419                 .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
420         })
421     }
flush(&mut self) -> io::Result<()>422     fn flush(&mut self) -> io::Result<()> {
423         <&NamedPipe as Write>::flush(&mut &*self)
424     }
425 }
426 impl<'a> Write for &'a NamedPipe {
write(&mut self, buf: &[u8]) -> io::Result<usize>427     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
428         // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
429         with_threadlocal_overlapped(|overlapped| unsafe {
430             self.0
431                 .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
432         })
433     }
flush(&mut self) -> io::Result<()>434     fn flush(&mut self) -> io::Result<()> {
435         crate::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ())
436     }
437 }
438 
439 impl AsRawHandle for NamedPipe {
as_raw_handle(&self) -> HANDLE440     fn as_raw_handle(&self) -> HANDLE {
441         self.0.raw()
442     }
443 }
444 impl FromRawHandle for NamedPipe {
from_raw_handle(handle: HANDLE) -> NamedPipe445     unsafe fn from_raw_handle(handle: HANDLE) -> NamedPipe {
446         NamedPipe(Handle::new(handle))
447     }
448 }
449 impl IntoRawHandle for NamedPipe {
into_raw_handle(self) -> HANDLE450     fn into_raw_handle(self) -> HANDLE {
451         self.0.into_raw()
452     }
453 }
454 
flag(slot: &mut DWORD, on: bool, val: DWORD)455 fn flag(slot: &mut DWORD, on: bool, val: DWORD) {
456     if on {
457         *slot |= val;
458     } else {
459         *slot &= !val;
460     }
461 }
462 
463 impl NamedPipeBuilder {
464     /// Creates a new named pipe builder with the default settings.
new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder465     pub fn new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder {
466         NamedPipeBuilder {
467             name: addr.as_ref().encode_wide().chain(Some(0)).collect(),
468             dwOpenMode: PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
469             dwPipeMode: PIPE_TYPE_BYTE,
470             nMaxInstances: PIPE_UNLIMITED_INSTANCES,
471             nOutBufferSize: 65536,
472             nInBufferSize: 65536,
473             nDefaultTimeOut: 0,
474         }
475     }
476 
477     /// Indicates whether data is allowed to flow from the client to the server.
inbound(&mut self, allowed: bool) -> &mut Self478     pub fn inbound(&mut self, allowed: bool) -> &mut Self {
479         flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_INBOUND);
480         self
481     }
482 
483     /// Indicates whether data is allowed to flow from the server to the client.
outbound(&mut self, allowed: bool) -> &mut Self484     pub fn outbound(&mut self, allowed: bool) -> &mut Self {
485         flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_OUTBOUND);
486         self
487     }
488 
489     /// Indicates that this pipe must be the first instance.
490     ///
491     /// If set to true, then creation will fail if there's already an instance
492     /// elsewhere.
first(&mut self, first: bool) -> &mut Self493     pub fn first(&mut self, first: bool) -> &mut Self {
494         flag(&mut self.dwOpenMode, first, FILE_FLAG_FIRST_PIPE_INSTANCE);
495         self
496     }
497 
498     /// Indicates whether this server can accept remote clients or not.
accept_remote(&mut self, accept: bool) -> &mut Self499     pub fn accept_remote(&mut self, accept: bool) -> &mut Self {
500         flag(&mut self.dwPipeMode, !accept, PIPE_REJECT_REMOTE_CLIENTS);
501         self
502     }
503 
504     /// Specifies the maximum number of instances of the server pipe that are
505     /// allowed.
506     ///
507     /// The first instance of a pipe can specify this value. A value of 255
508     /// indicates that there is no limit to the number of instances.
max_instances(&mut self, instances: u8) -> &mut Self509     pub fn max_instances(&mut self, instances: u8) -> &mut Self {
510         self.nMaxInstances = instances as DWORD;
511         self
512     }
513 
514     /// Specifies the number of bytes to reserver for the output buffer
out_buffer_size(&mut self, buffer: u32) -> &mut Self515     pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
516         self.nOutBufferSize = buffer as DWORD;
517         self
518     }
519 
520     /// Specifies the number of bytes to reserver for the input buffer
in_buffer_size(&mut self, buffer: u32) -> &mut Self521     pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
522         self.nInBufferSize = buffer as DWORD;
523         self
524     }
525 
526     /// Using the options in this builder, attempt to create a new named pipe.
527     ///
528     /// This function will call the `CreateNamedPipe` function and return the
529     /// result.
create(&mut self) -> io::Result<NamedPipe>530     pub fn create(&mut self) -> io::Result<NamedPipe> {
531         unsafe { self.with_security_attributes(::std::ptr::null_mut()) }
532     }
533 
534     /// Using the options in the builder and the provided security attributes, attempt to create a
535     /// new named pipe. This function has to be called with a valid pointer to a
536     /// `SECURITY_ATTRIBUTES` struct that will stay valid for the lifetime of this function or a
537     /// null pointer.
538     ///
539     /// This function will call the `CreateNamedPipe` function and return the
540     /// result.
with_security_attributes( &mut self, attrs: *mut SECURITY_ATTRIBUTES, ) -> io::Result<NamedPipe>541     pub unsafe fn with_security_attributes(
542         &mut self,
543         attrs: *mut SECURITY_ATTRIBUTES,
544     ) -> io::Result<NamedPipe> {
545         let h = CreateNamedPipeW(
546             self.name.as_ptr(),
547             self.dwOpenMode,
548             self.dwPipeMode,
549             self.nMaxInstances,
550             self.nOutBufferSize,
551             self.nInBufferSize,
552             self.nDefaultTimeOut,
553             attrs,
554         );
555 
556         if h == INVALID_HANDLE_VALUE {
557             Err(io::Error::last_os_error())
558         } else {
559             Ok(NamedPipe(Handle::new(h)))
560         }
561     }
562 }
563 
564 #[cfg(test)]
565 mod tests {
566     use std::fs::{File, OpenOptions};
567     use std::io::prelude::*;
568     use std::sync::mpsc::channel;
569     use std::thread;
570     use std::time::Duration;
571 
572     use rand::{distributions::Alphanumeric, thread_rng, Rng};
573 
574     use super::{anonymous, NamedPipe, NamedPipeBuilder};
575     use crate::iocp::CompletionPort;
576     use crate::Overlapped;
577 
name() -> String578     fn name() -> String {
579         let name = thread_rng()
580             .sample_iter(Alphanumeric)
581             .take(30)
582             .map(char::from)
583             .collect::<String>();
584         format!(r"\\.\pipe\{}", name)
585     }
586 
587     #[test]
anon()588     fn anon() {
589         let (mut read, mut write) = t!(anonymous(256));
590         assert_eq!(t!(write.write(&[1, 2, 3])), 3);
591         let mut b = [0; 10];
592         assert_eq!(t!(read.read(&mut b)), 3);
593         assert_eq!(&b[..3], &[1, 2, 3]);
594     }
595 
596     #[test]
named_not_first()597     fn named_not_first() {
598         let name = name();
599         let _a = t!(NamedPipe::new(&name));
600         assert!(NamedPipe::new(&name).is_err());
601 
602         t!(NamedPipeBuilder::new(&name).first(false).create());
603     }
604 
605     #[test]
named_connect()606     fn named_connect() {
607         let name = name();
608         let a = t!(NamedPipe::new(&name));
609 
610         let t = thread::spawn(move || {
611             t!(File::open(name));
612         });
613 
614         t!(a.connect());
615         t!(a.disconnect());
616         t!(t.join());
617     }
618 
619     #[test]
named_wait()620     fn named_wait() {
621         let name = name();
622         let a = t!(NamedPipe::new(&name));
623 
624         let (tx, rx) = channel();
625         let t = thread::spawn(move || {
626             t!(NamedPipe::wait(&name, None));
627             t!(File::open(&name));
628             assert!(NamedPipe::wait(&name, Some(Duration::from_millis(1))).is_err());
629             t!(tx.send(()));
630         });
631 
632         t!(a.connect());
633         t!(rx.recv());
634         t!(a.disconnect());
635         t!(t.join());
636     }
637 
638     #[test]
named_connect_overlapped()639     fn named_connect_overlapped() {
640         let name = name();
641         let a = t!(NamedPipe::new(&name));
642 
643         let t = thread::spawn(move || {
644             t!(File::open(name));
645         });
646 
647         let cp = t!(CompletionPort::new(1));
648         t!(cp.add_handle(2, &a));
649 
650         let over = Overlapped::zero();
651         unsafe {
652             t!(a.connect_overlapped(over.raw()));
653         }
654 
655         let status = t!(cp.get(None));
656         assert_eq!(status.bytes_transferred(), 0);
657         assert_eq!(status.token(), 2);
658         assert_eq!(status.overlapped(), over.raw());
659         t!(t.join());
660     }
661 
662     #[test]
named_read_write()663     fn named_read_write() {
664         let name = name();
665         let mut a = t!(NamedPipe::new(&name));
666 
667         let t = thread::spawn(move || {
668             let mut f = t!(OpenOptions::new().read(true).write(true).open(name));
669             t!(f.write_all(&[1, 2, 3]));
670             let mut b = [0; 10];
671             assert_eq!(t!(f.read(&mut b)), 3);
672             assert_eq!(&b[..3], &[1, 2, 3]);
673         });
674 
675         t!(a.connect());
676         let mut b = [0; 10];
677         assert_eq!(t!(a.read(&mut b)), 3);
678         assert_eq!(&b[..3], &[1, 2, 3]);
679         t!(a.write_all(&[1, 2, 3]));
680         t!(a.flush());
681         t!(a.disconnect());
682         t!(t.join());
683     }
684 
685     #[test]
named_read_write_multi()686     fn named_read_write_multi() {
687         for _ in 0..5 {
688             named_read_write()
689         }
690     }
691 
692     #[test]
named_read_write_multi_same_thread()693     fn named_read_write_multi_same_thread() {
694         let name1 = name();
695         let mut a1 = t!(NamedPipe::new(&name1));
696         let name2 = name();
697         let mut a2 = t!(NamedPipe::new(&name2));
698 
699         let t = thread::spawn(move || {
700             let mut f = t!(OpenOptions::new().read(true).write(true).open(name1));
701             t!(f.write_all(&[1, 2, 3]));
702             let mut b = [0; 10];
703             assert_eq!(t!(f.read(&mut b)), 3);
704             assert_eq!(&b[..3], &[1, 2, 3]);
705 
706             let mut f = t!(OpenOptions::new().read(true).write(true).open(name2));
707             t!(f.write_all(&[1, 2, 3]));
708             let mut b = [0; 10];
709             assert_eq!(t!(f.read(&mut b)), 3);
710             assert_eq!(&b[..3], &[1, 2, 3]);
711         });
712 
713         t!(a1.connect());
714         let mut b = [0; 10];
715         assert_eq!(t!(a1.read(&mut b)), 3);
716         assert_eq!(&b[..3], &[1, 2, 3]);
717         t!(a1.write_all(&[1, 2, 3]));
718         t!(a1.flush());
719         t!(a1.disconnect());
720 
721         t!(a2.connect());
722         let mut b = [0; 10];
723         assert_eq!(t!(a2.read(&mut b)), 3);
724         assert_eq!(&b[..3], &[1, 2, 3]);
725         t!(a2.write_all(&[1, 2, 3]));
726         t!(a2.flush());
727         t!(a2.disconnect());
728 
729         t!(t.join());
730     }
731 
732     #[test]
named_read_overlapped()733     fn named_read_overlapped() {
734         let name = name();
735         let a = t!(NamedPipe::new(&name));
736 
737         let t = thread::spawn(move || {
738             let mut f = t!(File::create(name));
739             t!(f.write_all(&[1, 2, 3]));
740         });
741 
742         let cp = t!(CompletionPort::new(1));
743         t!(cp.add_handle(3, &a));
744         t!(a.connect());
745 
746         let mut b = [0; 10];
747         let over = Overlapped::zero();
748         unsafe {
749             t!(a.read_overlapped(&mut b, over.raw()));
750         }
751         let status = t!(cp.get(None));
752         assert_eq!(status.bytes_transferred(), 3);
753         assert_eq!(status.token(), 3);
754         assert_eq!(status.overlapped(), over.raw());
755         assert_eq!(&b[..3], &[1, 2, 3]);
756 
757         t!(t.join());
758     }
759 
760     #[test]
named_write_overlapped()761     fn named_write_overlapped() {
762         let name = name();
763         let a = t!(NamedPipe::new(&name));
764 
765         let t = thread::spawn(move || {
766             let mut f = t!(super::connect(name));
767             let mut b = [0; 10];
768             assert_eq!(t!(f.read(&mut b)), 3);
769             assert_eq!(&b[..3], &[1, 2, 3])
770         });
771 
772         let cp = t!(CompletionPort::new(1));
773         t!(cp.add_handle(3, &a));
774         t!(a.connect());
775 
776         let over = Overlapped::zero();
777         unsafe {
778             t!(a.write_overlapped(&[1, 2, 3], over.raw()));
779         }
780 
781         let status = t!(cp.get(None));
782         assert_eq!(status.bytes_transferred(), 3);
783         assert_eq!(status.token(), 3);
784         assert_eq!(status.overlapped(), over.raw());
785 
786         t!(t.join());
787     }
788 }
789