1 //! Interprocess Communication pipes
2 //!
3 //! A pipe is a section of shared memory that processes use for communication.
4 //! The process that creates a pipe is the _pipe server_. A process that connects
5 //! to a pipe is a _pipe client_. One process writes information to the pipe, then
6 //! the other process reads the information from the pipe. This overview
7 //! describes how to create, manage, and use pipes.
8 //!
9 //! There are two types of pipes: [anonymous pipes](#fn.anonymous.html) and
10 //! [named pipes](#fn.named.html). Anonymous pipes require less overhead than
11 //! named pipes, but offer limited services.
12 //!
13 //! # Anonymous pipes
14 //!
15 //! An anonymous pipe is an unnamed, one-way pipe that typically transfers data
16 //! between a parent process and a child process. Anonymous pipes are always
17 //! local; they cannot be used for communication over a network.
18 //!
19 //! # Named pipes
20 //!
21 //! A *named pipe* is a named, one-way or duplex pipe for communication between
22 //! the pipe server and one or more pipe clients. All instances of a named pipe
23 //! share the same pipe name, but each instance has its own buffers and handles,
24 //! and provides a separate conduit for client/server communication. The use of
25 //! instances enables multiple pipe clients to use the same named pipe
26 //! simultaneously.
27 //!
28 //! Any process can access named pipes, subject to security checks, making named
29 //! pipes an easy form of communication between related or unrelated processes.
30 //!
31 //! Any process can act as both a server and a client, making peer-to-peer
32 //! communication possible. As used here, the term pipe server refers to a
33 //! process that creates a named pipe, and the term pipe client refers to a
34 //! process that connects to an instance of a named pipe.
35 //!
36 //! Named pipes can be used to provide communication between processes on the
37 //! same computer or between processes on different computers across a network.
38 //! If the server service is running, all named pipes are accessible remotely. If
39 //! you intend to use a named pipe locally only, deny access to NT
40 //! AUTHORITY\\NETWORK or switch to local RPC.
41 //!
42 //! # References
43 //!
44 //! - [win32 pipe docs](https://github.com/MicrosoftDocs/win32/blob/docs/desktop-src/ipc/pipes.md)
45
46 use std::cell::RefCell;
47 use std::ffi::OsStr;
48 use std::fs::{File, OpenOptions};
49 use std::io;
50 use std::io::prelude::*;
51 use std::os::windows::ffi::*;
52 use std::os::windows::io::*;
53 use std::time::Duration;
54
55 use crate::handle::Handle;
56 use crate::overlapped::Overlapped;
57 use winapi::shared::minwindef::*;
58 use winapi::shared::ntdef::HANDLE;
59 use winapi::shared::winerror::*;
60 use winapi::um::fileapi::*;
61 use winapi::um::handleapi::*;
62 use winapi::um::ioapiset::*;
63 use winapi::um::minwinbase::*;
64 use winapi::um::namedpipeapi::*;
65 use winapi::um::winbase::*;
66
67 /// Readable half of an anonymous pipe.
68 #[derive(Debug)]
69 pub struct AnonRead(Handle);
70
71 /// Writable half of an anonymous pipe.
72 #[derive(Debug)]
73 pub struct AnonWrite(Handle);
74
75 /// A named pipe that can accept connections.
76 #[derive(Debug)]
77 pub struct NamedPipe(Handle);
78
79 /// A builder structure for creating a new named pipe.
80 #[derive(Debug)]
81 pub struct NamedPipeBuilder {
82 name: Vec<u16>,
83 dwOpenMode: DWORD,
84 dwPipeMode: DWORD,
85 nMaxInstances: DWORD,
86 nOutBufferSize: DWORD,
87 nInBufferSize: DWORD,
88 nDefaultTimeOut: DWORD,
89 }
90
91 /// Creates a new anonymous in-memory pipe, returning the read/write ends of the
92 /// pipe.
93 ///
94 /// The buffer size for this pipe may also be specified, but the system will
95 /// normally use this as a suggestion and it's not guaranteed that the buffer
96 /// will be precisely this size.
anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)>97 pub fn anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)> {
98 let mut read = 0 as HANDLE;
99 let mut write = 0 as HANDLE;
100 crate::cvt(unsafe { CreatePipe(&mut read, &mut write, 0 as *mut _, buffer_size) })?;
101 Ok((AnonRead(Handle::new(read)), AnonWrite(Handle::new(write))))
102 }
103
104 impl Read for AnonRead {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>105 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
106 self.0.read(buf)
107 }
108 }
109 impl<'a> Read for &'a AnonRead {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>110 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
111 self.0.read(buf)
112 }
113 }
114
115 impl AsRawHandle for AnonRead {
as_raw_handle(&self) -> HANDLE116 fn as_raw_handle(&self) -> HANDLE {
117 self.0.raw()
118 }
119 }
120 impl FromRawHandle for AnonRead {
from_raw_handle(handle: HANDLE) -> AnonRead121 unsafe fn from_raw_handle(handle: HANDLE) -> AnonRead {
122 AnonRead(Handle::new(handle))
123 }
124 }
125 impl IntoRawHandle for AnonRead {
into_raw_handle(self) -> HANDLE126 fn into_raw_handle(self) -> HANDLE {
127 self.0.into_raw()
128 }
129 }
130
131 impl Write for AnonWrite {
write(&mut self, buf: &[u8]) -> io::Result<usize>132 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
133 self.0.write(buf)
134 }
flush(&mut self) -> io::Result<()>135 fn flush(&mut self) -> io::Result<()> {
136 Ok(())
137 }
138 }
139 impl<'a> Write for &'a AnonWrite {
write(&mut self, buf: &[u8]) -> io::Result<usize>140 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
141 self.0.write(buf)
142 }
flush(&mut self) -> io::Result<()>143 fn flush(&mut self) -> io::Result<()> {
144 Ok(())
145 }
146 }
147
148 impl AsRawHandle for AnonWrite {
as_raw_handle(&self) -> HANDLE149 fn as_raw_handle(&self) -> HANDLE {
150 self.0.raw()
151 }
152 }
153 impl FromRawHandle for AnonWrite {
from_raw_handle(handle: HANDLE) -> AnonWrite154 unsafe fn from_raw_handle(handle: HANDLE) -> AnonWrite {
155 AnonWrite(Handle::new(handle))
156 }
157 }
158 impl IntoRawHandle for AnonWrite {
into_raw_handle(self) -> HANDLE159 fn into_raw_handle(self) -> HANDLE {
160 self.0.into_raw()
161 }
162 }
163
164 /// A convenience function to connect to a named pipe.
165 ///
166 /// This function will block the calling process until it can connect to the
167 /// pipe server specified by `addr`. This will use `NamedPipe::wait` internally
168 /// to block until it can connect.
connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File>169 pub fn connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File> {
170 _connect(addr.as_ref())
171 }
172
_connect(addr: &OsStr) -> io::Result<File>173 fn _connect(addr: &OsStr) -> io::Result<File> {
174 let mut r = OpenOptions::new();
175 let mut w = OpenOptions::new();
176 let mut rw = OpenOptions::new();
177 r.read(true);
178 w.write(true);
179 rw.read(true).write(true);
180 loop {
181 let res = rw
182 .open(addr)
183 .or_else(|_| r.open(addr))
184 .or_else(|_| w.open(addr));
185 match res {
186 Ok(f) => return Ok(f),
187 Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {}
188 Err(e) => return Err(e),
189 }
190
191 NamedPipe::wait(addr, Some(Duration::new(20, 0)))?;
192 }
193 }
194
195 impl NamedPipe {
196 /// Creates a new initial named pipe.
197 ///
198 /// This function is equivalent to:
199 ///
200 /// ```
201 /// use miow::pipe::NamedPipeBuilder;
202 ///
203 /// # let addr = "foo";
204 /// NamedPipeBuilder::new(addr)
205 /// .first(true)
206 /// .inbound(true)
207 /// .outbound(true)
208 /// .out_buffer_size(65536)
209 /// .in_buffer_size(65536)
210 /// .create();
211 /// ```
new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe>212 pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
213 NamedPipeBuilder::new(addr).create()
214 }
215
216 /// Waits until either a time-out interval elapses or an instance of the
217 /// specified named pipe is available for connection.
218 ///
219 /// If this function succeeds the process can create a `File` to connect to
220 /// the named pipe.
wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>) -> io::Result<()>221 pub fn wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>) -> io::Result<()> {
222 NamedPipe::_wait(addr.as_ref(), timeout)
223 }
224
_wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()>225 fn _wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()> {
226 let addr = addr.encode_wide().chain(Some(0)).collect::<Vec<_>>();
227 let timeout = crate::dur2ms(timeout);
228 crate::cvt(unsafe { WaitNamedPipeW(addr.as_ptr(), timeout) }).map(|_| ())
229 }
230
231 /// Connects this named pipe to a client, blocking until one becomes
232 /// available.
233 ///
234 /// This function will call the `ConnectNamedPipe` function to await for a
235 /// client to connect. This can be called immediately after the pipe is
236 /// created, or after it has been disconnected from a previous client.
connect(&self) -> io::Result<()>237 pub fn connect(&self) -> io::Result<()> {
238 match crate::cvt(unsafe { ConnectNamedPipe(self.0.raw(), 0 as *mut _) }) {
239 Ok(_) => Ok(()),
240 Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(()),
241 Err(e) => Err(e),
242 }
243 }
244
245 /// Issue a connection request with the specified overlapped operation.
246 ///
247 /// This function will issue a request to connect a client to this server,
248 /// returning immediately after starting the overlapped operation.
249 ///
250 /// If this function immediately succeeds then `Ok(true)` is returned. If
251 /// the overlapped operation is enqueued and pending, then `Ok(false)` is
252 /// returned. Otherwise an error is returned indicating what went wrong.
253 ///
254 /// # Unsafety
255 ///
256 /// This function is unsafe because the kernel requires that the
257 /// `overlapped` pointer is valid until the end of the I/O operation. The
258 /// kernel also requires that `overlapped` is unique for this I/O operation
259 /// and is not in use for any other I/O.
260 ///
261 /// To safely use this function callers must ensure that this pointer is
262 /// valid until the I/O operation is completed, typically via completion
263 /// ports and waiting to receive the completion notification on the port.
connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool>264 pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> {
265 match crate::cvt(ConnectNamedPipe(self.0.raw(), overlapped)) {
266 Ok(_) => Ok(true),
267 Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(true),
268 Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => Ok(false),
269 Err(ref e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => Ok(true),
270 Err(e) => Err(e),
271 }
272 }
273
274 /// Disconnects this named pipe from any connected client.
disconnect(&self) -> io::Result<()>275 pub fn disconnect(&self) -> io::Result<()> {
276 crate::cvt(unsafe { DisconnectNamedPipe(self.0.raw()) }).map(|_| ())
277 }
278
279 /// Issues an overlapped read operation to occur on this pipe.
280 ///
281 /// This function will issue an asynchronous read to occur in an overlapped
282 /// fashion, returning immediately. The `buf` provided will be filled in
283 /// with data and the request is tracked by the `overlapped` function
284 /// provided.
285 ///
286 /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
287 /// `n` is the number of bytes read. If an asynchronous operation is
288 /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
289 /// it is returned.
290 ///
291 /// When this operation completes (or if it completes immediately), another
292 /// mechanism must be used to learn how many bytes were transferred (such as
293 /// looking at the filed in the IOCP status message).
294 ///
295 /// # Unsafety
296 ///
297 /// This function is unsafe because the kernel requires that the `buf` and
298 /// `overlapped` pointers to be valid until the end of the I/O operation.
299 /// The kernel also requires that `overlapped` is unique for this I/O
300 /// operation and is not in use for any other I/O.
301 ///
302 /// To safely use this function callers must ensure that the pointers are
303 /// valid until the I/O operation is completed, typically via completion
304 /// ports and waiting to receive the completion notification on the port.
read_overlapped( &self, buf: &mut [u8], overlapped: *mut OVERLAPPED, ) -> io::Result<Option<usize>>305 pub unsafe fn read_overlapped(
306 &self,
307 buf: &mut [u8],
308 overlapped: *mut OVERLAPPED,
309 ) -> io::Result<Option<usize>> {
310 self.0.read_overlapped(buf, overlapped)
311 }
312
313 /// Issues an overlapped write operation to occur on this pipe.
314 ///
315 /// This function will issue an asynchronous write to occur in an overlapped
316 /// fashion, returning immediately. The `buf` provided will be filled in
317 /// with data and the request is tracked by the `overlapped` function
318 /// provided.
319 ///
320 /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
321 /// `n` is the number of bytes written. If an asynchronous operation is
322 /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
323 /// it is returned.
324 ///
325 /// When this operation completes (or if it completes immediately), another
326 /// mechanism must be used to learn how many bytes were transferred (such as
327 /// looking at the filed in the IOCP status message).
328 ///
329 /// # Unsafety
330 ///
331 /// This function is unsafe because the kernel requires that the `buf` and
332 /// `overlapped` pointers to be valid until the end of the I/O operation.
333 /// The kernel also requires that `overlapped` is unique for this I/O
334 /// operation and is not in use for any other I/O.
335 ///
336 /// To safely use this function callers must ensure that the pointers are
337 /// valid until the I/O operation is completed, typically via completion
338 /// ports and waiting to receive the completion notification on the port.
write_overlapped( &self, buf: &[u8], overlapped: *mut OVERLAPPED, ) -> io::Result<Option<usize>>339 pub unsafe fn write_overlapped(
340 &self,
341 buf: &[u8],
342 overlapped: *mut OVERLAPPED,
343 ) -> io::Result<Option<usize>> {
344 self.0.write_overlapped(buf, overlapped)
345 }
346
347 /// Calls the `GetOverlappedResult` function to get the result of an
348 /// overlapped operation for this handle.
349 ///
350 /// This function takes the `OVERLAPPED` argument which must have been used
351 /// to initiate an overlapped I/O operation, and returns either the
352 /// successful number of bytes transferred during the operation or an error
353 /// if one occurred.
354 ///
355 /// # Unsafety
356 ///
357 /// This function is unsafe as `overlapped` must have previously been used
358 /// to execute an operation for this handle, and it must also be a valid
359 /// pointer to an `Overlapped` instance.
360 ///
361 /// # Panics
362 ///
363 /// This function will panic
result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize>364 pub unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> {
365 let mut transferred = 0;
366 let r = GetOverlappedResult(self.0.raw(), overlapped, &mut transferred, FALSE);
367 if r == 0 {
368 Err(io::Error::last_os_error())
369 } else {
370 Ok(transferred as usize)
371 }
372 }
373 }
374
375 thread_local! {
376 static NAMED_PIPE_OVERLAPPED: RefCell<Option<Overlapped>> = RefCell::new(None);
377 }
378
379 /// Call a function with a threadlocal `Overlapped`. The function `f` should be
380 /// sure that the event is reset, either manually or by a thread being released.
with_threadlocal_overlapped<F>(f: F) -> io::Result<usize> where F: FnOnce(&Overlapped) -> io::Result<usize>,381 fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize>
382 where
383 F: FnOnce(&Overlapped) -> io::Result<usize>,
384 {
385 NAMED_PIPE_OVERLAPPED.with(|overlapped| {
386 let mut mborrow = overlapped.borrow_mut();
387 if let None = *mborrow {
388 let op = Overlapped::initialize_with_autoreset_event()?;
389 *mborrow = Some(op);
390 }
391 f(mborrow.as_ref().unwrap())
392 })
393 }
394
395 impl Read for NamedPipe {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>396 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
397 // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
398 with_threadlocal_overlapped(|overlapped| unsafe {
399 self.0
400 .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
401 })
402 }
403 }
404 impl<'a> Read for &'a NamedPipe {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>405 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
406 // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
407 with_threadlocal_overlapped(|overlapped| unsafe {
408 self.0
409 .read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
410 })
411 }
412 }
413
414 impl Write for NamedPipe {
write(&mut self, buf: &[u8]) -> io::Result<usize>415 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
416 // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
417 with_threadlocal_overlapped(|overlapped| unsafe {
418 self.0
419 .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
420 })
421 }
flush(&mut self) -> io::Result<()>422 fn flush(&mut self) -> io::Result<()> {
423 <&NamedPipe as Write>::flush(&mut &*self)
424 }
425 }
426 impl<'a> Write for &'a NamedPipe {
write(&mut self, buf: &[u8]) -> io::Result<usize>427 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
428 // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
429 with_threadlocal_overlapped(|overlapped| unsafe {
430 self.0
431 .write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
432 })
433 }
flush(&mut self) -> io::Result<()>434 fn flush(&mut self) -> io::Result<()> {
435 crate::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ())
436 }
437 }
438
439 impl AsRawHandle for NamedPipe {
as_raw_handle(&self) -> HANDLE440 fn as_raw_handle(&self) -> HANDLE {
441 self.0.raw()
442 }
443 }
444 impl FromRawHandle for NamedPipe {
from_raw_handle(handle: HANDLE) -> NamedPipe445 unsafe fn from_raw_handle(handle: HANDLE) -> NamedPipe {
446 NamedPipe(Handle::new(handle))
447 }
448 }
449 impl IntoRawHandle for NamedPipe {
into_raw_handle(self) -> HANDLE450 fn into_raw_handle(self) -> HANDLE {
451 self.0.into_raw()
452 }
453 }
454
flag(slot: &mut DWORD, on: bool, val: DWORD)455 fn flag(slot: &mut DWORD, on: bool, val: DWORD) {
456 if on {
457 *slot |= val;
458 } else {
459 *slot &= !val;
460 }
461 }
462
463 impl NamedPipeBuilder {
464 /// Creates a new named pipe builder with the default settings.
new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder465 pub fn new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder {
466 NamedPipeBuilder {
467 name: addr.as_ref().encode_wide().chain(Some(0)).collect(),
468 dwOpenMode: PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
469 dwPipeMode: PIPE_TYPE_BYTE,
470 nMaxInstances: PIPE_UNLIMITED_INSTANCES,
471 nOutBufferSize: 65536,
472 nInBufferSize: 65536,
473 nDefaultTimeOut: 0,
474 }
475 }
476
477 /// Indicates whether data is allowed to flow from the client to the server.
inbound(&mut self, allowed: bool) -> &mut Self478 pub fn inbound(&mut self, allowed: bool) -> &mut Self {
479 flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_INBOUND);
480 self
481 }
482
483 /// Indicates whether data is allowed to flow from the server to the client.
outbound(&mut self, allowed: bool) -> &mut Self484 pub fn outbound(&mut self, allowed: bool) -> &mut Self {
485 flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_OUTBOUND);
486 self
487 }
488
489 /// Indicates that this pipe must be the first instance.
490 ///
491 /// If set to true, then creation will fail if there's already an instance
492 /// elsewhere.
first(&mut self, first: bool) -> &mut Self493 pub fn first(&mut self, first: bool) -> &mut Self {
494 flag(&mut self.dwOpenMode, first, FILE_FLAG_FIRST_PIPE_INSTANCE);
495 self
496 }
497
498 /// Indicates whether this server can accept remote clients or not.
accept_remote(&mut self, accept: bool) -> &mut Self499 pub fn accept_remote(&mut self, accept: bool) -> &mut Self {
500 flag(&mut self.dwPipeMode, !accept, PIPE_REJECT_REMOTE_CLIENTS);
501 self
502 }
503
504 /// Specifies the maximum number of instances of the server pipe that are
505 /// allowed.
506 ///
507 /// The first instance of a pipe can specify this value. A value of 255
508 /// indicates that there is no limit to the number of instances.
max_instances(&mut self, instances: u8) -> &mut Self509 pub fn max_instances(&mut self, instances: u8) -> &mut Self {
510 self.nMaxInstances = instances as DWORD;
511 self
512 }
513
514 /// Specifies the number of bytes to reserver for the output buffer
out_buffer_size(&mut self, buffer: u32) -> &mut Self515 pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
516 self.nOutBufferSize = buffer as DWORD;
517 self
518 }
519
520 /// Specifies the number of bytes to reserver for the input buffer
in_buffer_size(&mut self, buffer: u32) -> &mut Self521 pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
522 self.nInBufferSize = buffer as DWORD;
523 self
524 }
525
526 /// Using the options in this builder, attempt to create a new named pipe.
527 ///
528 /// This function will call the `CreateNamedPipe` function and return the
529 /// result.
create(&mut self) -> io::Result<NamedPipe>530 pub fn create(&mut self) -> io::Result<NamedPipe> {
531 unsafe { self.with_security_attributes(::std::ptr::null_mut()) }
532 }
533
534 /// Using the options in the builder and the provided security attributes, attempt to create a
535 /// new named pipe. This function has to be called with a valid pointer to a
536 /// `SECURITY_ATTRIBUTES` struct that will stay valid for the lifetime of this function or a
537 /// null pointer.
538 ///
539 /// This function will call the `CreateNamedPipe` function and return the
540 /// result.
with_security_attributes( &mut self, attrs: *mut SECURITY_ATTRIBUTES, ) -> io::Result<NamedPipe>541 pub unsafe fn with_security_attributes(
542 &mut self,
543 attrs: *mut SECURITY_ATTRIBUTES,
544 ) -> io::Result<NamedPipe> {
545 let h = CreateNamedPipeW(
546 self.name.as_ptr(),
547 self.dwOpenMode,
548 self.dwPipeMode,
549 self.nMaxInstances,
550 self.nOutBufferSize,
551 self.nInBufferSize,
552 self.nDefaultTimeOut,
553 attrs,
554 );
555
556 if h == INVALID_HANDLE_VALUE {
557 Err(io::Error::last_os_error())
558 } else {
559 Ok(NamedPipe(Handle::new(h)))
560 }
561 }
562 }
563
564 #[cfg(test)]
565 mod tests {
566 use std::fs::{File, OpenOptions};
567 use std::io::prelude::*;
568 use std::sync::mpsc::channel;
569 use std::thread;
570 use std::time::Duration;
571
572 use rand::{distributions::Alphanumeric, thread_rng, Rng};
573
574 use super::{anonymous, NamedPipe, NamedPipeBuilder};
575 use crate::iocp::CompletionPort;
576 use crate::Overlapped;
577
name() -> String578 fn name() -> String {
579 let name = thread_rng()
580 .sample_iter(Alphanumeric)
581 .take(30)
582 .map(char::from)
583 .collect::<String>();
584 format!(r"\\.\pipe\{}", name)
585 }
586
587 #[test]
anon()588 fn anon() {
589 let (mut read, mut write) = t!(anonymous(256));
590 assert_eq!(t!(write.write(&[1, 2, 3])), 3);
591 let mut b = [0; 10];
592 assert_eq!(t!(read.read(&mut b)), 3);
593 assert_eq!(&b[..3], &[1, 2, 3]);
594 }
595
596 #[test]
named_not_first()597 fn named_not_first() {
598 let name = name();
599 let _a = t!(NamedPipe::new(&name));
600 assert!(NamedPipe::new(&name).is_err());
601
602 t!(NamedPipeBuilder::new(&name).first(false).create());
603 }
604
605 #[test]
named_connect()606 fn named_connect() {
607 let name = name();
608 let a = t!(NamedPipe::new(&name));
609
610 let t = thread::spawn(move || {
611 t!(File::open(name));
612 });
613
614 t!(a.connect());
615 t!(a.disconnect());
616 t!(t.join());
617 }
618
619 #[test]
named_wait()620 fn named_wait() {
621 let name = name();
622 let a = t!(NamedPipe::new(&name));
623
624 let (tx, rx) = channel();
625 let t = thread::spawn(move || {
626 t!(NamedPipe::wait(&name, None));
627 t!(File::open(&name));
628 assert!(NamedPipe::wait(&name, Some(Duration::from_millis(1))).is_err());
629 t!(tx.send(()));
630 });
631
632 t!(a.connect());
633 t!(rx.recv());
634 t!(a.disconnect());
635 t!(t.join());
636 }
637
638 #[test]
named_connect_overlapped()639 fn named_connect_overlapped() {
640 let name = name();
641 let a = t!(NamedPipe::new(&name));
642
643 let t = thread::spawn(move || {
644 t!(File::open(name));
645 });
646
647 let cp = t!(CompletionPort::new(1));
648 t!(cp.add_handle(2, &a));
649
650 let over = Overlapped::zero();
651 unsafe {
652 t!(a.connect_overlapped(over.raw()));
653 }
654
655 let status = t!(cp.get(None));
656 assert_eq!(status.bytes_transferred(), 0);
657 assert_eq!(status.token(), 2);
658 assert_eq!(status.overlapped(), over.raw());
659 t!(t.join());
660 }
661
662 #[test]
named_read_write()663 fn named_read_write() {
664 let name = name();
665 let mut a = t!(NamedPipe::new(&name));
666
667 let t = thread::spawn(move || {
668 let mut f = t!(OpenOptions::new().read(true).write(true).open(name));
669 t!(f.write_all(&[1, 2, 3]));
670 let mut b = [0; 10];
671 assert_eq!(t!(f.read(&mut b)), 3);
672 assert_eq!(&b[..3], &[1, 2, 3]);
673 });
674
675 t!(a.connect());
676 let mut b = [0; 10];
677 assert_eq!(t!(a.read(&mut b)), 3);
678 assert_eq!(&b[..3], &[1, 2, 3]);
679 t!(a.write_all(&[1, 2, 3]));
680 t!(a.flush());
681 t!(a.disconnect());
682 t!(t.join());
683 }
684
685 #[test]
named_read_write_multi()686 fn named_read_write_multi() {
687 for _ in 0..5 {
688 named_read_write()
689 }
690 }
691
692 #[test]
named_read_write_multi_same_thread()693 fn named_read_write_multi_same_thread() {
694 let name1 = name();
695 let mut a1 = t!(NamedPipe::new(&name1));
696 let name2 = name();
697 let mut a2 = t!(NamedPipe::new(&name2));
698
699 let t = thread::spawn(move || {
700 let mut f = t!(OpenOptions::new().read(true).write(true).open(name1));
701 t!(f.write_all(&[1, 2, 3]));
702 let mut b = [0; 10];
703 assert_eq!(t!(f.read(&mut b)), 3);
704 assert_eq!(&b[..3], &[1, 2, 3]);
705
706 let mut f = t!(OpenOptions::new().read(true).write(true).open(name2));
707 t!(f.write_all(&[1, 2, 3]));
708 let mut b = [0; 10];
709 assert_eq!(t!(f.read(&mut b)), 3);
710 assert_eq!(&b[..3], &[1, 2, 3]);
711 });
712
713 t!(a1.connect());
714 let mut b = [0; 10];
715 assert_eq!(t!(a1.read(&mut b)), 3);
716 assert_eq!(&b[..3], &[1, 2, 3]);
717 t!(a1.write_all(&[1, 2, 3]));
718 t!(a1.flush());
719 t!(a1.disconnect());
720
721 t!(a2.connect());
722 let mut b = [0; 10];
723 assert_eq!(t!(a2.read(&mut b)), 3);
724 assert_eq!(&b[..3], &[1, 2, 3]);
725 t!(a2.write_all(&[1, 2, 3]));
726 t!(a2.flush());
727 t!(a2.disconnect());
728
729 t!(t.join());
730 }
731
732 #[test]
named_read_overlapped()733 fn named_read_overlapped() {
734 let name = name();
735 let a = t!(NamedPipe::new(&name));
736
737 let t = thread::spawn(move || {
738 let mut f = t!(File::create(name));
739 t!(f.write_all(&[1, 2, 3]));
740 });
741
742 let cp = t!(CompletionPort::new(1));
743 t!(cp.add_handle(3, &a));
744 t!(a.connect());
745
746 let mut b = [0; 10];
747 let over = Overlapped::zero();
748 unsafe {
749 t!(a.read_overlapped(&mut b, over.raw()));
750 }
751 let status = t!(cp.get(None));
752 assert_eq!(status.bytes_transferred(), 3);
753 assert_eq!(status.token(), 3);
754 assert_eq!(status.overlapped(), over.raw());
755 assert_eq!(&b[..3], &[1, 2, 3]);
756
757 t!(t.join());
758 }
759
760 #[test]
named_write_overlapped()761 fn named_write_overlapped() {
762 let name = name();
763 let a = t!(NamedPipe::new(&name));
764
765 let t = thread::spawn(move || {
766 let mut f = t!(super::connect(name));
767 let mut b = [0; 10];
768 assert_eq!(t!(f.read(&mut b)), 3);
769 assert_eq!(&b[..3], &[1, 2, 3])
770 });
771
772 let cp = t!(CompletionPort::new(1));
773 t!(cp.add_handle(3, &a));
774 t!(a.connect());
775
776 let over = Overlapped::zero();
777 unsafe {
778 t!(a.write_overlapped(&[1, 2, 3], over.raw()));
779 }
780
781 let status = t!(cp.get(None));
782 assert_eq!(status.bytes_transferred(), 3);
783 assert_eq!(status.token(), 3);
784 assert_eq!(status.overlapped(), over.raw());
785
786 t!(t.join());
787 }
788 }
789