1 //! Tokio support for [Windows named pipes].
2 //!
3 //! [Windows named pipes]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
4 
5 use std::ffi::c_void;
6 use std::ffi::OsStr;
7 use std::io::{self, Read, Write};
8 use std::pin::Pin;
9 use std::ptr;
10 use std::task::{Context, Poll};
11 
12 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
13 use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
14 
15 // Hide imports which are not used when generating documentation.
16 #[cfg(not(docsrs))]
17 mod doc {
18     pub(super) use crate::os::windows::ffi::OsStrExt;
19     pub(super) use crate::winapi::shared::minwindef::{DWORD, FALSE};
20     pub(super) use crate::winapi::um::fileapi;
21     pub(super) use crate::winapi::um::handleapi;
22     pub(super) use crate::winapi::um::namedpipeapi;
23     pub(super) use crate::winapi::um::winbase;
24     pub(super) use crate::winapi::um::winnt;
25 
26     pub(super) use mio::windows as mio_windows;
27 }
28 
29 // NB: none of these shows up in public API, so don't document them.
30 #[cfg(docsrs)]
31 mod doc {
32     pub type DWORD = crate::doc::NotDefinedHere;
33 
34     pub(super) mod mio_windows {
35         pub type NamedPipe = crate::doc::NotDefinedHere;
36     }
37 }
38 
39 use self::doc::*;
40 
41 /// A [Windows named pipe] server.
42 ///
43 /// Accepting client connections involves creating a server with
44 /// [`ServerOptions::create`] and waiting for clients to connect using
45 /// [`NamedPipeServer::connect`].
46 ///
47 /// To avoid having clients sporadically fail with
48 /// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
49 /// ensure that at least one server instance is available at all times. This
50 /// means that the typical listen loop for a server is a bit involved, because
51 /// we have to ensure that we never drop a server accidentally while a client
52 /// might connect.
53 ///
54 /// So a correctly implemented server looks like this:
55 ///
56 /// ```no_run
57 /// use std::io;
58 /// use tokio::net::windows::named_pipe::ServerOptions;
59 ///
60 /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
61 ///
62 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
63 /// // The first server needs to be constructed early so that clients can
64 /// // be correctly connected. Otherwise calling .wait will cause the client to
65 /// // error.
66 /// //
67 /// // Here we also make use of `first_pipe_instance`, which will ensure that
68 /// // there are no other servers up and running already.
69 /// let mut server = ServerOptions::new()
70 ///     .first_pipe_instance(true)
71 ///     .create(PIPE_NAME)?;
72 ///
73 /// // Spawn the server loop.
74 /// let server = tokio::spawn(async move {
75 ///     loop {
76 ///         // Wait for a client to connect.
77 ///         let connected = server.connect().await?;
78 ///
79 ///         // Construct the next server to be connected before sending the one
80 ///         // we already have of onto a task. This ensures that the server
81 ///         // isn't closed (after it's done in the task) before a new one is
82 ///         // available. Otherwise the client might error with
83 ///         // `io::ErrorKind::NotFound`.
84 ///         server = ServerOptions::new().create(PIPE_NAME)?;
85 ///
86 ///         let client = tokio::spawn(async move {
87 ///             /* use the connected client */
88 /// #           Ok::<_, std::io::Error>(())
89 ///         });
90 /// #       if true { break } // needed for type inference to work
91 ///     }
92 ///
93 ///     Ok::<_, io::Error>(())
94 /// });
95 ///
96 /// /* do something else not server related here */
97 /// # Ok(()) }
98 /// ```
99 ///
100 /// [`ERROR_PIPE_BUSY`]: crate::winapi::shared::winerror::ERROR_PIPE_BUSY
101 /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
102 #[derive(Debug)]
103 pub struct NamedPipeServer {
104     io: PollEvented<mio_windows::NamedPipe>,
105 }
106 
107 impl NamedPipeServer {
108     /// Constructs a new named pipe server from the specified raw handle.
109     ///
110     /// This function will consume ownership of the handle given, passing
111     /// responsibility for closing the handle to the returned object.
112     ///
113     /// This function is also unsafe as the primitives currently returned have
114     /// the contract that they are the sole owner of the file descriptor they
115     /// are wrapping. Usage of this function could accidentally allow violating
116     /// this contract which can cause memory unsafety in code that relies on it
117     /// being true.
118     ///
119     /// # Errors
120     ///
121     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
122     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
123     ///
124     /// [Tokio Runtime]: crate::runtime::Runtime
125     /// [enabled I/O]: crate::runtime::Builder::enable_io
from_raw_handle(handle: RawHandle) -> io::Result<Self>126     pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
127         let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
128 
129         Ok(Self {
130             io: PollEvented::new(named_pipe)?,
131         })
132     }
133 
134     /// Retrieves information about the named pipe the server is associated
135     /// with.
136     ///
137     /// ```no_run
138     /// use tokio::net::windows::named_pipe::{PipeEnd, PipeMode, ServerOptions};
139     ///
140     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-info";
141     ///
142     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
143     /// let server = ServerOptions::new()
144     ///     .pipe_mode(PipeMode::Message)
145     ///     .max_instances(5)
146     ///     .create(PIPE_NAME)?;
147     ///
148     /// let server_info = server.info()?;
149     ///
150     /// assert_eq!(server_info.end, PipeEnd::Server);
151     /// assert_eq!(server_info.mode, PipeMode::Message);
152     /// assert_eq!(server_info.max_instances, 5);
153     /// # Ok(()) }
154     /// ```
info(&self) -> io::Result<PipeInfo>155     pub fn info(&self) -> io::Result<PipeInfo> {
156         // Safety: we're ensuring the lifetime of the named pipe.
157         unsafe { named_pipe_info(self.io.as_raw_handle()) }
158     }
159 
160     /// Enables a named pipe server process to wait for a client process to
161     /// connect to an instance of a named pipe. A client process connects by
162     /// creating a named pipe with the same name.
163     ///
164     /// This corresponds to the [`ConnectNamedPipe`] system call.
165     ///
166     /// # Cancel safety
167     ///
168     /// This method is cancellation safe in the sense that if it is used as the
169     /// event in a [`select!`](crate::select) statement and some other branch
170     /// completes first, then no connection events have been lost.
171     ///
172     /// [`ConnectNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
173     ///
174     /// # Example
175     ///
176     /// ```no_run
177     /// use tokio::net::windows::named_pipe::ServerOptions;
178     ///
179     /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
180     ///
181     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
182     /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
183     ///
184     /// // Wait for a client to connect.
185     /// pipe.connect().await?;
186     ///
187     /// // Use the connected client...
188     /// # Ok(()) }
189     /// ```
connect(&self) -> io::Result<()>190     pub async fn connect(&self) -> io::Result<()> {
191         loop {
192             match self.io.connect() {
193                 Ok(()) => break,
194                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
195                     self.io.registration().readiness(Interest::WRITABLE).await?;
196                 }
197                 Err(e) => return Err(e),
198             }
199         }
200 
201         Ok(())
202     }
203 
204     /// Disconnects the server end of a named pipe instance from a client
205     /// process.
206     ///
207     /// ```
208     /// use tokio::io::AsyncWriteExt;
209     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
210     /// use winapi::shared::winerror;
211     ///
212     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-disconnect";
213     ///
214     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
215     /// let server = ServerOptions::new()
216     ///     .create(PIPE_NAME)?;
217     ///
218     /// let mut client = ClientOptions::new()
219     ///     .open(PIPE_NAME)?;
220     ///
221     /// // Wait for a client to become connected.
222     /// server.connect().await?;
223     ///
224     /// // Forcibly disconnect the client.
225     /// server.disconnect()?;
226     ///
227     /// // Write fails with an OS-specific error after client has been
228     /// // disconnected.
229     /// let e = client.write(b"ping").await.unwrap_err();
230     /// assert_eq!(e.raw_os_error(), Some(winerror::ERROR_PIPE_NOT_CONNECTED as i32));
231     /// # Ok(()) }
232     /// ```
disconnect(&self) -> io::Result<()>233     pub fn disconnect(&self) -> io::Result<()> {
234         self.io.disconnect()
235     }
236 
237     /// Waits for any of the requested ready states.
238     ///
239     /// This function is usually paired with `try_read()` or `try_write()`. It
240     /// can be used to concurrently read / write to the same pipe on a single
241     /// task without splitting the pipe.
242     ///
243     /// # Examples
244     ///
245     /// Concurrently read and write to the pipe on the same task without
246     /// splitting.
247     ///
248     /// ```no_run
249     /// use tokio::io::Interest;
250     /// use tokio::net::windows::named_pipe;
251     /// use std::error::Error;
252     /// use std::io;
253     ///
254     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready";
255     ///
256     /// #[tokio::main]
257     /// async fn main() -> Result<(), Box<dyn Error>> {
258     ///     let server = named_pipe::ServerOptions::new()
259     ///         .create(PIPE_NAME)?;
260     ///
261     ///     loop {
262     ///         let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?;
263     ///
264     ///         if ready.is_readable() {
265     ///             let mut data = vec![0; 1024];
266     ///             // Try to read data, this may still fail with `WouldBlock`
267     ///             // if the readiness event is a false positive.
268     ///             match server.try_read(&mut data) {
269     ///                 Ok(n) => {
270     ///                     println!("read {} bytes", n);
271     ///                 }
272     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
273     ///                     continue;
274     ///                 }
275     ///                 Err(e) => {
276     ///                     return Err(e.into());
277     ///                 }
278     ///             }
279     ///         }
280     ///
281     ///         if ready.is_writable() {
282     ///             // Try to write data, this may still fail with `WouldBlock`
283     ///             // if the readiness event is a false positive.
284     ///             match server.try_write(b"hello world") {
285     ///                 Ok(n) => {
286     ///                     println!("write {} bytes", n);
287     ///                 }
288     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
289     ///                     continue;
290     ///                 }
291     ///                 Err(e) => {
292     ///                     return Err(e.into());
293     ///                 }
294     ///             }
295     ///         }
296     ///     }
297     /// }
298     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>299     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
300         let event = self.io.registration().readiness(interest).await?;
301         Ok(event.ready)
302     }
303 
304     /// Waits for the pipe to become readable.
305     ///
306     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
307     /// paired with `try_read()`.
308     ///
309     /// # Examples
310     ///
311     /// ```no_run
312     /// use tokio::net::windows::named_pipe;
313     /// use std::error::Error;
314     /// use std::io;
315     ///
316     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable";
317     ///
318     /// #[tokio::main]
319     /// async fn main() -> Result<(), Box<dyn Error>> {
320     ///     let server = named_pipe::ServerOptions::new()
321     ///         .create(PIPE_NAME)?;
322     ///
323     ///     let mut msg = vec![0; 1024];
324     ///
325     ///     loop {
326     ///         // Wait for the pipe to be readable
327     ///         server.readable().await?;
328     ///
329     ///         // Try to read data, this may still fail with `WouldBlock`
330     ///         // if the readiness event is a false positive.
331     ///         match server.try_read(&mut msg) {
332     ///             Ok(n) => {
333     ///                 msg.truncate(n);
334     ///                 break;
335     ///             }
336     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
337     ///                 continue;
338     ///             }
339     ///             Err(e) => {
340     ///                 return Err(e.into());
341     ///             }
342     ///         }
343     ///     }
344     ///
345     ///     println!("GOT = {:?}", msg);
346     ///     Ok(())
347     /// }
348     /// ```
readable(&self) -> io::Result<()>349     pub async fn readable(&self) -> io::Result<()> {
350         self.ready(Interest::READABLE).await?;
351         Ok(())
352     }
353 
354     /// Polls for read readiness.
355     ///
356     /// If the pipe is not currently ready for reading, this method will
357     /// store a clone of the `Waker` from the provided `Context`. When the pipe
358     /// becomes ready for reading, `Waker::wake` will be called on the waker.
359     ///
360     /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
361     /// the `Waker` from the `Context` passed to the most recent call is
362     /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
363     /// second, independent waker.)
364     ///
365     /// This function is intended for cases where creating and pinning a future
366     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
367     /// preferred, as this supports polling from multiple tasks at once.
368     ///
369     /// # Return value
370     ///
371     /// The function returns:
372     ///
373     /// * `Poll::Pending` if the pipe is not ready for reading.
374     /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
375     /// * `Poll::Ready(Err(e))` if an error is encountered.
376     ///
377     /// # Errors
378     ///
379     /// This function may encounter any standard I/O error except `WouldBlock`.
380     ///
381     /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>382     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
383         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
384     }
385 
386     /// Tries to read data from the pipe into the provided buffer, returning how
387     /// many bytes were read.
388     ///
389     /// Receives any pending data from the pipe but does not wait for new data
390     /// to arrive. On success, returns the number of bytes read. Because
391     /// `try_read()` is non-blocking, the buffer does not have to be stored by
392     /// the async task and can exist entirely on the stack.
393     ///
394     /// Usually, [`readable()`] or [`ready()`] is used with this function.
395     ///
396     /// [`readable()`]: NamedPipeServer::readable()
397     /// [`ready()`]: NamedPipeServer::ready()
398     ///
399     /// # Return
400     ///
401     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
402     /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
403     /// and will no longer yield data. If the pipe is not ready to read data
404     /// `Err(io::ErrorKind::WouldBlock)` is returned.
405     ///
406     /// # Examples
407     ///
408     /// ```no_run
409     /// use tokio::net::windows::named_pipe;
410     /// use std::error::Error;
411     /// use std::io;
412     ///
413     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read";
414     ///
415     /// #[tokio::main]
416     /// async fn main() -> Result<(), Box<dyn Error>> {
417     ///     let server = named_pipe::ServerOptions::new()
418     ///         .create(PIPE_NAME)?;
419     ///
420     ///     loop {
421     ///         // Wait for the pipe to be readable
422     ///         server.readable().await?;
423     ///
424     ///         // Creating the buffer **after** the `await` prevents it from
425     ///         // being stored in the async task.
426     ///         let mut buf = [0; 4096];
427     ///
428     ///         // Try to read data, this may still fail with `WouldBlock`
429     ///         // if the readiness event is a false positive.
430     ///         match server.try_read(&mut buf) {
431     ///             Ok(0) => break,
432     ///             Ok(n) => {
433     ///                 println!("read {} bytes", n);
434     ///             }
435     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
436     ///                 continue;
437     ///             }
438     ///             Err(e) => {
439     ///                 return Err(e.into());
440     ///             }
441     ///         }
442     ///     }
443     ///
444     ///     Ok(())
445     /// }
446     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>447     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
448         self.io
449             .registration()
450             .try_io(Interest::READABLE, || (&*self.io).read(buf))
451     }
452 
453     /// Tries to read data from the pipe into the provided buffers, returning
454     /// how many bytes were read.
455     ///
456     /// Data is copied to fill each buffer in order, with the final buffer
457     /// written to possibly being only partially filled. This method behaves
458     /// equivalently to a single call to [`try_read()`] with concatenated
459     /// buffers.
460     ///
461     /// Receives any pending data from the pipe but does not wait for new data
462     /// to arrive. On success, returns the number of bytes read. Because
463     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
464     /// stored by the async task and can exist entirely on the stack.
465     ///
466     /// Usually, [`readable()`] or [`ready()`] is used with this function.
467     ///
468     /// [`try_read()`]: NamedPipeServer::try_read()
469     /// [`readable()`]: NamedPipeServer::readable()
470     /// [`ready()`]: NamedPipeServer::ready()
471     ///
472     /// # Return
473     ///
474     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
475     /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
476     /// and will no longer yield data. If the pipe is not ready to read data
477     /// `Err(io::ErrorKind::WouldBlock)` is returned.
478     ///
479     /// # Examples
480     ///
481     /// ```no_run
482     /// use tokio::net::windows::named_pipe;
483     /// use std::error::Error;
484     /// use std::io::{self, IoSliceMut};
485     ///
486     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored";
487     ///
488     /// #[tokio::main]
489     /// async fn main() -> Result<(), Box<dyn Error>> {
490     ///     let server = named_pipe::ServerOptions::new()
491     ///         .create(PIPE_NAME)?;
492     ///
493     ///     loop {
494     ///         // Wait for the pipe to be readable
495     ///         server.readable().await?;
496     ///
497     ///         // Creating the buffer **after** the `await` prevents it from
498     ///         // being stored in the async task.
499     ///         let mut buf_a = [0; 512];
500     ///         let mut buf_b = [0; 1024];
501     ///         let mut bufs = [
502     ///             IoSliceMut::new(&mut buf_a),
503     ///             IoSliceMut::new(&mut buf_b),
504     ///         ];
505     ///
506     ///         // Try to read data, this may still fail with `WouldBlock`
507     ///         // if the readiness event is a false positive.
508     ///         match server.try_read_vectored(&mut bufs) {
509     ///             Ok(0) => break,
510     ///             Ok(n) => {
511     ///                 println!("read {} bytes", n);
512     ///             }
513     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
514     ///                 continue;
515     ///             }
516     ///             Err(e) => {
517     ///                 return Err(e.into());
518     ///             }
519     ///         }
520     ///     }
521     ///
522     ///     Ok(())
523     /// }
524     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>525     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
526         self.io
527             .registration()
528             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
529     }
530 
531     /// Waits for the pipe to become writable.
532     ///
533     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
534     /// paired with `try_write()`.
535     ///
536     /// # Examples
537     ///
538     /// ```no_run
539     /// use tokio::net::windows::named_pipe;
540     /// use std::error::Error;
541     /// use std::io;
542     ///
543     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable";
544     ///
545     /// #[tokio::main]
546     /// async fn main() -> Result<(), Box<dyn Error>> {
547     ///     let server = named_pipe::ServerOptions::new()
548     ///         .create(PIPE_NAME)?;
549     ///
550     ///     loop {
551     ///         // Wait for the pipe to be writable
552     ///         server.writable().await?;
553     ///
554     ///         // Try to write data, this may still fail with `WouldBlock`
555     ///         // if the readiness event is a false positive.
556     ///         match server.try_write(b"hello world") {
557     ///             Ok(n) => {
558     ///                 break;
559     ///             }
560     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
561     ///                 continue;
562     ///             }
563     ///             Err(e) => {
564     ///                 return Err(e.into());
565     ///             }
566     ///         }
567     ///     }
568     ///
569     ///     Ok(())
570     /// }
571     /// ```
writable(&self) -> io::Result<()>572     pub async fn writable(&self) -> io::Result<()> {
573         self.ready(Interest::WRITABLE).await?;
574         Ok(())
575     }
576 
577     /// Polls for write readiness.
578     ///
579     /// If the pipe is not currently ready for writing, this method will
580     /// store a clone of the `Waker` from the provided `Context`. When the pipe
581     /// becomes ready for writing, `Waker::wake` will be called on the waker.
582     ///
583     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
584     /// the `Waker` from the `Context` passed to the most recent call is
585     /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
586     /// second, independent waker.)
587     ///
588     /// This function is intended for cases where creating and pinning a future
589     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
590     /// preferred, as this supports polling from multiple tasks at once.
591     ///
592     /// # Return value
593     ///
594     /// The function returns:
595     ///
596     /// * `Poll::Pending` if the pipe is not ready for writing.
597     /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
598     /// * `Poll::Ready(Err(e))` if an error is encountered.
599     ///
600     /// # Errors
601     ///
602     /// This function may encounter any standard I/O error except `WouldBlock`.
603     ///
604     /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>605     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
606         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
607     }
608 
609     /// Tries to write a buffer to the pipe, returning how many bytes were
610     /// written.
611     ///
612     /// The function will attempt to write the entire contents of `buf`, but
613     /// only part of the buffer may be written.
614     ///
615     /// This function is usually paired with `writable()`.
616     ///
617     /// # Return
618     ///
619     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
620     /// number of bytes written. If the pipe is not ready to write data,
621     /// `Err(io::ErrorKind::WouldBlock)` is returned.
622     ///
623     /// # Examples
624     ///
625     /// ```no_run
626     /// use tokio::net::windows::named_pipe;
627     /// use std::error::Error;
628     /// use std::io;
629     ///
630     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write";
631     ///
632     /// #[tokio::main]
633     /// async fn main() -> Result<(), Box<dyn Error>> {
634     ///     let server = named_pipe::ServerOptions::new()
635     ///         .create(PIPE_NAME)?;
636     ///
637     ///     loop {
638     ///         // Wait for the pipe to be writable
639     ///         server.writable().await?;
640     ///
641     ///         // Try to write data, this may still fail with `WouldBlock`
642     ///         // if the readiness event is a false positive.
643     ///         match server.try_write(b"hello world") {
644     ///             Ok(n) => {
645     ///                 break;
646     ///             }
647     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
648     ///                 continue;
649     ///             }
650     ///             Err(e) => {
651     ///                 return Err(e.into());
652     ///             }
653     ///         }
654     ///     }
655     ///
656     ///     Ok(())
657     /// }
658     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>659     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
660         self.io
661             .registration()
662             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
663     }
664 
665     /// Tries to write several buffers to the pipe, returning how many bytes
666     /// were written.
667     ///
668     /// Data is written from each buffer in order, with the final buffer read
669     /// from possible being only partially consumed. This method behaves
670     /// equivalently to a single call to [`try_write()`] with concatenated
671     /// buffers.
672     ///
673     /// This function is usually paired with `writable()`.
674     ///
675     /// [`try_write()`]: NamedPipeServer::try_write()
676     ///
677     /// # Return
678     ///
679     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
680     /// number of bytes written. If the pipe is not ready to write data,
681     /// `Err(io::ErrorKind::WouldBlock)` is returned.
682     ///
683     /// # Examples
684     ///
685     /// ```no_run
686     /// use tokio::net::windows::named_pipe;
687     /// use std::error::Error;
688     /// use std::io;
689     ///
690     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored";
691     ///
692     /// #[tokio::main]
693     /// async fn main() -> Result<(), Box<dyn Error>> {
694     ///     let server = named_pipe::ServerOptions::new()
695     ///         .create(PIPE_NAME)?;
696     ///
697     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
698     ///
699     ///     loop {
700     ///         // Wait for the pipe to be writable
701     ///         server.writable().await?;
702     ///
703     ///         // Try to write data, this may still fail with `WouldBlock`
704     ///         // if the readiness event is a false positive.
705     ///         match server.try_write_vectored(&bufs) {
706     ///             Ok(n) => {
707     ///                 break;
708     ///             }
709     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
710     ///                 continue;
711     ///             }
712     ///             Err(e) => {
713     ///                 return Err(e.into());
714     ///             }
715     ///         }
716     ///     }
717     ///
718     ///     Ok(())
719     /// }
720     /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>721     pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
722         self.io
723             .registration()
724             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
725     }
726 
727     /// Tries to read or write from the socket using a user-provided IO operation.
728     ///
729     /// If the socket is ready, the provided closure is called. The closure
730     /// should attempt to perform IO operation from the socket by manually
731     /// calling the appropriate syscall. If the operation fails because the
732     /// socket is not actually ready, then the closure should return a
733     /// `WouldBlock` error and the readiness flag is cleared. The return value
734     /// of the closure is then returned by `try_io`.
735     ///
736     /// If the socket is not ready, then the closure is not called
737     /// and a `WouldBlock` error is returned.
738     ///
739     /// The closure should only return a `WouldBlock` error if it has performed
740     /// an IO operation on the socket that failed due to the socket not being
741     /// ready. Returning a `WouldBlock` error in any other situation will
742     /// incorrectly clear the readiness flag, which can cause the socket to
743     /// behave incorrectly.
744     ///
745     /// The closure should not perform the IO operation using any of the
746     /// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
747     /// the readiness flag and can cause the socket to behave incorrectly.
748     ///
749     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
750     ///
751     /// [`readable()`]: NamedPipeServer::readable()
752     /// [`writable()`]: NamedPipeServer::writable()
753     /// [`ready()`]: NamedPipeServer::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>754     pub fn try_io<R>(
755         &self,
756         interest: Interest,
757         f: impl FnOnce() -> io::Result<R>,
758     ) -> io::Result<R> {
759         self.io.registration().try_io(interest, f)
760     }
761 }
762 
763 impl AsyncRead for NamedPipeServer {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>764     fn poll_read(
765         self: Pin<&mut Self>,
766         cx: &mut Context<'_>,
767         buf: &mut ReadBuf<'_>,
768     ) -> Poll<io::Result<()>> {
769         unsafe { self.io.poll_read(cx, buf) }
770     }
771 }
772 
773 impl AsyncWrite for NamedPipeServer {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>774     fn poll_write(
775         self: Pin<&mut Self>,
776         cx: &mut Context<'_>,
777         buf: &[u8],
778     ) -> Poll<io::Result<usize>> {
779         self.io.poll_write(cx, buf)
780     }
781 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>782     fn poll_write_vectored(
783         self: Pin<&mut Self>,
784         cx: &mut Context<'_>,
785         bufs: &[io::IoSlice<'_>],
786     ) -> Poll<io::Result<usize>> {
787         self.io.poll_write_vectored(cx, bufs)
788     }
789 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>790     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
791         Poll::Ready(Ok(()))
792     }
793 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>794     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
795         self.poll_flush(cx)
796     }
797 }
798 
799 impl AsRawHandle for NamedPipeServer {
as_raw_handle(&self) -> RawHandle800     fn as_raw_handle(&self) -> RawHandle {
801         self.io.as_raw_handle()
802     }
803 }
804 
805 /// A [Windows named pipe] client.
806 ///
807 /// Constructed using [`ClientOptions::open`].
808 ///
809 /// Connecting a client correctly involves a few steps. When connecting through
810 /// [`ClientOptions::open`], it might error indicating one of two things:
811 ///
812 /// * [`std::io::ErrorKind::NotFound`] - There is no server available.
813 /// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
814 ///   for a while and try again.
815 ///
816 /// So a correctly implemented client looks like this:
817 ///
818 /// ```no_run
819 /// use std::time::Duration;
820 /// use tokio::net::windows::named_pipe::ClientOptions;
821 /// use tokio::time;
822 /// use winapi::shared::winerror;
823 ///
824 /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
825 ///
826 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
827 /// let client = loop {
828 ///     match ClientOptions::new().open(PIPE_NAME) {
829 ///         Ok(client) => break client,
830 ///         Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
831 ///         Err(e) => return Err(e),
832 ///     }
833 ///
834 ///     time::sleep(Duration::from_millis(50)).await;
835 /// };
836 ///
837 /// /* use the connected client */
838 /// # Ok(()) }
839 /// ```
840 ///
841 /// [`ERROR_PIPE_BUSY`]: crate::winapi::shared::winerror::ERROR_PIPE_BUSY
842 /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
843 #[derive(Debug)]
844 pub struct NamedPipeClient {
845     io: PollEvented<mio_windows::NamedPipe>,
846 }
847 
848 impl NamedPipeClient {
849     /// Constructs a new named pipe client from the specified raw handle.
850     ///
851     /// This function will consume ownership of the handle given, passing
852     /// responsibility for closing the handle to the returned object.
853     ///
854     /// This function is also unsafe as the primitives currently returned have
855     /// the contract that they are the sole owner of the file descriptor they
856     /// are wrapping. Usage of this function could accidentally allow violating
857     /// this contract which can cause memory unsafety in code that relies on it
858     /// being true.
859     ///
860     /// # Errors
861     ///
862     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
863     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
864     ///
865     /// [Tokio Runtime]: crate::runtime::Runtime
866     /// [enabled I/O]: crate::runtime::Builder::enable_io
from_raw_handle(handle: RawHandle) -> io::Result<Self>867     pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
868         let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
869 
870         Ok(Self {
871             io: PollEvented::new(named_pipe)?,
872         })
873     }
874 
875     /// Retrieves information about the named pipe the client is associated
876     /// with.
877     ///
878     /// ```no_run
879     /// use tokio::net::windows::named_pipe::{ClientOptions, PipeEnd, PipeMode};
880     ///
881     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-info";
882     ///
883     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
884     /// let client = ClientOptions::new()
885     ///     .open(PIPE_NAME)?;
886     ///
887     /// let client_info = client.info()?;
888     ///
889     /// assert_eq!(client_info.end, PipeEnd::Client);
890     /// assert_eq!(client_info.mode, PipeMode::Message);
891     /// assert_eq!(client_info.max_instances, 5);
892     /// # Ok(()) }
893     /// ```
info(&self) -> io::Result<PipeInfo>894     pub fn info(&self) -> io::Result<PipeInfo> {
895         // Safety: we're ensuring the lifetime of the named pipe.
896         unsafe { named_pipe_info(self.io.as_raw_handle()) }
897     }
898 
899     /// Waits for any of the requested ready states.
900     ///
901     /// This function is usually paired with `try_read()` or `try_write()`. It
902     /// can be used to concurrently read / write to the same pipe on a single
903     /// task without splitting the pipe.
904     ///
905     /// # Examples
906     ///
907     /// Concurrently read and write to the pipe on the same task without
908     /// splitting.
909     ///
910     /// ```no_run
911     /// use tokio::io::Interest;
912     /// use tokio::net::windows::named_pipe;
913     /// use std::error::Error;
914     /// use std::io;
915     ///
916     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready";
917     ///
918     /// #[tokio::main]
919     /// async fn main() -> Result<(), Box<dyn Error>> {
920     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
921     ///
922     ///     loop {
923     ///         let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?;
924     ///
925     ///         if ready.is_readable() {
926     ///             let mut data = vec![0; 1024];
927     ///             // Try to read data, this may still fail with `WouldBlock`
928     ///             // if the readiness event is a false positive.
929     ///             match client.try_read(&mut data) {
930     ///                 Ok(n) => {
931     ///                     println!("read {} bytes", n);
932     ///                 }
933     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
934     ///                     continue;
935     ///                 }
936     ///                 Err(e) => {
937     ///                     return Err(e.into());
938     ///                 }
939     ///             }
940     ///         }
941     ///
942     ///         if ready.is_writable() {
943     ///             // Try to write data, this may still fail with `WouldBlock`
944     ///             // if the readiness event is a false positive.
945     ///             match client.try_write(b"hello world") {
946     ///                 Ok(n) => {
947     ///                     println!("write {} bytes", n);
948     ///                 }
949     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
950     ///                     continue;
951     ///                 }
952     ///                 Err(e) => {
953     ///                     return Err(e.into());
954     ///                 }
955     ///             }
956     ///         }
957     ///     }
958     /// }
959     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>960     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
961         let event = self.io.registration().readiness(interest).await?;
962         Ok(event.ready)
963     }
964 
965     /// Waits for the pipe to become readable.
966     ///
967     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
968     /// paired with `try_read()`.
969     ///
970     /// # Examples
971     ///
972     /// ```no_run
973     /// use tokio::net::windows::named_pipe;
974     /// use std::error::Error;
975     /// use std::io;
976     ///
977     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
978     ///
979     /// #[tokio::main]
980     /// async fn main() -> Result<(), Box<dyn Error>> {
981     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
982     ///
983     ///     let mut msg = vec![0; 1024];
984     ///
985     ///     loop {
986     ///         // Wait for the pipe to be readable
987     ///         client.readable().await?;
988     ///
989     ///         // Try to read data, this may still fail with `WouldBlock`
990     ///         // if the readiness event is a false positive.
991     ///         match client.try_read(&mut msg) {
992     ///             Ok(n) => {
993     ///                 msg.truncate(n);
994     ///                 break;
995     ///             }
996     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
997     ///                 continue;
998     ///             }
999     ///             Err(e) => {
1000     ///                 return Err(e.into());
1001     ///             }
1002     ///         }
1003     ///     }
1004     ///
1005     ///     println!("GOT = {:?}", msg);
1006     ///     Ok(())
1007     /// }
1008     /// ```
readable(&self) -> io::Result<()>1009     pub async fn readable(&self) -> io::Result<()> {
1010         self.ready(Interest::READABLE).await?;
1011         Ok(())
1012     }
1013 
1014     /// Polls for read readiness.
1015     ///
1016     /// If the pipe is not currently ready for reading, this method will
1017     /// store a clone of the `Waker` from the provided `Context`. When the pipe
1018     /// becomes ready for reading, `Waker::wake` will be called on the waker.
1019     ///
1020     /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1021     /// the `Waker` from the `Context` passed to the most recent call is
1022     /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
1023     /// second, independent waker.)
1024     ///
1025     /// This function is intended for cases where creating and pinning a future
1026     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1027     /// preferred, as this supports polling from multiple tasks at once.
1028     ///
1029     /// # Return value
1030     ///
1031     /// The function returns:
1032     ///
1033     /// * `Poll::Pending` if the pipe is not ready for reading.
1034     /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1035     /// * `Poll::Ready(Err(e))` if an error is encountered.
1036     ///
1037     /// # Errors
1038     ///
1039     /// This function may encounter any standard I/O error except `WouldBlock`.
1040     ///
1041     /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1042     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1043         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1044     }
1045 
1046     /// Tries to read data from the pipe into the provided buffer, returning how
1047     /// many bytes were read.
1048     ///
1049     /// Receives any pending data from the pipe but does not wait for new data
1050     /// to arrive. On success, returns the number of bytes read. Because
1051     /// `try_read()` is non-blocking, the buffer does not have to be stored by
1052     /// the async task and can exist entirely on the stack.
1053     ///
1054     /// Usually, [`readable()`] or [`ready()`] is used with this function.
1055     ///
1056     /// [`readable()`]: NamedPipeClient::readable()
1057     /// [`ready()`]: NamedPipeClient::ready()
1058     ///
1059     /// # Return
1060     ///
1061     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1062     /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
1063     /// and will no longer yield data. If the pipe is not ready to read data
1064     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1065     ///
1066     /// # Examples
1067     ///
1068     /// ```no_run
1069     /// use tokio::net::windows::named_pipe;
1070     /// use std::error::Error;
1071     /// use std::io;
1072     ///
1073     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read";
1074     ///
1075     /// #[tokio::main]
1076     /// async fn main() -> Result<(), Box<dyn Error>> {
1077     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1078     ///
1079     ///     loop {
1080     ///         // Wait for the pipe to be readable
1081     ///         client.readable().await?;
1082     ///
1083     ///         // Creating the buffer **after** the `await` prevents it from
1084     ///         // being stored in the async task.
1085     ///         let mut buf = [0; 4096];
1086     ///
1087     ///         // Try to read data, this may still fail with `WouldBlock`
1088     ///         // if the readiness event is a false positive.
1089     ///         match client.try_read(&mut buf) {
1090     ///             Ok(0) => break,
1091     ///             Ok(n) => {
1092     ///                 println!("read {} bytes", n);
1093     ///             }
1094     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1095     ///                 continue;
1096     ///             }
1097     ///             Err(e) => {
1098     ///                 return Err(e.into());
1099     ///             }
1100     ///         }
1101     ///     }
1102     ///
1103     ///     Ok(())
1104     /// }
1105     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>1106     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1107         self.io
1108             .registration()
1109             .try_io(Interest::READABLE, || (&*self.io).read(buf))
1110     }
1111 
1112     /// Tries to read data from the pipe into the provided buffers, returning
1113     /// how many bytes were read.
1114     ///
1115     /// Data is copied to fill each buffer in order, with the final buffer
1116     /// written to possibly being only partially filled. This method behaves
1117     /// equivalently to a single call to [`try_read()`] with concatenated
1118     /// buffers.
1119     ///
1120     /// Receives any pending data from the pipe but does not wait for new data
1121     /// to arrive. On success, returns the number of bytes read. Because
1122     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1123     /// stored by the async task and can exist entirely on the stack.
1124     ///
1125     /// Usually, [`readable()`] or [`ready()`] is used with this function.
1126     ///
1127     /// [`try_read()`]: NamedPipeClient::try_read()
1128     /// [`readable()`]: NamedPipeClient::readable()
1129     /// [`ready()`]: NamedPipeClient::ready()
1130     ///
1131     /// # Return
1132     ///
1133     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1134     /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
1135     /// and will no longer yield data. If the pipe is not ready to read data
1136     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1137     ///
1138     /// # Examples
1139     ///
1140     /// ```no_run
1141     /// use tokio::net::windows::named_pipe;
1142     /// use std::error::Error;
1143     /// use std::io::{self, IoSliceMut};
1144     ///
1145     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored";
1146     ///
1147     /// #[tokio::main]
1148     /// async fn main() -> Result<(), Box<dyn Error>> {
1149     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1150     ///
1151     ///     loop {
1152     ///         // Wait for the pipe to be readable
1153     ///         client.readable().await?;
1154     ///
1155     ///         // Creating the buffer **after** the `await` prevents it from
1156     ///         // being stored in the async task.
1157     ///         let mut buf_a = [0; 512];
1158     ///         let mut buf_b = [0; 1024];
1159     ///         let mut bufs = [
1160     ///             IoSliceMut::new(&mut buf_a),
1161     ///             IoSliceMut::new(&mut buf_b),
1162     ///         ];
1163     ///
1164     ///         // Try to read data, this may still fail with `WouldBlock`
1165     ///         // if the readiness event is a false positive.
1166     ///         match client.try_read_vectored(&mut bufs) {
1167     ///             Ok(0) => break,
1168     ///             Ok(n) => {
1169     ///                 println!("read {} bytes", n);
1170     ///             }
1171     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1172     ///                 continue;
1173     ///             }
1174     ///             Err(e) => {
1175     ///                 return Err(e.into());
1176     ///             }
1177     ///         }
1178     ///     }
1179     ///
1180     ///     Ok(())
1181     /// }
1182     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>1183     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1184         self.io
1185             .registration()
1186             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1187     }
1188 
1189     /// Waits for the pipe to become writable.
1190     ///
1191     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
1192     /// paired with `try_write()`.
1193     ///
1194     /// # Examples
1195     ///
1196     /// ```no_run
1197     /// use tokio::net::windows::named_pipe;
1198     /// use std::error::Error;
1199     /// use std::io;
1200     ///
1201     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable";
1202     ///
1203     /// #[tokio::main]
1204     /// async fn main() -> Result<(), Box<dyn Error>> {
1205     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1206     ///
1207     ///     loop {
1208     ///         // Wait for the pipe to be writable
1209     ///         client.writable().await?;
1210     ///
1211     ///         // Try to write data, this may still fail with `WouldBlock`
1212     ///         // if the readiness event is a false positive.
1213     ///         match client.try_write(b"hello world") {
1214     ///             Ok(n) => {
1215     ///                 break;
1216     ///             }
1217     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1218     ///                 continue;
1219     ///             }
1220     ///             Err(e) => {
1221     ///                 return Err(e.into());
1222     ///             }
1223     ///         }
1224     ///     }
1225     ///
1226     ///     Ok(())
1227     /// }
1228     /// ```
writable(&self) -> io::Result<()>1229     pub async fn writable(&self) -> io::Result<()> {
1230         self.ready(Interest::WRITABLE).await?;
1231         Ok(())
1232     }
1233 
1234     /// Polls for write readiness.
1235     ///
1236     /// If the pipe is not currently ready for writing, this method will
1237     /// store a clone of the `Waker` from the provided `Context`. When the pipe
1238     /// becomes ready for writing, `Waker::wake` will be called on the waker.
1239     ///
1240     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
1241     /// the `Waker` from the `Context` passed to the most recent call is
1242     /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
1243     /// second, independent waker.)
1244     ///
1245     /// This function is intended for cases where creating and pinning a future
1246     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
1247     /// preferred, as this supports polling from multiple tasks at once.
1248     ///
1249     /// # Return value
1250     ///
1251     /// The function returns:
1252     ///
1253     /// * `Poll::Pending` if the pipe is not ready for writing.
1254     /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
1255     /// * `Poll::Ready(Err(e))` if an error is encountered.
1256     ///
1257     /// # Errors
1258     ///
1259     /// This function may encounter any standard I/O error except `WouldBlock`.
1260     ///
1261     /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1262     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1263         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
1264     }
1265 
1266     /// Tries to write a buffer to the pipe, returning how many bytes were
1267     /// written.
1268     ///
1269     /// The function will attempt to write the entire contents of `buf`, but
1270     /// only part of the buffer may be written.
1271     ///
1272     /// This function is usually paired with `writable()`.
1273     ///
1274     /// # Return
1275     ///
1276     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1277     /// number of bytes written. If the pipe is not ready to write data,
1278     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1279     ///
1280     /// # Examples
1281     ///
1282     /// ```no_run
1283     /// use tokio::net::windows::named_pipe;
1284     /// use std::error::Error;
1285     /// use std::io;
1286     ///
1287     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write";
1288     ///
1289     /// #[tokio::main]
1290     /// async fn main() -> Result<(), Box<dyn Error>> {
1291     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1292     ///
1293     ///     loop {
1294     ///         // Wait for the pipe to be writable
1295     ///         client.writable().await?;
1296     ///
1297     ///         // Try to write data, this may still fail with `WouldBlock`
1298     ///         // if the readiness event is a false positive.
1299     ///         match client.try_write(b"hello world") {
1300     ///             Ok(n) => {
1301     ///                 break;
1302     ///             }
1303     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1304     ///                 continue;
1305     ///             }
1306     ///             Err(e) => {
1307     ///                 return Err(e.into());
1308     ///             }
1309     ///         }
1310     ///     }
1311     ///
1312     ///     Ok(())
1313     /// }
1314     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>1315     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
1316         self.io
1317             .registration()
1318             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
1319     }
1320 
1321     /// Tries to write several buffers to the pipe, returning how many bytes
1322     /// were written.
1323     ///
1324     /// Data is written from each buffer in order, with the final buffer read
1325     /// from possible being only partially consumed. This method behaves
1326     /// equivalently to a single call to [`try_write()`] with concatenated
1327     /// buffers.
1328     ///
1329     /// This function is usually paired with `writable()`.
1330     ///
1331     /// [`try_write()`]: NamedPipeClient::try_write()
1332     ///
1333     /// # Return
1334     ///
1335     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1336     /// number of bytes written. If the pipe is not ready to write data,
1337     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1338     ///
1339     /// # Examples
1340     ///
1341     /// ```no_run
1342     /// use tokio::net::windows::named_pipe;
1343     /// use std::error::Error;
1344     /// use std::io;
1345     ///
1346     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored";
1347     ///
1348     /// #[tokio::main]
1349     /// async fn main() -> Result<(), Box<dyn Error>> {
1350     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1351     ///
1352     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
1353     ///
1354     ///     loop {
1355     ///         // Wait for the pipe to be writable
1356     ///         client.writable().await?;
1357     ///
1358     ///         // Try to write data, this may still fail with `WouldBlock`
1359     ///         // if the readiness event is a false positive.
1360     ///         match client.try_write_vectored(&bufs) {
1361     ///             Ok(n) => {
1362     ///                 break;
1363     ///             }
1364     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1365     ///                 continue;
1366     ///             }
1367     ///             Err(e) => {
1368     ///                 return Err(e.into());
1369     ///             }
1370     ///         }
1371     ///     }
1372     ///
1373     ///     Ok(())
1374     /// }
1375     /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>1376     pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
1377         self.io
1378             .registration()
1379             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
1380     }
1381 
1382     /// Tries to read or write from the socket using a user-provided IO operation.
1383     ///
1384     /// If the socket is ready, the provided closure is called. The closure
1385     /// should attempt to perform IO operation from the socket by manually
1386     /// calling the appropriate syscall. If the operation fails because the
1387     /// socket is not actually ready, then the closure should return a
1388     /// `WouldBlock` error and the readiness flag is cleared. The return value
1389     /// of the closure is then returned by `try_io`.
1390     ///
1391     /// If the socket is not ready, then the closure is not called
1392     /// and a `WouldBlock` error is returned.
1393     ///
1394     /// The closure should only return a `WouldBlock` error if it has performed
1395     /// an IO operation on the socket that failed due to the socket not being
1396     /// ready. Returning a `WouldBlock` error in any other situation will
1397     /// incorrectly clear the readiness flag, which can cause the socket to
1398     /// behave incorrectly.
1399     ///
1400     /// The closure should not perform the IO operation using any of the methods
1401     /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1402     /// readiness flag and can cause the socket to behave incorrectly.
1403     ///
1404     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1405     ///
1406     /// [`readable()`]: NamedPipeClient::readable()
1407     /// [`writable()`]: NamedPipeClient::writable()
1408     /// [`ready()`]: NamedPipeClient::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1409     pub fn try_io<R>(
1410         &self,
1411         interest: Interest,
1412         f: impl FnOnce() -> io::Result<R>,
1413     ) -> io::Result<R> {
1414         self.io.registration().try_io(interest, f)
1415     }
1416 }
1417 
1418 impl AsyncRead for NamedPipeClient {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1419     fn poll_read(
1420         self: Pin<&mut Self>,
1421         cx: &mut Context<'_>,
1422         buf: &mut ReadBuf<'_>,
1423     ) -> Poll<io::Result<()>> {
1424         unsafe { self.io.poll_read(cx, buf) }
1425     }
1426 }
1427 
1428 impl AsyncWrite for NamedPipeClient {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1429     fn poll_write(
1430         self: Pin<&mut Self>,
1431         cx: &mut Context<'_>,
1432         buf: &[u8],
1433     ) -> Poll<io::Result<usize>> {
1434         self.io.poll_write(cx, buf)
1435     }
1436 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1437     fn poll_write_vectored(
1438         self: Pin<&mut Self>,
1439         cx: &mut Context<'_>,
1440         bufs: &[io::IoSlice<'_>],
1441     ) -> Poll<io::Result<usize>> {
1442         self.io.poll_write_vectored(cx, bufs)
1443     }
1444 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>1445     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1446         Poll::Ready(Ok(()))
1447     }
1448 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1449     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1450         self.poll_flush(cx)
1451     }
1452 }
1453 
1454 impl AsRawHandle for NamedPipeClient {
as_raw_handle(&self) -> RawHandle1455     fn as_raw_handle(&self) -> RawHandle {
1456         self.io.as_raw_handle()
1457     }
1458 }
1459 
1460 // Helper to set a boolean flag as a bitfield.
1461 macro_rules! bool_flag {
1462     ($f:expr, $t:expr, $flag:expr) => {{
1463         let current = $f;
1464 
1465         if $t {
1466             $f = current | $flag;
1467         } else {
1468             $f = current & !$flag;
1469         };
1470     }};
1471 }
1472 
1473 /// A builder structure for construct a named pipe with named pipe-specific
1474 /// options. This is required to use for named pipe servers who wants to modify
1475 /// pipe-related options.
1476 ///
1477 /// See [`ServerOptions::create`].
1478 #[derive(Debug, Clone)]
1479 pub struct ServerOptions {
1480     open_mode: DWORD,
1481     pipe_mode: DWORD,
1482     max_instances: DWORD,
1483     out_buffer_size: DWORD,
1484     in_buffer_size: DWORD,
1485     default_timeout: DWORD,
1486 }
1487 
1488 impl ServerOptions {
1489     /// Creates a new named pipe builder with the default settings.
1490     ///
1491     /// ```
1492     /// use tokio::net::windows::named_pipe::ServerOptions;
1493     ///
1494     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-new";
1495     ///
1496     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1497     /// let server = ServerOptions::new().create(PIPE_NAME)?;
1498     /// # Ok(()) }
1499     /// ```
new() -> ServerOptions1500     pub fn new() -> ServerOptions {
1501         ServerOptions {
1502             open_mode: winbase::PIPE_ACCESS_DUPLEX | winbase::FILE_FLAG_OVERLAPPED,
1503             pipe_mode: winbase::PIPE_TYPE_BYTE | winbase::PIPE_REJECT_REMOTE_CLIENTS,
1504             max_instances: winbase::PIPE_UNLIMITED_INSTANCES,
1505             out_buffer_size: 65536,
1506             in_buffer_size: 65536,
1507             default_timeout: 0,
1508         }
1509     }
1510 
1511     /// The pipe mode.
1512     ///
1513     /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1514     /// documentation of what each mode means.
1515     ///
1516     /// This corresponding to specifying [`dwPipeMode`].
1517     ///
1518     /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self1519     pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1520         self.pipe_mode = match pipe_mode {
1521             PipeMode::Byte => winbase::PIPE_TYPE_BYTE,
1522             PipeMode::Message => winbase::PIPE_TYPE_MESSAGE,
1523         };
1524 
1525         self
1526     }
1527 
1528     /// The flow of data in the pipe goes from client to server only.
1529     ///
1530     /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
1531     ///
1532     /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
1533     ///
1534     /// # Errors
1535     ///
1536     /// Server side prevents connecting by denying inbound access, client errors
1537     /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
1538     /// the connection.
1539     ///
1540     /// ```
1541     /// use std::io;
1542     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1543     ///
1544     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err1";
1545     ///
1546     /// # #[tokio::main] async fn main() -> io::Result<()> {
1547     /// let _server = ServerOptions::new()
1548     ///     .access_inbound(false)
1549     ///     .create(PIPE_NAME)?;
1550     ///
1551     /// let e = ClientOptions::new()
1552     ///     .open(PIPE_NAME)
1553     ///     .unwrap_err();
1554     ///
1555     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1556     /// # Ok(()) }
1557     /// ```
1558     ///
1559     /// Disabling writing allows a client to connect, but errors with
1560     /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
1561     ///
1562     /// ```
1563     /// use std::io;
1564     /// use tokio::io::AsyncWriteExt;
1565     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1566     ///
1567     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err2";
1568     ///
1569     /// # #[tokio::main] async fn main() -> io::Result<()> {
1570     /// let server = ServerOptions::new()
1571     ///     .access_inbound(false)
1572     ///     .create(PIPE_NAME)?;
1573     ///
1574     /// let mut client = ClientOptions::new()
1575     ///     .write(false)
1576     ///     .open(PIPE_NAME)?;
1577     ///
1578     /// server.connect().await?;
1579     ///
1580     /// let e = client.write(b"ping").await.unwrap_err();
1581     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1582     /// # Ok(()) }
1583     /// ```
1584     ///
1585     /// # Examples
1586     ///
1587     /// A unidirectional named pipe that only supports server-to-client
1588     /// communication.
1589     ///
1590     /// ```
1591     /// use std::io;
1592     /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1593     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1594     ///
1595     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound";
1596     ///
1597     /// # #[tokio::main] async fn main() -> io::Result<()> {
1598     /// let mut server = ServerOptions::new()
1599     ///     .access_inbound(false)
1600     ///     .create(PIPE_NAME)?;
1601     ///
1602     /// let mut client = ClientOptions::new()
1603     ///     .write(false)
1604     ///     .open(PIPE_NAME)?;
1605     ///
1606     /// server.connect().await?;
1607     ///
1608     /// let write = server.write_all(b"ping");
1609     ///
1610     /// let mut buf = [0u8; 4];
1611     /// let read = client.read_exact(&mut buf);
1612     ///
1613     /// let ((), read) = tokio::try_join!(write, read)?;
1614     ///
1615     /// assert_eq!(read, 4);
1616     /// assert_eq!(&buf[..], b"ping");
1617     /// # Ok(()) }
1618     /// ```
access_inbound(&mut self, allowed: bool) -> &mut Self1619     pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
1620         bool_flag!(self.open_mode, allowed, winbase::PIPE_ACCESS_INBOUND);
1621         self
1622     }
1623 
1624     /// The flow of data in the pipe goes from server to client only.
1625     ///
1626     /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
1627     ///
1628     /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
1629     ///
1630     /// # Errors
1631     ///
1632     /// Server side prevents connecting by denying outbound access, client
1633     /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
1634     /// create the connection.
1635     ///
1636     /// ```
1637     /// use std::io;
1638     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1639     ///
1640     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err1";
1641     ///
1642     /// # #[tokio::main] async fn main() -> io::Result<()> {
1643     /// let server = ServerOptions::new()
1644     ///     .access_outbound(false)
1645     ///     .create(PIPE_NAME)?;
1646     ///
1647     /// let e = ClientOptions::new()
1648     ///     .open(PIPE_NAME)
1649     ///     .unwrap_err();
1650     ///
1651     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1652     /// # Ok(()) }
1653     /// ```
1654     ///
1655     /// Disabling reading allows a client to connect, but attempting to read
1656     /// will error with [`std::io::ErrorKind::PermissionDenied`].
1657     ///
1658     /// ```
1659     /// use std::io;
1660     /// use tokio::io::AsyncReadExt;
1661     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1662     ///
1663     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err2";
1664     ///
1665     /// # #[tokio::main] async fn main() -> io::Result<()> {
1666     /// let server = ServerOptions::new()
1667     ///     .access_outbound(false)
1668     ///     .create(PIPE_NAME)?;
1669     ///
1670     /// let mut client = ClientOptions::new()
1671     ///     .read(false)
1672     ///     .open(PIPE_NAME)?;
1673     ///
1674     /// server.connect().await?;
1675     ///
1676     /// let mut buf = [0u8; 4];
1677     /// let e = client.read(&mut buf).await.unwrap_err();
1678     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1679     /// # Ok(()) }
1680     /// ```
1681     ///
1682     /// # Examples
1683     ///
1684     /// A unidirectional named pipe that only supports client-to-server
1685     /// communication.
1686     ///
1687     /// ```
1688     /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1689     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1690     ///
1691     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound";
1692     ///
1693     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1694     /// let mut server = ServerOptions::new()
1695     ///     .access_outbound(false)
1696     ///     .create(PIPE_NAME)?;
1697     ///
1698     /// let mut client = ClientOptions::new()
1699     ///     .read(false)
1700     ///     .open(PIPE_NAME)?;
1701     ///
1702     /// server.connect().await?;
1703     ///
1704     /// let write = client.write_all(b"ping");
1705     ///
1706     /// let mut buf = [0u8; 4];
1707     /// let read = server.read_exact(&mut buf);
1708     ///
1709     /// let ((), read) = tokio::try_join!(write, read)?;
1710     ///
1711     /// println!("done reading and writing");
1712     ///
1713     /// assert_eq!(read, 4);
1714     /// assert_eq!(&buf[..], b"ping");
1715     /// # Ok(()) }
1716     /// ```
access_outbound(&mut self, allowed: bool) -> &mut Self1717     pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
1718         bool_flag!(self.open_mode, allowed, winbase::PIPE_ACCESS_OUTBOUND);
1719         self
1720     }
1721 
1722     /// If you attempt to create multiple instances of a pipe with this flag
1723     /// set, creation of the first server instance succeeds, but creation of any
1724     /// subsequent instances will fail with
1725     /// [`std::io::ErrorKind::PermissionDenied`].
1726     ///
1727     /// This option is intended to be used with servers that want to ensure that
1728     /// they are the only process listening for clients on a given named pipe.
1729     /// This is accomplished by enabling it for the first server instance
1730     /// created in a process.
1731     ///
1732     /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
1733     ///
1734     /// # Errors
1735     ///
1736     /// If this option is set and more than one instance of the server for a
1737     /// given named pipe exists, calling [`create`] will fail with
1738     /// [`std::io::ErrorKind::PermissionDenied`].
1739     ///
1740     /// ```
1741     /// use std::io;
1742     /// use tokio::net::windows::named_pipe::ServerOptions;
1743     ///
1744     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance-error";
1745     ///
1746     /// # #[tokio::main] async fn main() -> io::Result<()> {
1747     /// let server1 = ServerOptions::new()
1748     ///     .first_pipe_instance(true)
1749     ///     .create(PIPE_NAME)?;
1750     ///
1751     /// // Second server errs, since it's not the first instance.
1752     /// let e = ServerOptions::new()
1753     ///     .first_pipe_instance(true)
1754     ///     .create(PIPE_NAME)
1755     ///     .unwrap_err();
1756     ///
1757     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1758     /// # Ok(()) }
1759     /// ```
1760     ///
1761     /// # Examples
1762     ///
1763     /// ```
1764     /// use std::io;
1765     /// use tokio::net::windows::named_pipe::ServerOptions;
1766     ///
1767     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance";
1768     ///
1769     /// # #[tokio::main] async fn main() -> io::Result<()> {
1770     /// let mut builder = ServerOptions::new();
1771     /// builder.first_pipe_instance(true);
1772     ///
1773     /// let server = builder.create(PIPE_NAME)?;
1774     /// let e = builder.create(PIPE_NAME).unwrap_err();
1775     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1776     /// drop(server);
1777     ///
1778     /// // OK: since, we've closed the other instance.
1779     /// let _server2 = builder.create(PIPE_NAME)?;
1780     /// # Ok(()) }
1781     /// ```
1782     ///
1783     /// [`create`]: ServerOptions::create
1784     /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
first_pipe_instance(&mut self, first: bool) -> &mut Self1785     pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
1786         bool_flag!(
1787             self.open_mode,
1788             first,
1789             winbase::FILE_FLAG_FIRST_PIPE_INSTANCE
1790         );
1791         self
1792     }
1793 
1794     /// Indicates whether this server can accept remote clients or not. Remote
1795     /// clients are disabled by default.
1796     ///
1797     /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
1798     ///
1799     /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
reject_remote_clients(&mut self, reject: bool) -> &mut Self1800     pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
1801         bool_flag!(self.pipe_mode, reject, winbase::PIPE_REJECT_REMOTE_CLIENTS);
1802         self
1803     }
1804 
1805     /// The maximum number of instances that can be created for this pipe. The
1806     /// first instance of the pipe can specify this value; the same number must
1807     /// be specified for other instances of the pipe. Acceptable values are in
1808     /// the range 1 through 254. The default value is unlimited.
1809     ///
1810     /// This corresponds to specifying [`nMaxInstances`].
1811     ///
1812     /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
1813     ///
1814     /// # Errors
1815     ///
1816     /// The same numbers of `max_instances` have to be used by all servers. Any
1817     /// additional servers trying to be built which uses a mismatching value
1818     /// might error.
1819     ///
1820     /// ```
1821     /// use std::io;
1822     /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
1823     /// use winapi::shared::winerror;
1824     ///
1825     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-max-instances";
1826     ///
1827     /// # #[tokio::main] async fn main() -> io::Result<()> {
1828     /// let mut server = ServerOptions::new();
1829     /// server.max_instances(2);
1830     ///
1831     /// let s1 = server.create(PIPE_NAME)?;
1832     /// let c1 = ClientOptions::new().open(PIPE_NAME);
1833     ///
1834     /// let s2 = server.create(PIPE_NAME)?;
1835     /// let c2 = ClientOptions::new().open(PIPE_NAME);
1836     ///
1837     /// // Too many servers!
1838     /// let e = server.create(PIPE_NAME).unwrap_err();
1839     /// assert_eq!(e.raw_os_error(), Some(winerror::ERROR_PIPE_BUSY as i32));
1840     ///
1841     /// // Still too many servers even if we specify a higher value!
1842     /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
1843     /// assert_eq!(e.raw_os_error(), Some(winerror::ERROR_PIPE_BUSY as i32));
1844     /// # Ok(()) }
1845     /// ```
1846     ///
1847     /// # Panics
1848     ///
1849     /// This function will panic if more than 254 instances are specified. If
1850     /// you do not wish to set an instance limit, leave it unspecified.
1851     ///
1852     /// ```should_panic
1853     /// use tokio::net::windows::named_pipe::ServerOptions;
1854     ///
1855     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1856     /// let builder = ServerOptions::new().max_instances(255);
1857     /// # Ok(()) }
1858     /// ```
max_instances(&mut self, instances: usize) -> &mut Self1859     pub fn max_instances(&mut self, instances: usize) -> &mut Self {
1860         assert!(instances < 255, "cannot specify more than 254 instances");
1861         self.max_instances = instances as DWORD;
1862         self
1863     }
1864 
1865     /// The number of bytes to reserve for the output buffer.
1866     ///
1867     /// This corresponds to specifying [`nOutBufferSize`].
1868     ///
1869     /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
out_buffer_size(&mut self, buffer: u32) -> &mut Self1870     pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
1871         self.out_buffer_size = buffer as DWORD;
1872         self
1873     }
1874 
1875     /// The number of bytes to reserve for the input buffer.
1876     ///
1877     /// This corresponds to specifying [`nInBufferSize`].
1878     ///
1879     /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
in_buffer_size(&mut self, buffer: u32) -> &mut Self1880     pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
1881         self.in_buffer_size = buffer as DWORD;
1882         self
1883     }
1884 
1885     /// Creates the named pipe identified by `addr` for use as a server.
1886     ///
1887     /// This uses the [`CreateNamedPipe`] function.
1888     ///
1889     /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
1890     ///
1891     /// # Errors
1892     ///
1893     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
1894     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
1895     ///
1896     /// [Tokio Runtime]: crate::runtime::Runtime
1897     /// [enabled I/O]: crate::runtime::Builder::enable_io
1898     ///
1899     /// # Examples
1900     ///
1901     /// ```
1902     /// use tokio::net::windows::named_pipe::ServerOptions;
1903     ///
1904     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-create";
1905     ///
1906     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1907     /// let server = ServerOptions::new().create(PIPE_NAME)?;
1908     /// # Ok(()) }
1909     /// ```
create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer>1910     pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
1911         // Safety: We're calling create_with_security_attributes_raw w/ a null
1912         // pointer which disables it.
1913         unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) }
1914     }
1915 
1916     /// Creates the named pipe identified by `addr` for use as a server.
1917     ///
1918     /// This is the same as [`create`] except that it supports providing the raw
1919     /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
1920     /// as the `lpSecurityAttributes` argument to [`CreateFile`].
1921     ///
1922     /// # Errors
1923     ///
1924     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
1925     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
1926     ///
1927     /// [Tokio Runtime]: crate::runtime::Runtime
1928     /// [enabled I/O]: crate::runtime::Builder::enable_io
1929     ///
1930     /// # Safety
1931     ///
1932     /// The `attrs` argument must either be null or point at a valid instance of
1933     /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
1934     /// behavior is identical to calling the [`create`] method.
1935     ///
1936     /// [`create`]: ServerOptions::create
1937     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
1938     /// [`SECURITY_ATTRIBUTES`]: crate::winapi::um::minwinbase::SECURITY_ATTRIBUTES
create_with_security_attributes_raw( &self, addr: impl AsRef<OsStr>, attrs: *mut c_void, ) -> io::Result<NamedPipeServer>1939     pub unsafe fn create_with_security_attributes_raw(
1940         &self,
1941         addr: impl AsRef<OsStr>,
1942         attrs: *mut c_void,
1943     ) -> io::Result<NamedPipeServer> {
1944         let addr = encode_addr(addr);
1945 
1946         let h = namedpipeapi::CreateNamedPipeW(
1947             addr.as_ptr(),
1948             self.open_mode,
1949             self.pipe_mode,
1950             self.max_instances,
1951             self.out_buffer_size,
1952             self.in_buffer_size,
1953             self.default_timeout,
1954             attrs as *mut _,
1955         );
1956 
1957         if h == handleapi::INVALID_HANDLE_VALUE {
1958             return Err(io::Error::last_os_error());
1959         }
1960 
1961         NamedPipeServer::from_raw_handle(h)
1962     }
1963 }
1964 
1965 /// A builder suitable for building and interacting with named pipes from the
1966 /// client side.
1967 ///
1968 /// See [`ClientOptions::open`].
1969 #[derive(Debug, Clone)]
1970 pub struct ClientOptions {
1971     desired_access: DWORD,
1972     security_qos_flags: DWORD,
1973 }
1974 
1975 impl ClientOptions {
1976     /// Creates a new named pipe builder with the default settings.
1977     ///
1978     /// ```
1979     /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
1980     ///
1981     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-new";
1982     ///
1983     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1984     /// // Server must be created in order for the client creation to succeed.
1985     /// let server = ServerOptions::new().create(PIPE_NAME)?;
1986     /// let client = ClientOptions::new().open(PIPE_NAME)?;
1987     /// # Ok(()) }
1988     /// ```
new() -> Self1989     pub fn new() -> Self {
1990         Self {
1991             desired_access: winnt::GENERIC_READ | winnt::GENERIC_WRITE,
1992             security_qos_flags: winbase::SECURITY_IDENTIFICATION | winbase::SECURITY_SQOS_PRESENT,
1993         }
1994     }
1995 
1996     /// If the client supports reading data. This is enabled by default.
1997     ///
1998     /// This corresponds to setting [`GENERIC_READ`] in the call to [`CreateFile`].
1999     ///
2000     /// [`GENERIC_READ`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2001     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
read(&mut self, allowed: bool) -> &mut Self2002     pub fn read(&mut self, allowed: bool) -> &mut Self {
2003         bool_flag!(self.desired_access, allowed, winnt::GENERIC_READ);
2004         self
2005     }
2006 
2007     /// If the created pipe supports writing data. This is enabled by default.
2008     ///
2009     /// This corresponds to setting [`GENERIC_WRITE`] in the call to [`CreateFile`].
2010     ///
2011     /// [`GENERIC_WRITE`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2012     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
write(&mut self, allowed: bool) -> &mut Self2013     pub fn write(&mut self, allowed: bool) -> &mut Self {
2014         bool_flag!(self.desired_access, allowed, winnt::GENERIC_WRITE);
2015         self
2016     }
2017 
2018     /// Sets qos flags which are combined with other flags and attributes in the
2019     /// call to [`CreateFile`].
2020     ///
2021     /// By default `security_qos_flags` is set to [`SECURITY_IDENTIFICATION`],
2022     /// calling this function would override that value completely with the
2023     /// argument specified.
2024     ///
2025     /// When `security_qos_flags` is not set, a malicious program can gain the
2026     /// elevated privileges of a privileged Rust process when it allows opening
2027     /// user-specified paths, by tricking it into opening a named pipe. So
2028     /// arguably `security_qos_flags` should also be set when opening arbitrary
2029     /// paths. However the bits can then conflict with other flags, specifically
2030     /// `FILE_FLAG_OPEN_NO_RECALL`.
2031     ///
2032     /// For information about possible values, see [Impersonation Levels] on the
2033     /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
2034     /// automatically when using this method.
2035     ///
2036     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2037     /// [`SECURITY_IDENTIFICATION`]: crate::winapi::um::winbase::SECURITY_IDENTIFICATION
2038     /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
security_qos_flags(&mut self, flags: u32) -> &mut Self2039     pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
2040         // See: https://github.com/rust-lang/rust/pull/58216
2041         self.security_qos_flags = flags | winbase::SECURITY_SQOS_PRESENT;
2042         self
2043     }
2044 
2045     /// Opens the named pipe identified by `addr`.
2046     ///
2047     /// This opens the client using [`CreateFile`] with the
2048     /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
2049     ///
2050     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2051     ///
2052     /// # Errors
2053     ///
2054     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2055     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2056     ///
2057     /// There are a few errors you need to take into account when creating a
2058     /// named pipe on the client side:
2059     ///
2060     /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
2061     ///   does not exist. Presumably the server is not up.
2062     /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
2063     ///   but the server is not currently waiting for a connection. Please see the
2064     ///   examples for how to check for this error.
2065     ///
2066     /// [`ERROR_PIPE_BUSY`]: crate::winapi::shared::winerror::ERROR_PIPE_BUSY
2067     /// [`winapi`]: crate::winapi
2068     /// [enabled I/O]: crate::runtime::Builder::enable_io
2069     /// [Tokio Runtime]: crate::runtime::Runtime
2070     ///
2071     /// A connect loop that waits until a pipe becomes available looks like
2072     /// this:
2073     ///
2074     /// ```no_run
2075     /// use std::time::Duration;
2076     /// use tokio::net::windows::named_pipe::ClientOptions;
2077     /// use tokio::time;
2078     /// use winapi::shared::winerror;
2079     ///
2080     /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
2081     ///
2082     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2083     /// let client = loop {
2084     ///     match ClientOptions::new().open(PIPE_NAME) {
2085     ///         Ok(client) => break client,
2086     ///         Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
2087     ///         Err(e) => return Err(e),
2088     ///     }
2089     ///
2090     ///     time::sleep(Duration::from_millis(50)).await;
2091     /// };
2092     ///
2093     /// // use the connected client.
2094     /// # Ok(()) }
2095     /// ```
open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient>2096     pub fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
2097         // Safety: We're calling open_with_security_attributes_raw w/ a null
2098         // pointer which disables it.
2099         unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
2100     }
2101 
2102     /// Opens the named pipe identified by `addr`.
2103     ///
2104     /// This is the same as [`open`] except that it supports providing the raw
2105     /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2106     /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2107     ///
2108     /// # Safety
2109     ///
2110     /// The `attrs` argument must either be null or point at a valid instance of
2111     /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2112     /// behavior is identical to calling the [`open`] method.
2113     ///
2114     /// [`open`]: ClientOptions::open
2115     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2116     /// [`SECURITY_ATTRIBUTES`]: crate::winapi::um::minwinbase::SECURITY_ATTRIBUTES
open_with_security_attributes_raw( &self, addr: impl AsRef<OsStr>, attrs: *mut c_void, ) -> io::Result<NamedPipeClient>2117     pub unsafe fn open_with_security_attributes_raw(
2118         &self,
2119         addr: impl AsRef<OsStr>,
2120         attrs: *mut c_void,
2121     ) -> io::Result<NamedPipeClient> {
2122         let addr = encode_addr(addr);
2123 
2124         // NB: We could use a platform specialized `OpenOptions` here, but since
2125         // we have access to winapi it ultimately doesn't hurt to use
2126         // `CreateFile` explicitly since it allows the use of our already
2127         // well-structured wide `addr` to pass into CreateFileW.
2128         let h = fileapi::CreateFileW(
2129             addr.as_ptr(),
2130             self.desired_access,
2131             0,
2132             attrs as *mut _,
2133             fileapi::OPEN_EXISTING,
2134             self.get_flags(),
2135             ptr::null_mut(),
2136         );
2137 
2138         if h == handleapi::INVALID_HANDLE_VALUE {
2139             return Err(io::Error::last_os_error());
2140         }
2141 
2142         NamedPipeClient::from_raw_handle(h)
2143     }
2144 
get_flags(&self) -> u322145     fn get_flags(&self) -> u32 {
2146         self.security_qos_flags | winbase::FILE_FLAG_OVERLAPPED
2147     }
2148 }
2149 
2150 /// The pipe mode of a named pipe.
2151 ///
2152 /// Set through [`ServerOptions::pipe_mode`].
2153 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
2154 #[non_exhaustive]
2155 pub enum PipeMode {
2156     /// Data is written to the pipe as a stream of bytes. The pipe does not
2157     /// distinguish bytes written during different write operations.
2158     ///
2159     /// Corresponds to [`PIPE_TYPE_BYTE`][crate::winapi::um::winbase::PIPE_TYPE_BYTE].
2160     Byte,
2161     /// Data is written to the pipe as a stream of messages. The pipe treats the
2162     /// bytes written during each write operation as a message unit. Any reading
2163     /// on a named pipe returns [`ERROR_MORE_DATA`] when a message is not read
2164     /// completely.
2165     ///
2166     /// Corresponds to [`PIPE_TYPE_MESSAGE`][crate::winapi::um::winbase::PIPE_TYPE_MESSAGE].
2167     ///
2168     /// [`ERROR_MORE_DATA`]: crate::winapi::shared::winerror::ERROR_MORE_DATA
2169     Message,
2170 }
2171 
2172 /// Indicates the end of a named pipe.
2173 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
2174 #[non_exhaustive]
2175 pub enum PipeEnd {
2176     /// The named pipe refers to the client end of a named pipe instance.
2177     ///
2178     /// Corresponds to [`PIPE_CLIENT_END`][crate::winapi::um::winbase::PIPE_CLIENT_END].
2179     Client,
2180     /// The named pipe refers to the server end of a named pipe instance.
2181     ///
2182     /// Corresponds to [`PIPE_SERVER_END`][crate::winapi::um::winbase::PIPE_SERVER_END].
2183     Server,
2184 }
2185 
2186 /// Information about a named pipe.
2187 ///
2188 /// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
2189 #[derive(Debug)]
2190 #[non_exhaustive]
2191 pub struct PipeInfo {
2192     /// Indicates the mode of a named pipe.
2193     pub mode: PipeMode,
2194     /// Indicates the end of a named pipe.
2195     pub end: PipeEnd,
2196     /// The maximum number of instances that can be created for this pipe.
2197     pub max_instances: u32,
2198     /// The number of bytes to reserve for the output buffer.
2199     pub out_buffer_size: u32,
2200     /// The number of bytes to reserve for the input buffer.
2201     pub in_buffer_size: u32,
2202 }
2203 
2204 /// Encodes an address so that it is a null-terminated wide string.
encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]>2205 fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
2206     let len = addr.as_ref().encode_wide().count();
2207     let mut vec = Vec::with_capacity(len + 1);
2208     vec.extend(addr.as_ref().encode_wide());
2209     vec.push(0);
2210     vec.into_boxed_slice()
2211 }
2212 
2213 /// Internal function to get the info out of a raw named pipe.
named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo>2214 unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
2215     let mut flags = 0;
2216     let mut out_buffer_size = 0;
2217     let mut in_buffer_size = 0;
2218     let mut max_instances = 0;
2219 
2220     let result = namedpipeapi::GetNamedPipeInfo(
2221         handle,
2222         &mut flags,
2223         &mut out_buffer_size,
2224         &mut in_buffer_size,
2225         &mut max_instances,
2226     );
2227 
2228     if result == FALSE {
2229         return Err(io::Error::last_os_error());
2230     }
2231 
2232     let mut end = PipeEnd::Client;
2233     let mut mode = PipeMode::Byte;
2234 
2235     if flags & winbase::PIPE_SERVER_END != 0 {
2236         end = PipeEnd::Server;
2237     }
2238 
2239     if flags & winbase::PIPE_TYPE_MESSAGE != 0 {
2240         mode = PipeMode::Message;
2241     }
2242 
2243     Ok(PipeInfo {
2244         end,
2245         mode,
2246         out_buffer_size,
2247         in_buffer_size,
2248         max_instances,
2249     })
2250 }
2251