1 //! Tokio support for [Windows named pipes].
2 //!
3 //! [Windows named pipes]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
4
5 use std::ffi::c_void;
6 use std::ffi::OsStr;
7 use std::io::{self, Read, Write};
8 use std::pin::Pin;
9 use std::ptr;
10 use std::task::{Context, Poll};
11
12 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
13 use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
14
15 // Hide imports which are not used when generating documentation.
16 #[cfg(not(docsrs))]
17 mod doc {
18 pub(super) use crate::os::windows::ffi::OsStrExt;
19 pub(super) use crate::winapi::shared::minwindef::{DWORD, FALSE};
20 pub(super) use crate::winapi::um::fileapi;
21 pub(super) use crate::winapi::um::handleapi;
22 pub(super) use crate::winapi::um::namedpipeapi;
23 pub(super) use crate::winapi::um::winbase;
24 pub(super) use crate::winapi::um::winnt;
25
26 pub(super) use mio::windows as mio_windows;
27 }
28
29 // NB: none of these shows up in public API, so don't document them.
30 #[cfg(docsrs)]
31 mod doc {
32 pub type DWORD = crate::doc::NotDefinedHere;
33
34 pub(super) mod mio_windows {
35 pub type NamedPipe = crate::doc::NotDefinedHere;
36 }
37 }
38
39 use self::doc::*;
40
41 /// A [Windows named pipe] server.
42 ///
43 /// Accepting client connections involves creating a server with
44 /// [`ServerOptions::create`] and waiting for clients to connect using
45 /// [`NamedPipeServer::connect`].
46 ///
47 /// To avoid having clients sporadically fail with
48 /// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
49 /// ensure that at least one server instance is available at all times. This
50 /// means that the typical listen loop for a server is a bit involved, because
51 /// we have to ensure that we never drop a server accidentally while a client
52 /// might connect.
53 ///
54 /// So a correctly implemented server looks like this:
55 ///
56 /// ```no_run
57 /// use std::io;
58 /// use tokio::net::windows::named_pipe::ServerOptions;
59 ///
60 /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
61 ///
62 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
63 /// // The first server needs to be constructed early so that clients can
64 /// // be correctly connected. Otherwise calling .wait will cause the client to
65 /// // error.
66 /// //
67 /// // Here we also make use of `first_pipe_instance`, which will ensure that
68 /// // there are no other servers up and running already.
69 /// let mut server = ServerOptions::new()
70 /// .first_pipe_instance(true)
71 /// .create(PIPE_NAME)?;
72 ///
73 /// // Spawn the server loop.
74 /// let server = tokio::spawn(async move {
75 /// loop {
76 /// // Wait for a client to connect.
77 /// let connected = server.connect().await?;
78 ///
79 /// // Construct the next server to be connected before sending the one
80 /// // we already have of onto a task. This ensures that the server
81 /// // isn't closed (after it's done in the task) before a new one is
82 /// // available. Otherwise the client might error with
83 /// // `io::ErrorKind::NotFound`.
84 /// server = ServerOptions::new().create(PIPE_NAME)?;
85 ///
86 /// let client = tokio::spawn(async move {
87 /// /* use the connected client */
88 /// # Ok::<_, std::io::Error>(())
89 /// });
90 /// # if true { break } // needed for type inference to work
91 /// }
92 ///
93 /// Ok::<_, io::Error>(())
94 /// });
95 ///
96 /// /* do something else not server related here */
97 /// # Ok(()) }
98 /// ```
99 ///
100 /// [`ERROR_PIPE_BUSY`]: crate::winapi::shared::winerror::ERROR_PIPE_BUSY
101 /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
102 #[derive(Debug)]
103 pub struct NamedPipeServer {
104 io: PollEvented<mio_windows::NamedPipe>,
105 }
106
107 impl NamedPipeServer {
108 /// Constructs a new named pipe server from the specified raw handle.
109 ///
110 /// This function will consume ownership of the handle given, passing
111 /// responsibility for closing the handle to the returned object.
112 ///
113 /// This function is also unsafe as the primitives currently returned have
114 /// the contract that they are the sole owner of the file descriptor they
115 /// are wrapping. Usage of this function could accidentally allow violating
116 /// this contract which can cause memory unsafety in code that relies on it
117 /// being true.
118 ///
119 /// # Errors
120 ///
121 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
122 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
123 ///
124 /// [Tokio Runtime]: crate::runtime::Runtime
125 /// [enabled I/O]: crate::runtime::Builder::enable_io
from_raw_handle(handle: RawHandle) -> io::Result<Self>126 pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
127 let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
128
129 Ok(Self {
130 io: PollEvented::new(named_pipe)?,
131 })
132 }
133
134 /// Retrieves information about the named pipe the server is associated
135 /// with.
136 ///
137 /// ```no_run
138 /// use tokio::net::windows::named_pipe::{PipeEnd, PipeMode, ServerOptions};
139 ///
140 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-info";
141 ///
142 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
143 /// let server = ServerOptions::new()
144 /// .pipe_mode(PipeMode::Message)
145 /// .max_instances(5)
146 /// .create(PIPE_NAME)?;
147 ///
148 /// let server_info = server.info()?;
149 ///
150 /// assert_eq!(server_info.end, PipeEnd::Server);
151 /// assert_eq!(server_info.mode, PipeMode::Message);
152 /// assert_eq!(server_info.max_instances, 5);
153 /// # Ok(()) }
154 /// ```
info(&self) -> io::Result<PipeInfo>155 pub fn info(&self) -> io::Result<PipeInfo> {
156 // Safety: we're ensuring the lifetime of the named pipe.
157 unsafe { named_pipe_info(self.io.as_raw_handle()) }
158 }
159
160 /// Enables a named pipe server process to wait for a client process to
161 /// connect to an instance of a named pipe. A client process connects by
162 /// creating a named pipe with the same name.
163 ///
164 /// This corresponds to the [`ConnectNamedPipe`] system call.
165 ///
166 /// # Cancel safety
167 ///
168 /// This method is cancellation safe in the sense that if it is used as the
169 /// event in a [`select!`](crate::select) statement and some other branch
170 /// completes first, then no connection events have been lost.
171 ///
172 /// [`ConnectNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
173 ///
174 /// # Example
175 ///
176 /// ```no_run
177 /// use tokio::net::windows::named_pipe::ServerOptions;
178 ///
179 /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
180 ///
181 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
182 /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
183 ///
184 /// // Wait for a client to connect.
185 /// pipe.connect().await?;
186 ///
187 /// // Use the connected client...
188 /// # Ok(()) }
189 /// ```
connect(&self) -> io::Result<()>190 pub async fn connect(&self) -> io::Result<()> {
191 loop {
192 match self.io.connect() {
193 Ok(()) => break,
194 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
195 self.io.registration().readiness(Interest::WRITABLE).await?;
196 }
197 Err(e) => return Err(e),
198 }
199 }
200
201 Ok(())
202 }
203
204 /// Disconnects the server end of a named pipe instance from a client
205 /// process.
206 ///
207 /// ```
208 /// use tokio::io::AsyncWriteExt;
209 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
210 /// use winapi::shared::winerror;
211 ///
212 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-disconnect";
213 ///
214 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
215 /// let server = ServerOptions::new()
216 /// .create(PIPE_NAME)?;
217 ///
218 /// let mut client = ClientOptions::new()
219 /// .open(PIPE_NAME)?;
220 ///
221 /// // Wait for a client to become connected.
222 /// server.connect().await?;
223 ///
224 /// // Forcibly disconnect the client.
225 /// server.disconnect()?;
226 ///
227 /// // Write fails with an OS-specific error after client has been
228 /// // disconnected.
229 /// let e = client.write(b"ping").await.unwrap_err();
230 /// assert_eq!(e.raw_os_error(), Some(winerror::ERROR_PIPE_NOT_CONNECTED as i32));
231 /// # Ok(()) }
232 /// ```
disconnect(&self) -> io::Result<()>233 pub fn disconnect(&self) -> io::Result<()> {
234 self.io.disconnect()
235 }
236
237 /// Waits for any of the requested ready states.
238 ///
239 /// This function is usually paired with `try_read()` or `try_write()`. It
240 /// can be used to concurrently read / write to the same pipe on a single
241 /// task without splitting the pipe.
242 ///
243 /// # Examples
244 ///
245 /// Concurrently read and write to the pipe on the same task without
246 /// splitting.
247 ///
248 /// ```no_run
249 /// use tokio::io::Interest;
250 /// use tokio::net::windows::named_pipe;
251 /// use std::error::Error;
252 /// use std::io;
253 ///
254 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready";
255 ///
256 /// #[tokio::main]
257 /// async fn main() -> Result<(), Box<dyn Error>> {
258 /// let server = named_pipe::ServerOptions::new()
259 /// .create(PIPE_NAME)?;
260 ///
261 /// loop {
262 /// let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?;
263 ///
264 /// if ready.is_readable() {
265 /// let mut data = vec![0; 1024];
266 /// // Try to read data, this may still fail with `WouldBlock`
267 /// // if the readiness event is a false positive.
268 /// match server.try_read(&mut data) {
269 /// Ok(n) => {
270 /// println!("read {} bytes", n);
271 /// }
272 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
273 /// continue;
274 /// }
275 /// Err(e) => {
276 /// return Err(e.into());
277 /// }
278 /// }
279 /// }
280 ///
281 /// if ready.is_writable() {
282 /// // Try to write data, this may still fail with `WouldBlock`
283 /// // if the readiness event is a false positive.
284 /// match server.try_write(b"hello world") {
285 /// Ok(n) => {
286 /// println!("write {} bytes", n);
287 /// }
288 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
289 /// continue;
290 /// }
291 /// Err(e) => {
292 /// return Err(e.into());
293 /// }
294 /// }
295 /// }
296 /// }
297 /// }
298 /// ```
ready(&self, interest: Interest) -> io::Result<Ready>299 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
300 let event = self.io.registration().readiness(interest).await?;
301 Ok(event.ready)
302 }
303
304 /// Waits for the pipe to become readable.
305 ///
306 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
307 /// paired with `try_read()`.
308 ///
309 /// # Examples
310 ///
311 /// ```no_run
312 /// use tokio::net::windows::named_pipe;
313 /// use std::error::Error;
314 /// use std::io;
315 ///
316 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable";
317 ///
318 /// #[tokio::main]
319 /// async fn main() -> Result<(), Box<dyn Error>> {
320 /// let server = named_pipe::ServerOptions::new()
321 /// .create(PIPE_NAME)?;
322 ///
323 /// let mut msg = vec![0; 1024];
324 ///
325 /// loop {
326 /// // Wait for the pipe to be readable
327 /// server.readable().await?;
328 ///
329 /// // Try to read data, this may still fail with `WouldBlock`
330 /// // if the readiness event is a false positive.
331 /// match server.try_read(&mut msg) {
332 /// Ok(n) => {
333 /// msg.truncate(n);
334 /// break;
335 /// }
336 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
337 /// continue;
338 /// }
339 /// Err(e) => {
340 /// return Err(e.into());
341 /// }
342 /// }
343 /// }
344 ///
345 /// println!("GOT = {:?}", msg);
346 /// Ok(())
347 /// }
348 /// ```
readable(&self) -> io::Result<()>349 pub async fn readable(&self) -> io::Result<()> {
350 self.ready(Interest::READABLE).await?;
351 Ok(())
352 }
353
354 /// Polls for read readiness.
355 ///
356 /// If the pipe is not currently ready for reading, this method will
357 /// store a clone of the `Waker` from the provided `Context`. When the pipe
358 /// becomes ready for reading, `Waker::wake` will be called on the waker.
359 ///
360 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
361 /// the `Waker` from the `Context` passed to the most recent call is
362 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
363 /// second, independent waker.)
364 ///
365 /// This function is intended for cases where creating and pinning a future
366 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
367 /// preferred, as this supports polling from multiple tasks at once.
368 ///
369 /// # Return value
370 ///
371 /// The function returns:
372 ///
373 /// * `Poll::Pending` if the pipe is not ready for reading.
374 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
375 /// * `Poll::Ready(Err(e))` if an error is encountered.
376 ///
377 /// # Errors
378 ///
379 /// This function may encounter any standard I/O error except `WouldBlock`.
380 ///
381 /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>382 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
383 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
384 }
385
386 /// Tries to read data from the pipe into the provided buffer, returning how
387 /// many bytes were read.
388 ///
389 /// Receives any pending data from the pipe but does not wait for new data
390 /// to arrive. On success, returns the number of bytes read. Because
391 /// `try_read()` is non-blocking, the buffer does not have to be stored by
392 /// the async task and can exist entirely on the stack.
393 ///
394 /// Usually, [`readable()`] or [`ready()`] is used with this function.
395 ///
396 /// [`readable()`]: NamedPipeServer::readable()
397 /// [`ready()`]: NamedPipeServer::ready()
398 ///
399 /// # Return
400 ///
401 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
402 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
403 /// and will no longer yield data. If the pipe is not ready to read data
404 /// `Err(io::ErrorKind::WouldBlock)` is returned.
405 ///
406 /// # Examples
407 ///
408 /// ```no_run
409 /// use tokio::net::windows::named_pipe;
410 /// use std::error::Error;
411 /// use std::io;
412 ///
413 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read";
414 ///
415 /// #[tokio::main]
416 /// async fn main() -> Result<(), Box<dyn Error>> {
417 /// let server = named_pipe::ServerOptions::new()
418 /// .create(PIPE_NAME)?;
419 ///
420 /// loop {
421 /// // Wait for the pipe to be readable
422 /// server.readable().await?;
423 ///
424 /// // Creating the buffer **after** the `await` prevents it from
425 /// // being stored in the async task.
426 /// let mut buf = [0; 4096];
427 ///
428 /// // Try to read data, this may still fail with `WouldBlock`
429 /// // if the readiness event is a false positive.
430 /// match server.try_read(&mut buf) {
431 /// Ok(0) => break,
432 /// Ok(n) => {
433 /// println!("read {} bytes", n);
434 /// }
435 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
436 /// continue;
437 /// }
438 /// Err(e) => {
439 /// return Err(e.into());
440 /// }
441 /// }
442 /// }
443 ///
444 /// Ok(())
445 /// }
446 /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>447 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
448 self.io
449 .registration()
450 .try_io(Interest::READABLE, || (&*self.io).read(buf))
451 }
452
453 /// Tries to read data from the pipe into the provided buffers, returning
454 /// how many bytes were read.
455 ///
456 /// Data is copied to fill each buffer in order, with the final buffer
457 /// written to possibly being only partially filled. This method behaves
458 /// equivalently to a single call to [`try_read()`] with concatenated
459 /// buffers.
460 ///
461 /// Receives any pending data from the pipe but does not wait for new data
462 /// to arrive. On success, returns the number of bytes read. Because
463 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
464 /// stored by the async task and can exist entirely on the stack.
465 ///
466 /// Usually, [`readable()`] or [`ready()`] is used with this function.
467 ///
468 /// [`try_read()`]: NamedPipeServer::try_read()
469 /// [`readable()`]: NamedPipeServer::readable()
470 /// [`ready()`]: NamedPipeServer::ready()
471 ///
472 /// # Return
473 ///
474 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
475 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
476 /// and will no longer yield data. If the pipe is not ready to read data
477 /// `Err(io::ErrorKind::WouldBlock)` is returned.
478 ///
479 /// # Examples
480 ///
481 /// ```no_run
482 /// use tokio::net::windows::named_pipe;
483 /// use std::error::Error;
484 /// use std::io::{self, IoSliceMut};
485 ///
486 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored";
487 ///
488 /// #[tokio::main]
489 /// async fn main() -> Result<(), Box<dyn Error>> {
490 /// let server = named_pipe::ServerOptions::new()
491 /// .create(PIPE_NAME)?;
492 ///
493 /// loop {
494 /// // Wait for the pipe to be readable
495 /// server.readable().await?;
496 ///
497 /// // Creating the buffer **after** the `await` prevents it from
498 /// // being stored in the async task.
499 /// let mut buf_a = [0; 512];
500 /// let mut buf_b = [0; 1024];
501 /// let mut bufs = [
502 /// IoSliceMut::new(&mut buf_a),
503 /// IoSliceMut::new(&mut buf_b),
504 /// ];
505 ///
506 /// // Try to read data, this may still fail with `WouldBlock`
507 /// // if the readiness event is a false positive.
508 /// match server.try_read_vectored(&mut bufs) {
509 /// Ok(0) => break,
510 /// Ok(n) => {
511 /// println!("read {} bytes", n);
512 /// }
513 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
514 /// continue;
515 /// }
516 /// Err(e) => {
517 /// return Err(e.into());
518 /// }
519 /// }
520 /// }
521 ///
522 /// Ok(())
523 /// }
524 /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>525 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
526 self.io
527 .registration()
528 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
529 }
530
531 /// Waits for the pipe to become writable.
532 ///
533 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
534 /// paired with `try_write()`.
535 ///
536 /// # Examples
537 ///
538 /// ```no_run
539 /// use tokio::net::windows::named_pipe;
540 /// use std::error::Error;
541 /// use std::io;
542 ///
543 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable";
544 ///
545 /// #[tokio::main]
546 /// async fn main() -> Result<(), Box<dyn Error>> {
547 /// let server = named_pipe::ServerOptions::new()
548 /// .create(PIPE_NAME)?;
549 ///
550 /// loop {
551 /// // Wait for the pipe to be writable
552 /// server.writable().await?;
553 ///
554 /// // Try to write data, this may still fail with `WouldBlock`
555 /// // if the readiness event is a false positive.
556 /// match server.try_write(b"hello world") {
557 /// Ok(n) => {
558 /// break;
559 /// }
560 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
561 /// continue;
562 /// }
563 /// Err(e) => {
564 /// return Err(e.into());
565 /// }
566 /// }
567 /// }
568 ///
569 /// Ok(())
570 /// }
571 /// ```
writable(&self) -> io::Result<()>572 pub async fn writable(&self) -> io::Result<()> {
573 self.ready(Interest::WRITABLE).await?;
574 Ok(())
575 }
576
577 /// Polls for write readiness.
578 ///
579 /// If the pipe is not currently ready for writing, this method will
580 /// store a clone of the `Waker` from the provided `Context`. When the pipe
581 /// becomes ready for writing, `Waker::wake` will be called on the waker.
582 ///
583 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
584 /// the `Waker` from the `Context` passed to the most recent call is
585 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
586 /// second, independent waker.)
587 ///
588 /// This function is intended for cases where creating and pinning a future
589 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
590 /// preferred, as this supports polling from multiple tasks at once.
591 ///
592 /// # Return value
593 ///
594 /// The function returns:
595 ///
596 /// * `Poll::Pending` if the pipe is not ready for writing.
597 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
598 /// * `Poll::Ready(Err(e))` if an error is encountered.
599 ///
600 /// # Errors
601 ///
602 /// This function may encounter any standard I/O error except `WouldBlock`.
603 ///
604 /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>605 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
606 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
607 }
608
609 /// Tries to write a buffer to the pipe, returning how many bytes were
610 /// written.
611 ///
612 /// The function will attempt to write the entire contents of `buf`, but
613 /// only part of the buffer may be written.
614 ///
615 /// This function is usually paired with `writable()`.
616 ///
617 /// # Return
618 ///
619 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
620 /// number of bytes written. If the pipe is not ready to write data,
621 /// `Err(io::ErrorKind::WouldBlock)` is returned.
622 ///
623 /// # Examples
624 ///
625 /// ```no_run
626 /// use tokio::net::windows::named_pipe;
627 /// use std::error::Error;
628 /// use std::io;
629 ///
630 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write";
631 ///
632 /// #[tokio::main]
633 /// async fn main() -> Result<(), Box<dyn Error>> {
634 /// let server = named_pipe::ServerOptions::new()
635 /// .create(PIPE_NAME)?;
636 ///
637 /// loop {
638 /// // Wait for the pipe to be writable
639 /// server.writable().await?;
640 ///
641 /// // Try to write data, this may still fail with `WouldBlock`
642 /// // if the readiness event is a false positive.
643 /// match server.try_write(b"hello world") {
644 /// Ok(n) => {
645 /// break;
646 /// }
647 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
648 /// continue;
649 /// }
650 /// Err(e) => {
651 /// return Err(e.into());
652 /// }
653 /// }
654 /// }
655 ///
656 /// Ok(())
657 /// }
658 /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>659 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
660 self.io
661 .registration()
662 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
663 }
664
665 /// Tries to write several buffers to the pipe, returning how many bytes
666 /// were written.
667 ///
668 /// Data is written from each buffer in order, with the final buffer read
669 /// from possible being only partially consumed. This method behaves
670 /// equivalently to a single call to [`try_write()`] with concatenated
671 /// buffers.
672 ///
673 /// This function is usually paired with `writable()`.
674 ///
675 /// [`try_write()`]: NamedPipeServer::try_write()
676 ///
677 /// # Return
678 ///
679 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
680 /// number of bytes written. If the pipe is not ready to write data,
681 /// `Err(io::ErrorKind::WouldBlock)` is returned.
682 ///
683 /// # Examples
684 ///
685 /// ```no_run
686 /// use tokio::net::windows::named_pipe;
687 /// use std::error::Error;
688 /// use std::io;
689 ///
690 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored";
691 ///
692 /// #[tokio::main]
693 /// async fn main() -> Result<(), Box<dyn Error>> {
694 /// let server = named_pipe::ServerOptions::new()
695 /// .create(PIPE_NAME)?;
696 ///
697 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
698 ///
699 /// loop {
700 /// // Wait for the pipe to be writable
701 /// server.writable().await?;
702 ///
703 /// // Try to write data, this may still fail with `WouldBlock`
704 /// // if the readiness event is a false positive.
705 /// match server.try_write_vectored(&bufs) {
706 /// Ok(n) => {
707 /// break;
708 /// }
709 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
710 /// continue;
711 /// }
712 /// Err(e) => {
713 /// return Err(e.into());
714 /// }
715 /// }
716 /// }
717 ///
718 /// Ok(())
719 /// }
720 /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>721 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
722 self.io
723 .registration()
724 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
725 }
726
727 /// Tries to read or write from the socket using a user-provided IO operation.
728 ///
729 /// If the socket is ready, the provided closure is called. The closure
730 /// should attempt to perform IO operation from the socket by manually
731 /// calling the appropriate syscall. If the operation fails because the
732 /// socket is not actually ready, then the closure should return a
733 /// `WouldBlock` error and the readiness flag is cleared. The return value
734 /// of the closure is then returned by `try_io`.
735 ///
736 /// If the socket is not ready, then the closure is not called
737 /// and a `WouldBlock` error is returned.
738 ///
739 /// The closure should only return a `WouldBlock` error if it has performed
740 /// an IO operation on the socket that failed due to the socket not being
741 /// ready. Returning a `WouldBlock` error in any other situation will
742 /// incorrectly clear the readiness flag, which can cause the socket to
743 /// behave incorrectly.
744 ///
745 /// The closure should not perform the IO operation using any of the
746 /// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
747 /// the readiness flag and can cause the socket to behave incorrectly.
748 ///
749 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
750 ///
751 /// [`readable()`]: NamedPipeServer::readable()
752 /// [`writable()`]: NamedPipeServer::writable()
753 /// [`ready()`]: NamedPipeServer::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>754 pub fn try_io<R>(
755 &self,
756 interest: Interest,
757 f: impl FnOnce() -> io::Result<R>,
758 ) -> io::Result<R> {
759 self.io.registration().try_io(interest, f)
760 }
761 }
762
763 impl AsyncRead for NamedPipeServer {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>764 fn poll_read(
765 self: Pin<&mut Self>,
766 cx: &mut Context<'_>,
767 buf: &mut ReadBuf<'_>,
768 ) -> Poll<io::Result<()>> {
769 unsafe { self.io.poll_read(cx, buf) }
770 }
771 }
772
773 impl AsyncWrite for NamedPipeServer {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>774 fn poll_write(
775 self: Pin<&mut Self>,
776 cx: &mut Context<'_>,
777 buf: &[u8],
778 ) -> Poll<io::Result<usize>> {
779 self.io.poll_write(cx, buf)
780 }
781
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>782 fn poll_write_vectored(
783 self: Pin<&mut Self>,
784 cx: &mut Context<'_>,
785 bufs: &[io::IoSlice<'_>],
786 ) -> Poll<io::Result<usize>> {
787 self.io.poll_write_vectored(cx, bufs)
788 }
789
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>790 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
791 Poll::Ready(Ok(()))
792 }
793
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>794 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
795 self.poll_flush(cx)
796 }
797 }
798
799 impl AsRawHandle for NamedPipeServer {
as_raw_handle(&self) -> RawHandle800 fn as_raw_handle(&self) -> RawHandle {
801 self.io.as_raw_handle()
802 }
803 }
804
805 /// A [Windows named pipe] client.
806 ///
807 /// Constructed using [`ClientOptions::open`].
808 ///
809 /// Connecting a client correctly involves a few steps. When connecting through
810 /// [`ClientOptions::open`], it might error indicating one of two things:
811 ///
812 /// * [`std::io::ErrorKind::NotFound`] - There is no server available.
813 /// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
814 /// for a while and try again.
815 ///
816 /// So a correctly implemented client looks like this:
817 ///
818 /// ```no_run
819 /// use std::time::Duration;
820 /// use tokio::net::windows::named_pipe::ClientOptions;
821 /// use tokio::time;
822 /// use winapi::shared::winerror;
823 ///
824 /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
825 ///
826 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
827 /// let client = loop {
828 /// match ClientOptions::new().open(PIPE_NAME) {
829 /// Ok(client) => break client,
830 /// Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
831 /// Err(e) => return Err(e),
832 /// }
833 ///
834 /// time::sleep(Duration::from_millis(50)).await;
835 /// };
836 ///
837 /// /* use the connected client */
838 /// # Ok(()) }
839 /// ```
840 ///
841 /// [`ERROR_PIPE_BUSY`]: crate::winapi::shared::winerror::ERROR_PIPE_BUSY
842 /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
843 #[derive(Debug)]
844 pub struct NamedPipeClient {
845 io: PollEvented<mio_windows::NamedPipe>,
846 }
847
848 impl NamedPipeClient {
849 /// Constructs a new named pipe client from the specified raw handle.
850 ///
851 /// This function will consume ownership of the handle given, passing
852 /// responsibility for closing the handle to the returned object.
853 ///
854 /// This function is also unsafe as the primitives currently returned have
855 /// the contract that they are the sole owner of the file descriptor they
856 /// are wrapping. Usage of this function could accidentally allow violating
857 /// this contract which can cause memory unsafety in code that relies on it
858 /// being true.
859 ///
860 /// # Errors
861 ///
862 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
863 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
864 ///
865 /// [Tokio Runtime]: crate::runtime::Runtime
866 /// [enabled I/O]: crate::runtime::Builder::enable_io
from_raw_handle(handle: RawHandle) -> io::Result<Self>867 pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
868 let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
869
870 Ok(Self {
871 io: PollEvented::new(named_pipe)?,
872 })
873 }
874
875 /// Retrieves information about the named pipe the client is associated
876 /// with.
877 ///
878 /// ```no_run
879 /// use tokio::net::windows::named_pipe::{ClientOptions, PipeEnd, PipeMode};
880 ///
881 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-info";
882 ///
883 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
884 /// let client = ClientOptions::new()
885 /// .open(PIPE_NAME)?;
886 ///
887 /// let client_info = client.info()?;
888 ///
889 /// assert_eq!(client_info.end, PipeEnd::Client);
890 /// assert_eq!(client_info.mode, PipeMode::Message);
891 /// assert_eq!(client_info.max_instances, 5);
892 /// # Ok(()) }
893 /// ```
info(&self) -> io::Result<PipeInfo>894 pub fn info(&self) -> io::Result<PipeInfo> {
895 // Safety: we're ensuring the lifetime of the named pipe.
896 unsafe { named_pipe_info(self.io.as_raw_handle()) }
897 }
898
899 /// Waits for any of the requested ready states.
900 ///
901 /// This function is usually paired with `try_read()` or `try_write()`. It
902 /// can be used to concurrently read / write to the same pipe on a single
903 /// task without splitting the pipe.
904 ///
905 /// # Examples
906 ///
907 /// Concurrently read and write to the pipe on the same task without
908 /// splitting.
909 ///
910 /// ```no_run
911 /// use tokio::io::Interest;
912 /// use tokio::net::windows::named_pipe;
913 /// use std::error::Error;
914 /// use std::io;
915 ///
916 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready";
917 ///
918 /// #[tokio::main]
919 /// async fn main() -> Result<(), Box<dyn Error>> {
920 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
921 ///
922 /// loop {
923 /// let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?;
924 ///
925 /// if ready.is_readable() {
926 /// let mut data = vec![0; 1024];
927 /// // Try to read data, this may still fail with `WouldBlock`
928 /// // if the readiness event is a false positive.
929 /// match client.try_read(&mut data) {
930 /// Ok(n) => {
931 /// println!("read {} bytes", n);
932 /// }
933 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
934 /// continue;
935 /// }
936 /// Err(e) => {
937 /// return Err(e.into());
938 /// }
939 /// }
940 /// }
941 ///
942 /// if ready.is_writable() {
943 /// // Try to write data, this may still fail with `WouldBlock`
944 /// // if the readiness event is a false positive.
945 /// match client.try_write(b"hello world") {
946 /// Ok(n) => {
947 /// println!("write {} bytes", n);
948 /// }
949 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
950 /// continue;
951 /// }
952 /// Err(e) => {
953 /// return Err(e.into());
954 /// }
955 /// }
956 /// }
957 /// }
958 /// }
959 /// ```
ready(&self, interest: Interest) -> io::Result<Ready>960 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
961 let event = self.io.registration().readiness(interest).await?;
962 Ok(event.ready)
963 }
964
965 /// Waits for the pipe to become readable.
966 ///
967 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
968 /// paired with `try_read()`.
969 ///
970 /// # Examples
971 ///
972 /// ```no_run
973 /// use tokio::net::windows::named_pipe;
974 /// use std::error::Error;
975 /// use std::io;
976 ///
977 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
978 ///
979 /// #[tokio::main]
980 /// async fn main() -> Result<(), Box<dyn Error>> {
981 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
982 ///
983 /// let mut msg = vec![0; 1024];
984 ///
985 /// loop {
986 /// // Wait for the pipe to be readable
987 /// client.readable().await?;
988 ///
989 /// // Try to read data, this may still fail with `WouldBlock`
990 /// // if the readiness event is a false positive.
991 /// match client.try_read(&mut msg) {
992 /// Ok(n) => {
993 /// msg.truncate(n);
994 /// break;
995 /// }
996 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
997 /// continue;
998 /// }
999 /// Err(e) => {
1000 /// return Err(e.into());
1001 /// }
1002 /// }
1003 /// }
1004 ///
1005 /// println!("GOT = {:?}", msg);
1006 /// Ok(())
1007 /// }
1008 /// ```
readable(&self) -> io::Result<()>1009 pub async fn readable(&self) -> io::Result<()> {
1010 self.ready(Interest::READABLE).await?;
1011 Ok(())
1012 }
1013
1014 /// Polls for read readiness.
1015 ///
1016 /// If the pipe is not currently ready for reading, this method will
1017 /// store a clone of the `Waker` from the provided `Context`. When the pipe
1018 /// becomes ready for reading, `Waker::wake` will be called on the waker.
1019 ///
1020 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1021 /// the `Waker` from the `Context` passed to the most recent call is
1022 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
1023 /// second, independent waker.)
1024 ///
1025 /// This function is intended for cases where creating and pinning a future
1026 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1027 /// preferred, as this supports polling from multiple tasks at once.
1028 ///
1029 /// # Return value
1030 ///
1031 /// The function returns:
1032 ///
1033 /// * `Poll::Pending` if the pipe is not ready for reading.
1034 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1035 /// * `Poll::Ready(Err(e))` if an error is encountered.
1036 ///
1037 /// # Errors
1038 ///
1039 /// This function may encounter any standard I/O error except `WouldBlock`.
1040 ///
1041 /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1042 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1043 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1044 }
1045
1046 /// Tries to read data from the pipe into the provided buffer, returning how
1047 /// many bytes were read.
1048 ///
1049 /// Receives any pending data from the pipe but does not wait for new data
1050 /// to arrive. On success, returns the number of bytes read. Because
1051 /// `try_read()` is non-blocking, the buffer does not have to be stored by
1052 /// the async task and can exist entirely on the stack.
1053 ///
1054 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1055 ///
1056 /// [`readable()`]: NamedPipeClient::readable()
1057 /// [`ready()`]: NamedPipeClient::ready()
1058 ///
1059 /// # Return
1060 ///
1061 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1062 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
1063 /// and will no longer yield data. If the pipe is not ready to read data
1064 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1065 ///
1066 /// # Examples
1067 ///
1068 /// ```no_run
1069 /// use tokio::net::windows::named_pipe;
1070 /// use std::error::Error;
1071 /// use std::io;
1072 ///
1073 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read";
1074 ///
1075 /// #[tokio::main]
1076 /// async fn main() -> Result<(), Box<dyn Error>> {
1077 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1078 ///
1079 /// loop {
1080 /// // Wait for the pipe to be readable
1081 /// client.readable().await?;
1082 ///
1083 /// // Creating the buffer **after** the `await` prevents it from
1084 /// // being stored in the async task.
1085 /// let mut buf = [0; 4096];
1086 ///
1087 /// // Try to read data, this may still fail with `WouldBlock`
1088 /// // if the readiness event is a false positive.
1089 /// match client.try_read(&mut buf) {
1090 /// Ok(0) => break,
1091 /// Ok(n) => {
1092 /// println!("read {} bytes", n);
1093 /// }
1094 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1095 /// continue;
1096 /// }
1097 /// Err(e) => {
1098 /// return Err(e.into());
1099 /// }
1100 /// }
1101 /// }
1102 ///
1103 /// Ok(())
1104 /// }
1105 /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>1106 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1107 self.io
1108 .registration()
1109 .try_io(Interest::READABLE, || (&*self.io).read(buf))
1110 }
1111
1112 /// Tries to read data from the pipe into the provided buffers, returning
1113 /// how many bytes were read.
1114 ///
1115 /// Data is copied to fill each buffer in order, with the final buffer
1116 /// written to possibly being only partially filled. This method behaves
1117 /// equivalently to a single call to [`try_read()`] with concatenated
1118 /// buffers.
1119 ///
1120 /// Receives any pending data from the pipe but does not wait for new data
1121 /// to arrive. On success, returns the number of bytes read. Because
1122 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1123 /// stored by the async task and can exist entirely on the stack.
1124 ///
1125 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1126 ///
1127 /// [`try_read()`]: NamedPipeClient::try_read()
1128 /// [`readable()`]: NamedPipeClient::readable()
1129 /// [`ready()`]: NamedPipeClient::ready()
1130 ///
1131 /// # Return
1132 ///
1133 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1134 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
1135 /// and will no longer yield data. If the pipe is not ready to read data
1136 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1137 ///
1138 /// # Examples
1139 ///
1140 /// ```no_run
1141 /// use tokio::net::windows::named_pipe;
1142 /// use std::error::Error;
1143 /// use std::io::{self, IoSliceMut};
1144 ///
1145 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored";
1146 ///
1147 /// #[tokio::main]
1148 /// async fn main() -> Result<(), Box<dyn Error>> {
1149 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1150 ///
1151 /// loop {
1152 /// // Wait for the pipe to be readable
1153 /// client.readable().await?;
1154 ///
1155 /// // Creating the buffer **after** the `await` prevents it from
1156 /// // being stored in the async task.
1157 /// let mut buf_a = [0; 512];
1158 /// let mut buf_b = [0; 1024];
1159 /// let mut bufs = [
1160 /// IoSliceMut::new(&mut buf_a),
1161 /// IoSliceMut::new(&mut buf_b),
1162 /// ];
1163 ///
1164 /// // Try to read data, this may still fail with `WouldBlock`
1165 /// // if the readiness event is a false positive.
1166 /// match client.try_read_vectored(&mut bufs) {
1167 /// Ok(0) => break,
1168 /// Ok(n) => {
1169 /// println!("read {} bytes", n);
1170 /// }
1171 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1172 /// continue;
1173 /// }
1174 /// Err(e) => {
1175 /// return Err(e.into());
1176 /// }
1177 /// }
1178 /// }
1179 ///
1180 /// Ok(())
1181 /// }
1182 /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>1183 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1184 self.io
1185 .registration()
1186 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1187 }
1188
1189 /// Waits for the pipe to become writable.
1190 ///
1191 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
1192 /// paired with `try_write()`.
1193 ///
1194 /// # Examples
1195 ///
1196 /// ```no_run
1197 /// use tokio::net::windows::named_pipe;
1198 /// use std::error::Error;
1199 /// use std::io;
1200 ///
1201 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable";
1202 ///
1203 /// #[tokio::main]
1204 /// async fn main() -> Result<(), Box<dyn Error>> {
1205 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1206 ///
1207 /// loop {
1208 /// // Wait for the pipe to be writable
1209 /// client.writable().await?;
1210 ///
1211 /// // Try to write data, this may still fail with `WouldBlock`
1212 /// // if the readiness event is a false positive.
1213 /// match client.try_write(b"hello world") {
1214 /// Ok(n) => {
1215 /// break;
1216 /// }
1217 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1218 /// continue;
1219 /// }
1220 /// Err(e) => {
1221 /// return Err(e.into());
1222 /// }
1223 /// }
1224 /// }
1225 ///
1226 /// Ok(())
1227 /// }
1228 /// ```
writable(&self) -> io::Result<()>1229 pub async fn writable(&self) -> io::Result<()> {
1230 self.ready(Interest::WRITABLE).await?;
1231 Ok(())
1232 }
1233
1234 /// Polls for write readiness.
1235 ///
1236 /// If the pipe is not currently ready for writing, this method will
1237 /// store a clone of the `Waker` from the provided `Context`. When the pipe
1238 /// becomes ready for writing, `Waker::wake` will be called on the waker.
1239 ///
1240 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
1241 /// the `Waker` from the `Context` passed to the most recent call is
1242 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
1243 /// second, independent waker.)
1244 ///
1245 /// This function is intended for cases where creating and pinning a future
1246 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
1247 /// preferred, as this supports polling from multiple tasks at once.
1248 ///
1249 /// # Return value
1250 ///
1251 /// The function returns:
1252 ///
1253 /// * `Poll::Pending` if the pipe is not ready for writing.
1254 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
1255 /// * `Poll::Ready(Err(e))` if an error is encountered.
1256 ///
1257 /// # Errors
1258 ///
1259 /// This function may encounter any standard I/O error except `WouldBlock`.
1260 ///
1261 /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1262 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1263 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
1264 }
1265
1266 /// Tries to write a buffer to the pipe, returning how many bytes were
1267 /// written.
1268 ///
1269 /// The function will attempt to write the entire contents of `buf`, but
1270 /// only part of the buffer may be written.
1271 ///
1272 /// This function is usually paired with `writable()`.
1273 ///
1274 /// # Return
1275 ///
1276 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1277 /// number of bytes written. If the pipe is not ready to write data,
1278 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1279 ///
1280 /// # Examples
1281 ///
1282 /// ```no_run
1283 /// use tokio::net::windows::named_pipe;
1284 /// use std::error::Error;
1285 /// use std::io;
1286 ///
1287 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write";
1288 ///
1289 /// #[tokio::main]
1290 /// async fn main() -> Result<(), Box<dyn Error>> {
1291 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1292 ///
1293 /// loop {
1294 /// // Wait for the pipe to be writable
1295 /// client.writable().await?;
1296 ///
1297 /// // Try to write data, this may still fail with `WouldBlock`
1298 /// // if the readiness event is a false positive.
1299 /// match client.try_write(b"hello world") {
1300 /// Ok(n) => {
1301 /// break;
1302 /// }
1303 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1304 /// continue;
1305 /// }
1306 /// Err(e) => {
1307 /// return Err(e.into());
1308 /// }
1309 /// }
1310 /// }
1311 ///
1312 /// Ok(())
1313 /// }
1314 /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>1315 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
1316 self.io
1317 .registration()
1318 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
1319 }
1320
1321 /// Tries to write several buffers to the pipe, returning how many bytes
1322 /// were written.
1323 ///
1324 /// Data is written from each buffer in order, with the final buffer read
1325 /// from possible being only partially consumed. This method behaves
1326 /// equivalently to a single call to [`try_write()`] with concatenated
1327 /// buffers.
1328 ///
1329 /// This function is usually paired with `writable()`.
1330 ///
1331 /// [`try_write()`]: NamedPipeClient::try_write()
1332 ///
1333 /// # Return
1334 ///
1335 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1336 /// number of bytes written. If the pipe is not ready to write data,
1337 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1338 ///
1339 /// # Examples
1340 ///
1341 /// ```no_run
1342 /// use tokio::net::windows::named_pipe;
1343 /// use std::error::Error;
1344 /// use std::io;
1345 ///
1346 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored";
1347 ///
1348 /// #[tokio::main]
1349 /// async fn main() -> Result<(), Box<dyn Error>> {
1350 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1351 ///
1352 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
1353 ///
1354 /// loop {
1355 /// // Wait for the pipe to be writable
1356 /// client.writable().await?;
1357 ///
1358 /// // Try to write data, this may still fail with `WouldBlock`
1359 /// // if the readiness event is a false positive.
1360 /// match client.try_write_vectored(&bufs) {
1361 /// Ok(n) => {
1362 /// break;
1363 /// }
1364 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1365 /// continue;
1366 /// }
1367 /// Err(e) => {
1368 /// return Err(e.into());
1369 /// }
1370 /// }
1371 /// }
1372 ///
1373 /// Ok(())
1374 /// }
1375 /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>1376 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
1377 self.io
1378 .registration()
1379 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
1380 }
1381
1382 /// Tries to read or write from the socket using a user-provided IO operation.
1383 ///
1384 /// If the socket is ready, the provided closure is called. The closure
1385 /// should attempt to perform IO operation from the socket by manually
1386 /// calling the appropriate syscall. If the operation fails because the
1387 /// socket is not actually ready, then the closure should return a
1388 /// `WouldBlock` error and the readiness flag is cleared. The return value
1389 /// of the closure is then returned by `try_io`.
1390 ///
1391 /// If the socket is not ready, then the closure is not called
1392 /// and a `WouldBlock` error is returned.
1393 ///
1394 /// The closure should only return a `WouldBlock` error if it has performed
1395 /// an IO operation on the socket that failed due to the socket not being
1396 /// ready. Returning a `WouldBlock` error in any other situation will
1397 /// incorrectly clear the readiness flag, which can cause the socket to
1398 /// behave incorrectly.
1399 ///
1400 /// The closure should not perform the IO operation using any of the methods
1401 /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1402 /// readiness flag and can cause the socket to behave incorrectly.
1403 ///
1404 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1405 ///
1406 /// [`readable()`]: NamedPipeClient::readable()
1407 /// [`writable()`]: NamedPipeClient::writable()
1408 /// [`ready()`]: NamedPipeClient::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1409 pub fn try_io<R>(
1410 &self,
1411 interest: Interest,
1412 f: impl FnOnce() -> io::Result<R>,
1413 ) -> io::Result<R> {
1414 self.io.registration().try_io(interest, f)
1415 }
1416 }
1417
1418 impl AsyncRead for NamedPipeClient {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1419 fn poll_read(
1420 self: Pin<&mut Self>,
1421 cx: &mut Context<'_>,
1422 buf: &mut ReadBuf<'_>,
1423 ) -> Poll<io::Result<()>> {
1424 unsafe { self.io.poll_read(cx, buf) }
1425 }
1426 }
1427
1428 impl AsyncWrite for NamedPipeClient {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1429 fn poll_write(
1430 self: Pin<&mut Self>,
1431 cx: &mut Context<'_>,
1432 buf: &[u8],
1433 ) -> Poll<io::Result<usize>> {
1434 self.io.poll_write(cx, buf)
1435 }
1436
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1437 fn poll_write_vectored(
1438 self: Pin<&mut Self>,
1439 cx: &mut Context<'_>,
1440 bufs: &[io::IoSlice<'_>],
1441 ) -> Poll<io::Result<usize>> {
1442 self.io.poll_write_vectored(cx, bufs)
1443 }
1444
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>1445 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1446 Poll::Ready(Ok(()))
1447 }
1448
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1449 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1450 self.poll_flush(cx)
1451 }
1452 }
1453
1454 impl AsRawHandle for NamedPipeClient {
as_raw_handle(&self) -> RawHandle1455 fn as_raw_handle(&self) -> RawHandle {
1456 self.io.as_raw_handle()
1457 }
1458 }
1459
1460 // Helper to set a boolean flag as a bitfield.
1461 macro_rules! bool_flag {
1462 ($f:expr, $t:expr, $flag:expr) => {{
1463 let current = $f;
1464
1465 if $t {
1466 $f = current | $flag;
1467 } else {
1468 $f = current & !$flag;
1469 };
1470 }};
1471 }
1472
1473 /// A builder structure for construct a named pipe with named pipe-specific
1474 /// options. This is required to use for named pipe servers who wants to modify
1475 /// pipe-related options.
1476 ///
1477 /// See [`ServerOptions::create`].
1478 #[derive(Debug, Clone)]
1479 pub struct ServerOptions {
1480 open_mode: DWORD,
1481 pipe_mode: DWORD,
1482 max_instances: DWORD,
1483 out_buffer_size: DWORD,
1484 in_buffer_size: DWORD,
1485 default_timeout: DWORD,
1486 }
1487
1488 impl ServerOptions {
1489 /// Creates a new named pipe builder with the default settings.
1490 ///
1491 /// ```
1492 /// use tokio::net::windows::named_pipe::ServerOptions;
1493 ///
1494 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-new";
1495 ///
1496 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1497 /// let server = ServerOptions::new().create(PIPE_NAME)?;
1498 /// # Ok(()) }
1499 /// ```
new() -> ServerOptions1500 pub fn new() -> ServerOptions {
1501 ServerOptions {
1502 open_mode: winbase::PIPE_ACCESS_DUPLEX | winbase::FILE_FLAG_OVERLAPPED,
1503 pipe_mode: winbase::PIPE_TYPE_BYTE | winbase::PIPE_REJECT_REMOTE_CLIENTS,
1504 max_instances: winbase::PIPE_UNLIMITED_INSTANCES,
1505 out_buffer_size: 65536,
1506 in_buffer_size: 65536,
1507 default_timeout: 0,
1508 }
1509 }
1510
1511 /// The pipe mode.
1512 ///
1513 /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1514 /// documentation of what each mode means.
1515 ///
1516 /// This corresponding to specifying [`dwPipeMode`].
1517 ///
1518 /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self1519 pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1520 self.pipe_mode = match pipe_mode {
1521 PipeMode::Byte => winbase::PIPE_TYPE_BYTE,
1522 PipeMode::Message => winbase::PIPE_TYPE_MESSAGE,
1523 };
1524
1525 self
1526 }
1527
1528 /// The flow of data in the pipe goes from client to server only.
1529 ///
1530 /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
1531 ///
1532 /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
1533 ///
1534 /// # Errors
1535 ///
1536 /// Server side prevents connecting by denying inbound access, client errors
1537 /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
1538 /// the connection.
1539 ///
1540 /// ```
1541 /// use std::io;
1542 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1543 ///
1544 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err1";
1545 ///
1546 /// # #[tokio::main] async fn main() -> io::Result<()> {
1547 /// let _server = ServerOptions::new()
1548 /// .access_inbound(false)
1549 /// .create(PIPE_NAME)?;
1550 ///
1551 /// let e = ClientOptions::new()
1552 /// .open(PIPE_NAME)
1553 /// .unwrap_err();
1554 ///
1555 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1556 /// # Ok(()) }
1557 /// ```
1558 ///
1559 /// Disabling writing allows a client to connect, but errors with
1560 /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
1561 ///
1562 /// ```
1563 /// use std::io;
1564 /// use tokio::io::AsyncWriteExt;
1565 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1566 ///
1567 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err2";
1568 ///
1569 /// # #[tokio::main] async fn main() -> io::Result<()> {
1570 /// let server = ServerOptions::new()
1571 /// .access_inbound(false)
1572 /// .create(PIPE_NAME)?;
1573 ///
1574 /// let mut client = ClientOptions::new()
1575 /// .write(false)
1576 /// .open(PIPE_NAME)?;
1577 ///
1578 /// server.connect().await?;
1579 ///
1580 /// let e = client.write(b"ping").await.unwrap_err();
1581 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1582 /// # Ok(()) }
1583 /// ```
1584 ///
1585 /// # Examples
1586 ///
1587 /// A unidirectional named pipe that only supports server-to-client
1588 /// communication.
1589 ///
1590 /// ```
1591 /// use std::io;
1592 /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1593 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1594 ///
1595 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound";
1596 ///
1597 /// # #[tokio::main] async fn main() -> io::Result<()> {
1598 /// let mut server = ServerOptions::new()
1599 /// .access_inbound(false)
1600 /// .create(PIPE_NAME)?;
1601 ///
1602 /// let mut client = ClientOptions::new()
1603 /// .write(false)
1604 /// .open(PIPE_NAME)?;
1605 ///
1606 /// server.connect().await?;
1607 ///
1608 /// let write = server.write_all(b"ping");
1609 ///
1610 /// let mut buf = [0u8; 4];
1611 /// let read = client.read_exact(&mut buf);
1612 ///
1613 /// let ((), read) = tokio::try_join!(write, read)?;
1614 ///
1615 /// assert_eq!(read, 4);
1616 /// assert_eq!(&buf[..], b"ping");
1617 /// # Ok(()) }
1618 /// ```
access_inbound(&mut self, allowed: bool) -> &mut Self1619 pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
1620 bool_flag!(self.open_mode, allowed, winbase::PIPE_ACCESS_INBOUND);
1621 self
1622 }
1623
1624 /// The flow of data in the pipe goes from server to client only.
1625 ///
1626 /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
1627 ///
1628 /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
1629 ///
1630 /// # Errors
1631 ///
1632 /// Server side prevents connecting by denying outbound access, client
1633 /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
1634 /// create the connection.
1635 ///
1636 /// ```
1637 /// use std::io;
1638 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1639 ///
1640 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err1";
1641 ///
1642 /// # #[tokio::main] async fn main() -> io::Result<()> {
1643 /// let server = ServerOptions::new()
1644 /// .access_outbound(false)
1645 /// .create(PIPE_NAME)?;
1646 ///
1647 /// let e = ClientOptions::new()
1648 /// .open(PIPE_NAME)
1649 /// .unwrap_err();
1650 ///
1651 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1652 /// # Ok(()) }
1653 /// ```
1654 ///
1655 /// Disabling reading allows a client to connect, but attempting to read
1656 /// will error with [`std::io::ErrorKind::PermissionDenied`].
1657 ///
1658 /// ```
1659 /// use std::io;
1660 /// use tokio::io::AsyncReadExt;
1661 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1662 ///
1663 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err2";
1664 ///
1665 /// # #[tokio::main] async fn main() -> io::Result<()> {
1666 /// let server = ServerOptions::new()
1667 /// .access_outbound(false)
1668 /// .create(PIPE_NAME)?;
1669 ///
1670 /// let mut client = ClientOptions::new()
1671 /// .read(false)
1672 /// .open(PIPE_NAME)?;
1673 ///
1674 /// server.connect().await?;
1675 ///
1676 /// let mut buf = [0u8; 4];
1677 /// let e = client.read(&mut buf).await.unwrap_err();
1678 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1679 /// # Ok(()) }
1680 /// ```
1681 ///
1682 /// # Examples
1683 ///
1684 /// A unidirectional named pipe that only supports client-to-server
1685 /// communication.
1686 ///
1687 /// ```
1688 /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1689 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1690 ///
1691 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound";
1692 ///
1693 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1694 /// let mut server = ServerOptions::new()
1695 /// .access_outbound(false)
1696 /// .create(PIPE_NAME)?;
1697 ///
1698 /// let mut client = ClientOptions::new()
1699 /// .read(false)
1700 /// .open(PIPE_NAME)?;
1701 ///
1702 /// server.connect().await?;
1703 ///
1704 /// let write = client.write_all(b"ping");
1705 ///
1706 /// let mut buf = [0u8; 4];
1707 /// let read = server.read_exact(&mut buf);
1708 ///
1709 /// let ((), read) = tokio::try_join!(write, read)?;
1710 ///
1711 /// println!("done reading and writing");
1712 ///
1713 /// assert_eq!(read, 4);
1714 /// assert_eq!(&buf[..], b"ping");
1715 /// # Ok(()) }
1716 /// ```
access_outbound(&mut self, allowed: bool) -> &mut Self1717 pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
1718 bool_flag!(self.open_mode, allowed, winbase::PIPE_ACCESS_OUTBOUND);
1719 self
1720 }
1721
1722 /// If you attempt to create multiple instances of a pipe with this flag
1723 /// set, creation of the first server instance succeeds, but creation of any
1724 /// subsequent instances will fail with
1725 /// [`std::io::ErrorKind::PermissionDenied`].
1726 ///
1727 /// This option is intended to be used with servers that want to ensure that
1728 /// they are the only process listening for clients on a given named pipe.
1729 /// This is accomplished by enabling it for the first server instance
1730 /// created in a process.
1731 ///
1732 /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
1733 ///
1734 /// # Errors
1735 ///
1736 /// If this option is set and more than one instance of the server for a
1737 /// given named pipe exists, calling [`create`] will fail with
1738 /// [`std::io::ErrorKind::PermissionDenied`].
1739 ///
1740 /// ```
1741 /// use std::io;
1742 /// use tokio::net::windows::named_pipe::ServerOptions;
1743 ///
1744 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance-error";
1745 ///
1746 /// # #[tokio::main] async fn main() -> io::Result<()> {
1747 /// let server1 = ServerOptions::new()
1748 /// .first_pipe_instance(true)
1749 /// .create(PIPE_NAME)?;
1750 ///
1751 /// // Second server errs, since it's not the first instance.
1752 /// let e = ServerOptions::new()
1753 /// .first_pipe_instance(true)
1754 /// .create(PIPE_NAME)
1755 /// .unwrap_err();
1756 ///
1757 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1758 /// # Ok(()) }
1759 /// ```
1760 ///
1761 /// # Examples
1762 ///
1763 /// ```
1764 /// use std::io;
1765 /// use tokio::net::windows::named_pipe::ServerOptions;
1766 ///
1767 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance";
1768 ///
1769 /// # #[tokio::main] async fn main() -> io::Result<()> {
1770 /// let mut builder = ServerOptions::new();
1771 /// builder.first_pipe_instance(true);
1772 ///
1773 /// let server = builder.create(PIPE_NAME)?;
1774 /// let e = builder.create(PIPE_NAME).unwrap_err();
1775 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1776 /// drop(server);
1777 ///
1778 /// // OK: since, we've closed the other instance.
1779 /// let _server2 = builder.create(PIPE_NAME)?;
1780 /// # Ok(()) }
1781 /// ```
1782 ///
1783 /// [`create`]: ServerOptions::create
1784 /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
first_pipe_instance(&mut self, first: bool) -> &mut Self1785 pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
1786 bool_flag!(
1787 self.open_mode,
1788 first,
1789 winbase::FILE_FLAG_FIRST_PIPE_INSTANCE
1790 );
1791 self
1792 }
1793
1794 /// Indicates whether this server can accept remote clients or not. Remote
1795 /// clients are disabled by default.
1796 ///
1797 /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
1798 ///
1799 /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
reject_remote_clients(&mut self, reject: bool) -> &mut Self1800 pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
1801 bool_flag!(self.pipe_mode, reject, winbase::PIPE_REJECT_REMOTE_CLIENTS);
1802 self
1803 }
1804
1805 /// The maximum number of instances that can be created for this pipe. The
1806 /// first instance of the pipe can specify this value; the same number must
1807 /// be specified for other instances of the pipe. Acceptable values are in
1808 /// the range 1 through 254. The default value is unlimited.
1809 ///
1810 /// This corresponds to specifying [`nMaxInstances`].
1811 ///
1812 /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
1813 ///
1814 /// # Errors
1815 ///
1816 /// The same numbers of `max_instances` have to be used by all servers. Any
1817 /// additional servers trying to be built which uses a mismatching value
1818 /// might error.
1819 ///
1820 /// ```
1821 /// use std::io;
1822 /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
1823 /// use winapi::shared::winerror;
1824 ///
1825 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-max-instances";
1826 ///
1827 /// # #[tokio::main] async fn main() -> io::Result<()> {
1828 /// let mut server = ServerOptions::new();
1829 /// server.max_instances(2);
1830 ///
1831 /// let s1 = server.create(PIPE_NAME)?;
1832 /// let c1 = ClientOptions::new().open(PIPE_NAME);
1833 ///
1834 /// let s2 = server.create(PIPE_NAME)?;
1835 /// let c2 = ClientOptions::new().open(PIPE_NAME);
1836 ///
1837 /// // Too many servers!
1838 /// let e = server.create(PIPE_NAME).unwrap_err();
1839 /// assert_eq!(e.raw_os_error(), Some(winerror::ERROR_PIPE_BUSY as i32));
1840 ///
1841 /// // Still too many servers even if we specify a higher value!
1842 /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
1843 /// assert_eq!(e.raw_os_error(), Some(winerror::ERROR_PIPE_BUSY as i32));
1844 /// # Ok(()) }
1845 /// ```
1846 ///
1847 /// # Panics
1848 ///
1849 /// This function will panic if more than 254 instances are specified. If
1850 /// you do not wish to set an instance limit, leave it unspecified.
1851 ///
1852 /// ```should_panic
1853 /// use tokio::net::windows::named_pipe::ServerOptions;
1854 ///
1855 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1856 /// let builder = ServerOptions::new().max_instances(255);
1857 /// # Ok(()) }
1858 /// ```
max_instances(&mut self, instances: usize) -> &mut Self1859 pub fn max_instances(&mut self, instances: usize) -> &mut Self {
1860 assert!(instances < 255, "cannot specify more than 254 instances");
1861 self.max_instances = instances as DWORD;
1862 self
1863 }
1864
1865 /// The number of bytes to reserve for the output buffer.
1866 ///
1867 /// This corresponds to specifying [`nOutBufferSize`].
1868 ///
1869 /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
out_buffer_size(&mut self, buffer: u32) -> &mut Self1870 pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
1871 self.out_buffer_size = buffer as DWORD;
1872 self
1873 }
1874
1875 /// The number of bytes to reserve for the input buffer.
1876 ///
1877 /// This corresponds to specifying [`nInBufferSize`].
1878 ///
1879 /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
in_buffer_size(&mut self, buffer: u32) -> &mut Self1880 pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
1881 self.in_buffer_size = buffer as DWORD;
1882 self
1883 }
1884
1885 /// Creates the named pipe identified by `addr` for use as a server.
1886 ///
1887 /// This uses the [`CreateNamedPipe`] function.
1888 ///
1889 /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
1890 ///
1891 /// # Errors
1892 ///
1893 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
1894 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
1895 ///
1896 /// [Tokio Runtime]: crate::runtime::Runtime
1897 /// [enabled I/O]: crate::runtime::Builder::enable_io
1898 ///
1899 /// # Examples
1900 ///
1901 /// ```
1902 /// use tokio::net::windows::named_pipe::ServerOptions;
1903 ///
1904 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-create";
1905 ///
1906 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1907 /// let server = ServerOptions::new().create(PIPE_NAME)?;
1908 /// # Ok(()) }
1909 /// ```
create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer>1910 pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
1911 // Safety: We're calling create_with_security_attributes_raw w/ a null
1912 // pointer which disables it.
1913 unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) }
1914 }
1915
1916 /// Creates the named pipe identified by `addr` for use as a server.
1917 ///
1918 /// This is the same as [`create`] except that it supports providing the raw
1919 /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
1920 /// as the `lpSecurityAttributes` argument to [`CreateFile`].
1921 ///
1922 /// # Errors
1923 ///
1924 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
1925 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
1926 ///
1927 /// [Tokio Runtime]: crate::runtime::Runtime
1928 /// [enabled I/O]: crate::runtime::Builder::enable_io
1929 ///
1930 /// # Safety
1931 ///
1932 /// The `attrs` argument must either be null or point at a valid instance of
1933 /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
1934 /// behavior is identical to calling the [`create`] method.
1935 ///
1936 /// [`create`]: ServerOptions::create
1937 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
1938 /// [`SECURITY_ATTRIBUTES`]: crate::winapi::um::minwinbase::SECURITY_ATTRIBUTES
create_with_security_attributes_raw( &self, addr: impl AsRef<OsStr>, attrs: *mut c_void, ) -> io::Result<NamedPipeServer>1939 pub unsafe fn create_with_security_attributes_raw(
1940 &self,
1941 addr: impl AsRef<OsStr>,
1942 attrs: *mut c_void,
1943 ) -> io::Result<NamedPipeServer> {
1944 let addr = encode_addr(addr);
1945
1946 let h = namedpipeapi::CreateNamedPipeW(
1947 addr.as_ptr(),
1948 self.open_mode,
1949 self.pipe_mode,
1950 self.max_instances,
1951 self.out_buffer_size,
1952 self.in_buffer_size,
1953 self.default_timeout,
1954 attrs as *mut _,
1955 );
1956
1957 if h == handleapi::INVALID_HANDLE_VALUE {
1958 return Err(io::Error::last_os_error());
1959 }
1960
1961 NamedPipeServer::from_raw_handle(h)
1962 }
1963 }
1964
1965 /// A builder suitable for building and interacting with named pipes from the
1966 /// client side.
1967 ///
1968 /// See [`ClientOptions::open`].
1969 #[derive(Debug, Clone)]
1970 pub struct ClientOptions {
1971 desired_access: DWORD,
1972 security_qos_flags: DWORD,
1973 }
1974
1975 impl ClientOptions {
1976 /// Creates a new named pipe builder with the default settings.
1977 ///
1978 /// ```
1979 /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
1980 ///
1981 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-new";
1982 ///
1983 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1984 /// // Server must be created in order for the client creation to succeed.
1985 /// let server = ServerOptions::new().create(PIPE_NAME)?;
1986 /// let client = ClientOptions::new().open(PIPE_NAME)?;
1987 /// # Ok(()) }
1988 /// ```
new() -> Self1989 pub fn new() -> Self {
1990 Self {
1991 desired_access: winnt::GENERIC_READ | winnt::GENERIC_WRITE,
1992 security_qos_flags: winbase::SECURITY_IDENTIFICATION | winbase::SECURITY_SQOS_PRESENT,
1993 }
1994 }
1995
1996 /// If the client supports reading data. This is enabled by default.
1997 ///
1998 /// This corresponds to setting [`GENERIC_READ`] in the call to [`CreateFile`].
1999 ///
2000 /// [`GENERIC_READ`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2001 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
read(&mut self, allowed: bool) -> &mut Self2002 pub fn read(&mut self, allowed: bool) -> &mut Self {
2003 bool_flag!(self.desired_access, allowed, winnt::GENERIC_READ);
2004 self
2005 }
2006
2007 /// If the created pipe supports writing data. This is enabled by default.
2008 ///
2009 /// This corresponds to setting [`GENERIC_WRITE`] in the call to [`CreateFile`].
2010 ///
2011 /// [`GENERIC_WRITE`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2012 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
write(&mut self, allowed: bool) -> &mut Self2013 pub fn write(&mut self, allowed: bool) -> &mut Self {
2014 bool_flag!(self.desired_access, allowed, winnt::GENERIC_WRITE);
2015 self
2016 }
2017
2018 /// Sets qos flags which are combined with other flags and attributes in the
2019 /// call to [`CreateFile`].
2020 ///
2021 /// By default `security_qos_flags` is set to [`SECURITY_IDENTIFICATION`],
2022 /// calling this function would override that value completely with the
2023 /// argument specified.
2024 ///
2025 /// When `security_qos_flags` is not set, a malicious program can gain the
2026 /// elevated privileges of a privileged Rust process when it allows opening
2027 /// user-specified paths, by tricking it into opening a named pipe. So
2028 /// arguably `security_qos_flags` should also be set when opening arbitrary
2029 /// paths. However the bits can then conflict with other flags, specifically
2030 /// `FILE_FLAG_OPEN_NO_RECALL`.
2031 ///
2032 /// For information about possible values, see [Impersonation Levels] on the
2033 /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
2034 /// automatically when using this method.
2035 ///
2036 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2037 /// [`SECURITY_IDENTIFICATION`]: crate::winapi::um::winbase::SECURITY_IDENTIFICATION
2038 /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
security_qos_flags(&mut self, flags: u32) -> &mut Self2039 pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
2040 // See: https://github.com/rust-lang/rust/pull/58216
2041 self.security_qos_flags = flags | winbase::SECURITY_SQOS_PRESENT;
2042 self
2043 }
2044
2045 /// Opens the named pipe identified by `addr`.
2046 ///
2047 /// This opens the client using [`CreateFile`] with the
2048 /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
2049 ///
2050 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2051 ///
2052 /// # Errors
2053 ///
2054 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2055 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2056 ///
2057 /// There are a few errors you need to take into account when creating a
2058 /// named pipe on the client side:
2059 ///
2060 /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
2061 /// does not exist. Presumably the server is not up.
2062 /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
2063 /// but the server is not currently waiting for a connection. Please see the
2064 /// examples for how to check for this error.
2065 ///
2066 /// [`ERROR_PIPE_BUSY`]: crate::winapi::shared::winerror::ERROR_PIPE_BUSY
2067 /// [`winapi`]: crate::winapi
2068 /// [enabled I/O]: crate::runtime::Builder::enable_io
2069 /// [Tokio Runtime]: crate::runtime::Runtime
2070 ///
2071 /// A connect loop that waits until a pipe becomes available looks like
2072 /// this:
2073 ///
2074 /// ```no_run
2075 /// use std::time::Duration;
2076 /// use tokio::net::windows::named_pipe::ClientOptions;
2077 /// use tokio::time;
2078 /// use winapi::shared::winerror;
2079 ///
2080 /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
2081 ///
2082 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2083 /// let client = loop {
2084 /// match ClientOptions::new().open(PIPE_NAME) {
2085 /// Ok(client) => break client,
2086 /// Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
2087 /// Err(e) => return Err(e),
2088 /// }
2089 ///
2090 /// time::sleep(Duration::from_millis(50)).await;
2091 /// };
2092 ///
2093 /// // use the connected client.
2094 /// # Ok(()) }
2095 /// ```
open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient>2096 pub fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
2097 // Safety: We're calling open_with_security_attributes_raw w/ a null
2098 // pointer which disables it.
2099 unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
2100 }
2101
2102 /// Opens the named pipe identified by `addr`.
2103 ///
2104 /// This is the same as [`open`] except that it supports providing the raw
2105 /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2106 /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2107 ///
2108 /// # Safety
2109 ///
2110 /// The `attrs` argument must either be null or point at a valid instance of
2111 /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2112 /// behavior is identical to calling the [`open`] method.
2113 ///
2114 /// [`open`]: ClientOptions::open
2115 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2116 /// [`SECURITY_ATTRIBUTES`]: crate::winapi::um::minwinbase::SECURITY_ATTRIBUTES
open_with_security_attributes_raw( &self, addr: impl AsRef<OsStr>, attrs: *mut c_void, ) -> io::Result<NamedPipeClient>2117 pub unsafe fn open_with_security_attributes_raw(
2118 &self,
2119 addr: impl AsRef<OsStr>,
2120 attrs: *mut c_void,
2121 ) -> io::Result<NamedPipeClient> {
2122 let addr = encode_addr(addr);
2123
2124 // NB: We could use a platform specialized `OpenOptions` here, but since
2125 // we have access to winapi it ultimately doesn't hurt to use
2126 // `CreateFile` explicitly since it allows the use of our already
2127 // well-structured wide `addr` to pass into CreateFileW.
2128 let h = fileapi::CreateFileW(
2129 addr.as_ptr(),
2130 self.desired_access,
2131 0,
2132 attrs as *mut _,
2133 fileapi::OPEN_EXISTING,
2134 self.get_flags(),
2135 ptr::null_mut(),
2136 );
2137
2138 if h == handleapi::INVALID_HANDLE_VALUE {
2139 return Err(io::Error::last_os_error());
2140 }
2141
2142 NamedPipeClient::from_raw_handle(h)
2143 }
2144
get_flags(&self) -> u322145 fn get_flags(&self) -> u32 {
2146 self.security_qos_flags | winbase::FILE_FLAG_OVERLAPPED
2147 }
2148 }
2149
2150 /// The pipe mode of a named pipe.
2151 ///
2152 /// Set through [`ServerOptions::pipe_mode`].
2153 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
2154 #[non_exhaustive]
2155 pub enum PipeMode {
2156 /// Data is written to the pipe as a stream of bytes. The pipe does not
2157 /// distinguish bytes written during different write operations.
2158 ///
2159 /// Corresponds to [`PIPE_TYPE_BYTE`][crate::winapi::um::winbase::PIPE_TYPE_BYTE].
2160 Byte,
2161 /// Data is written to the pipe as a stream of messages. The pipe treats the
2162 /// bytes written during each write operation as a message unit. Any reading
2163 /// on a named pipe returns [`ERROR_MORE_DATA`] when a message is not read
2164 /// completely.
2165 ///
2166 /// Corresponds to [`PIPE_TYPE_MESSAGE`][crate::winapi::um::winbase::PIPE_TYPE_MESSAGE].
2167 ///
2168 /// [`ERROR_MORE_DATA`]: crate::winapi::shared::winerror::ERROR_MORE_DATA
2169 Message,
2170 }
2171
2172 /// Indicates the end of a named pipe.
2173 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
2174 #[non_exhaustive]
2175 pub enum PipeEnd {
2176 /// The named pipe refers to the client end of a named pipe instance.
2177 ///
2178 /// Corresponds to [`PIPE_CLIENT_END`][crate::winapi::um::winbase::PIPE_CLIENT_END].
2179 Client,
2180 /// The named pipe refers to the server end of a named pipe instance.
2181 ///
2182 /// Corresponds to [`PIPE_SERVER_END`][crate::winapi::um::winbase::PIPE_SERVER_END].
2183 Server,
2184 }
2185
2186 /// Information about a named pipe.
2187 ///
2188 /// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
2189 #[derive(Debug)]
2190 #[non_exhaustive]
2191 pub struct PipeInfo {
2192 /// Indicates the mode of a named pipe.
2193 pub mode: PipeMode,
2194 /// Indicates the end of a named pipe.
2195 pub end: PipeEnd,
2196 /// The maximum number of instances that can be created for this pipe.
2197 pub max_instances: u32,
2198 /// The number of bytes to reserve for the output buffer.
2199 pub out_buffer_size: u32,
2200 /// The number of bytes to reserve for the input buffer.
2201 pub in_buffer_size: u32,
2202 }
2203
2204 /// Encodes an address so that it is a null-terminated wide string.
encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]>2205 fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
2206 let len = addr.as_ref().encode_wide().count();
2207 let mut vec = Vec::with_capacity(len + 1);
2208 vec.extend(addr.as_ref().encode_wide());
2209 vec.push(0);
2210 vec.into_boxed_slice()
2211 }
2212
2213 /// Internal function to get the info out of a raw named pipe.
named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo>2214 unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
2215 let mut flags = 0;
2216 let mut out_buffer_size = 0;
2217 let mut in_buffer_size = 0;
2218 let mut max_instances = 0;
2219
2220 let result = namedpipeapi::GetNamedPipeInfo(
2221 handle,
2222 &mut flags,
2223 &mut out_buffer_size,
2224 &mut in_buffer_size,
2225 &mut max_instances,
2226 );
2227
2228 if result == FALSE {
2229 return Err(io::Error::last_os_error());
2230 }
2231
2232 let mut end = PipeEnd::Client;
2233 let mut mode = PipeMode::Byte;
2234
2235 if flags & winbase::PIPE_SERVER_END != 0 {
2236 end = PipeEnd::Server;
2237 }
2238
2239 if flags & winbase::PIPE_TYPE_MESSAGE != 0 {
2240 mode = PipeMode::Message;
2241 }
2242
2243 Ok(PipeInfo {
2244 end,
2245 mode,
2246 out_buffer_size,
2247 in_buffer_size,
2248 max_instances,
2249 })
2250 }
2251