1 //! Named pipes
2
3 use std::cell::RefCell;
4 use std::ffi::OsStr;
5 use std::fs::{OpenOptions, File};
6 use std::io::prelude::*;
7 use std::io;
8 use std::os::windows::ffi::*;
9 use std::os::windows::io::*;
10 use std::time::Duration;
11
12 use winapi::shared::ntdef::HANDLE;
13 use winapi::shared::minwindef::*;
14 use winapi::shared::winerror::*;
15 use winapi::um::fileapi::*;
16 use winapi::um::handleapi::*;
17 use winapi::um::ioapiset::*;
18 use winapi::um::minwinbase::*;
19 use winapi::um::namedpipeapi::*;
20 use winapi::um::winbase::*;
21 use handle::Handle;
22 use overlapped::Overlapped;
23
/// Readable half of an anonymous pipe.
///
/// Wraps the read-end `Handle` produced by `anonymous`; data is consumed
/// through the `Read` implementations on this type.
#[derive(Debug)]
pub struct AnonRead(Handle);
27
/// Writable half of an anonymous pipe.
///
/// Wraps the write-end `Handle` produced by `anonymous`; data is sent through
/// the `Write` implementations on this type.
#[derive(Debug)]
pub struct AnonWrite(Handle);
31
/// A named pipe that can accept connections.
///
/// Instances are created with `NamedPipe::new` or `NamedPipeBuilder` and wait
/// for a client through `connect` / `connect_overlapped`.
#[derive(Debug)]
pub struct NamedPipe(Handle);
35
/// A builder structure for creating a new named pipe.
///
/// Each field is handed, unchanged, to `CreateNamedPipeW` when the pipe is
/// created; the setter methods on the builder adjust them beforehand.
#[derive(Debug)]
pub struct NamedPipeBuilder {
    // Pipe name as UTF-16 code units with a trailing NUL.
    name: Vec<u16>,
    // Open-mode flags (access direction, first-instance, overlapped).
    dwOpenMode: DWORD,
    // Pipe-mode flags (byte type, remote-client rejection).
    dwPipeMode: DWORD,
    // Maximum number of server instances of this pipe.
    nMaxInstances: DWORD,
    // Requested size in bytes of the output buffer.
    nOutBufferSize: DWORD,
    // Requested size in bytes of the input buffer.
    nInBufferSize: DWORD,
    // Default time-out passed to `CreateNamedPipeW`; the builder never changes it from 0.
    nDefaultTimeOut: DWORD,
}
47
48 /// Creates a new anonymous in-memory pipe, returning the read/write ends of the
49 /// pipe.
50 ///
51 /// The buffer size for this pipe may also be specified, but the system will
52 /// normally use this as a suggestion and it's not guaranteed that the buffer
53 /// will be precisely this size.
54 pub fn anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)> {
55 let mut read = 0 as HANDLE;
56 let mut write = 0 as HANDLE;
57 try!(::cvt(unsafe {
58 CreatePipe(&mut read, &mut write, 0 as *mut _, buffer_size)
59 }));
60 Ok((AnonRead(Handle::new(read)), AnonWrite(Handle::new(write))))
61 }
62
63 impl Read for AnonRead {
isdigit_string(char * str)64 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
65 }
66 impl<'a> Read for &'a AnonRead {
67 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
68 }
69
70 impl AsRawHandle for AnonRead {
71 fn as_raw_handle(&self) -> HANDLE { self.0.raw() }
72 }
73 impl FromRawHandle for AnonRead {
74 unsafe fn from_raw_handle(handle: HANDLE) -> AnonRead {
HTS106_Label_initialize(HTS106_Label * label)75 AnonRead(Handle::new(handle))
76 }
77 }
78 impl IntoRawHandle for AnonRead {
79 fn into_raw_handle(self) -> HANDLE { self.0.into_raw() }
80 }
81
82 impl Write for AnonWrite {
83 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
HTS106_Label_check_time(HTS106_Label * label)84 fn flush(&mut self) -> io::Result<()> { Ok(()) }
85 }
86 impl<'a> Write for &'a AnonWrite {
87 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
88 fn flush(&mut self) -> io::Result<()> { Ok(()) }
89 }
90
91 impl AsRawHandle for AnonWrite {
92 fn as_raw_handle(&self) -> HANDLE { self.0.raw() }
93 }
94 impl FromRawHandle for AnonWrite {
95 unsafe fn from_raw_handle(handle: HANDLE) -> AnonWrite {
96 AnonWrite(Handle::new(handle))
97 }
98 }
99 impl IntoRawHandle for AnonWrite {
100 fn into_raw_handle(self) -> HANDLE { self.0.into_raw() }
101 }
102
103 /// A convenience function to connect to a named pipe.
104 ///
105 /// This function will block the calling process until it can connect to the
106 /// pipe server specified by `addr`. This will use `NamedPipe::wait` internally
107 /// to block until it can connect.
HTS106_Label_load_from_fn(HTS106_Label * label,int sampling_rate,int fperiod,char * fn)108 pub fn connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File> {
109 _connect(addr.as_ref())
110 }
111
112 fn _connect(addr: &OsStr) -> io::Result<File> {
113 let mut r = OpenOptions::new();
114 let mut w = OpenOptions::new();
115 let mut rw = OpenOptions::new();
HTS106_Label_load_from_fp(HTS106_Label * label,int sampling_rate,int fperiod,HTS106_File * fp)116 r.read(true);
117 w.write(true);
118 rw.read(true).write(true);
119 loop {
120 let res = rw.open(addr).or_else(|_| r.open(addr))
121 .or_else(|_| w.open(addr));
122 match res {
123 Ok(f) => return Ok(f),
124 Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32)
125 => {}
126 Err(e) => return Err(e),
127 }
128
129 try!(NamedPipe::wait(addr, Some(Duration::new(20, 0))));
130 }
131 }
132
133 impl NamedPipe {
134 /// Creates a new initial named pipe.
135 ///
136 /// This function is equivalent to:
137 ///
138 /// ```
139 /// use miow::pipe::NamedPipeBuilder;
140 ///
141 /// # let addr = "foo";
142 /// NamedPipeBuilder::new(addr)
143 /// .first(true)
144 /// .inbound(true)
145 /// .outbound(true)
146 /// .out_buffer_size(65536)
147 /// .in_buffer_size(65536)
148 /// .create();
149 /// ```
150 pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
151 NamedPipeBuilder::new(addr).create()
152 }
153
154 /// Waits until either a time-out interval elapses or an instance of the
155 /// specified named pipe is available for connection.
156 ///
157 /// If this function succeeds the process can create a `File` to connect to
HTS106_Label_load_from_string(HTS106_Label * label,int sampling_rate,int fperiod,char * data)158 /// the named pipe.
159 pub fn wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>)
160 -> io::Result<()> {
161 NamedPipe::_wait(addr.as_ref(), timeout)
162 }
163
164 fn _wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()> {
165 let addr = addr.encode_wide().chain(Some(0)).collect::<Vec<_>>();
166 let timeout = ::dur2ms(timeout);
167 ::cvt(unsafe {
168 WaitNamedPipeW(addr.as_ptr(), timeout)
169 }).map(|_| ())
170 }
171
172 /// Connects this named pipe to a client, blocking until one becomes
173 /// available.
174 ///
175 /// This function will call the `ConnectNamedPipe` function to await for a
176 /// client to connect. This can be called immediately after the pipe is
177 /// created, or after it has been disconnected from a previous client.
178 pub fn connect(&self) -> io::Result<()> {
179 match ::cvt(unsafe { ConnectNamedPipe(self.0.raw(), 0 as *mut _) }) {
180 Ok(_) => Ok(()),
181 Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32)
182 => Ok(()),
183 Err(e) => Err(e),
184 }
185 }
186
187 /// Issue a connection request with the specified overlapped operation.
188 ///
189 /// This function will issue a request to connect a client to this server,
190 /// returning immediately after starting the overlapped operation.
191 ///
192 /// If this function immediately succeeds then `Ok(true)` is returned. If
193 /// the overlapped operation is enqueued and pending, then `Ok(false)` is
194 /// returned. Otherwise an error is returned indicating what went wrong.
195 ///
196 /// # Unsafety
197 ///
198 /// This function is unsafe because the kernel requires that the
199 /// `overlapped` pointer is valid until the end of the I/O operation. The
200 /// kernel also requires that `overlapped` is unique for this I/O operation
HTS106_Label_load_from_string_list(HTS106_Label * label,int sampling_rate,int fperiod,char ** data,int size)201 /// and is not in use for any other I/O.
202 ///
203 /// To safely use this function callers must ensure that this pointer is
204 /// valid until the I/O operation is completed, typically via completion
205 /// ports and waiting to receive the completion notification on the port.
206 pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED)
207 -> io::Result<bool> {
208 match ::cvt(ConnectNamedPipe(self.0.raw(), overlapped)) {
209 Ok(_) => Ok(true),
210 Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32)
211 => Ok(true),
212 Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32)
213 => Ok(false),
214 Err(e) => Err(e),
215 }
216 }
217
218 /// Disconnects this named pipe from any connected client.
219 pub fn disconnect(&self) -> io::Result<()> {
220 ::cvt(unsafe {
221 DisconnectNamedPipe(self.0.raw())
222 }).map(|_| ())
223 }
224
225 /// Issues an overlapped read operation to occur on this pipe.
226 ///
227 /// This function will issue an asynchronous read to occur in an overlapped
228 /// fashion, returning immediately. The `buf` provided will be filled in
229 /// with data and the request is tracked by the `overlapped` function
230 /// provided.
231 ///
232 /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
233 /// `n` is the number of bytes read. If an asynchronous operation is
234 /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
235 /// it is returned.
236 ///
237 /// When this operation completes (or if it completes immediately), another
238 /// mechanism must be used to learn how many bytes were transferred (such as
239 /// looking at the filed in the IOCP status message).
240 ///
241 /// # Unsafety
242 ///
243 /// This function is unsafe because the kernel requires that the `buf` and
244 /// `overlapped` pointers to be valid until the end of the I/O operation.
245 /// The kernel also requires that `overlapped` is unique for this I/O
246 /// operation and is not in use for any other I/O.
247 ///
HTS106_Label_set_frame_specified_flag(HTS106_Label * label,HTS106_Boolean i)248 /// To safely use this function callers must ensure that the pointers are
249 /// valid until the I/O operation is completed, typically via completion
250 /// ports and waiting to receive the completion notification on the port.
251 pub unsafe fn read_overlapped(&self,
252 buf: &mut [u8],
253 overlapped: *mut OVERLAPPED)
254 -> io::Result<Option<usize>> {
255 self.0.read_overlapped(buf, overlapped)
256 }
257
258 /// Issues an overlapped write operation to occur on this pipe.
259 ///
260 /// This function will issue an asynchronous write to occur in an overlapped
HTS106_Label_get_size(HTS106_Label * label)261 /// fashion, returning immediately. The `buf` provided will be filled in
262 /// with data and the request is tracked by the `overlapped` function
263 /// provided.
264 ///
265 /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
266 /// `n` is the number of bytes written. If an asynchronous operation is
267 /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
268 /// it is returned.
269 ///
270 /// When this operation completes (or if it completes immediately), another
271 /// mechanism must be used to learn how many bytes were transferred (such as
272 /// looking at the filed in the IOCP status message).
273 ///
274 /// # Unsafety
275 ///
276 /// This function is unsafe because the kernel requires that the `buf` and
277 /// `overlapped` pointers to be valid until the end of the I/O operation.
278 /// The kernel also requires that `overlapped` is unique for this I/O
279 /// operation and is not in use for any other I/O.
280 ///
281 /// To safely use this function callers must ensure that the pointers are
282 /// valid until the I/O operation is completed, typically via completion
283 /// ports and waiting to receive the completion notification on the port.
284 pub unsafe fn write_overlapped(&self,
285 buf: &[u8],
286 overlapped: *mut OVERLAPPED)
287 -> io::Result<Option<usize>> {
288 self.0.write_overlapped(buf, overlapped)
289 }
290
291 /// Calls the `GetOverlappedResult` function to get the result of an
292 /// overlapped operation for this handle.
293 ///
294 /// This function takes the `OVERLAPPED` argument which must have been used
295 /// to initiate an overlapped I/O operation, and returns either the
296 /// successful number of bytes transferred during the operation or an error
HTS106_Label_get_end_frame(HTS106_Label * label,int string_index)297 /// if one occurred.
298 ///
299 /// # Unsafety
300 ///
301 /// This function is unsafe as `overlapped` must have previously been used
302 /// to execute an operation for this handle, and it must also be a valid
303 /// pointer to an `Overlapped` instance.
304 ///
305 /// # Panics
306 ///
307 /// This function will panic
308 pub unsafe fn result(&self, overlapped: *mut OVERLAPPED)
309 -> io::Result<usize> {
310 let mut transferred = 0;
311 let r = GetOverlappedResult(self.0.raw(),
312 overlapped,
313 &mut transferred,
314 FALSE);
315 if r == 0 {
316 Err(io::Error::last_os_error())
317 } else {
318 Ok(transferred as usize)
319 }
320 }
321 }
322
thread_local! {
    // Per-thread `Overlapped` used by the blocking `Read`/`Write` impls on
    // `NamedPipe`. It starts as `None` and is created on first use by
    // `with_threadlocal_overlapped`.
    static NAMED_PIPE_OVERLAPPED: RefCell<Option<Overlapped>> = RefCell::new(None);
}
326
327 /// Call a function with a threadlocal `Overlapped`. The function `f` should be
328 /// sure that the event is reset, either manually or by a thread being released.
329 fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize>
330 where F: FnOnce(&Overlapped) -> io::Result<usize>
331 {
332 NAMED_PIPE_OVERLAPPED.with(|overlapped| {
333 let mut mborrow = overlapped.borrow_mut();
334 if let None = *mborrow {
335 let op = Overlapped::initialize_with_autoreset_event()?;
336 *mborrow = Some(op);
337 }
338 f(mborrow.as_ref().unwrap())
339 })
340 }
341
342 impl Read for NamedPipe {
343 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
344 // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
345 with_threadlocal_overlapped(|overlapped| unsafe {
346 self.0.read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
347 })
348 }
349 }
350 impl<'a> Read for &'a NamedPipe {
351 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
352 // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
353 with_threadlocal_overlapped(|overlapped| unsafe {
354 self.0.read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
355 })
356 }
357 }
358
359 impl Write for NamedPipe {
360 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
361 // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
362 with_threadlocal_overlapped(|overlapped| unsafe {
363 self.0.write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
364 })
365 }
366 fn flush(&mut self) -> io::Result<()> {
367 <&NamedPipe as Write>::flush(&mut &*self)
368 }
369 }
370 impl<'a> Write for &'a NamedPipe {
371 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
372 // This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
373 with_threadlocal_overlapped(|overlapped| unsafe {
374 self.0.write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
375 })
376 }
377 fn flush(&mut self) -> io::Result<()> {
378 ::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ())
379 }
380 }
381
382 impl AsRawHandle for NamedPipe {
383 fn as_raw_handle(&self) -> HANDLE { self.0.raw() }
384 }
385 impl FromRawHandle for NamedPipe {
386 unsafe fn from_raw_handle(handle: HANDLE) -> NamedPipe {
387 NamedPipe(Handle::new(handle))
388 }
389 }
390 impl IntoRawHandle for NamedPipe {
391 fn into_raw_handle(self) -> HANDLE { self.0.into_raw() }
392 }
393
394 fn flag(slot: &mut DWORD, on: bool, val: DWORD) {
395 if on {
396 *slot |= val;
397 } else {
398 *slot &= !val;
399 }
400 }
401
402 impl NamedPipeBuilder {
403 /// Creates a new named pipe builder with the default settings.
404 pub fn new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder {
405 NamedPipeBuilder {
406 name: addr.as_ref().encode_wide().chain(Some(0)).collect(),
407 dwOpenMode: PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE |
408 FILE_FLAG_OVERLAPPED,
409 dwPipeMode: PIPE_TYPE_BYTE,
410 nMaxInstances: PIPE_UNLIMITED_INSTANCES,
411 nOutBufferSize: 65536,
412 nInBufferSize: 65536,
413 nDefaultTimeOut: 0,
414 }
415 }
416
417 /// Indicates whether data is allowed to flow from the client to the server.
418 pub fn inbound(&mut self, allowed: bool) -> &mut Self {
419 flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_INBOUND);
420 self
421 }
422
423 /// Indicates whether data is allowed to flow from the server to the client.
424 pub fn outbound(&mut self, allowed: bool) -> &mut Self {
425 flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_OUTBOUND);
426 self
427 }
428
429 /// Indicates that this pipe must be the first instance.
430 ///
431 /// If set to true, then creation will fail if there's already an instance
432 /// elsewhere.
433 pub fn first(&mut self, first: bool) -> &mut Self {
434 flag(&mut self.dwOpenMode, first, FILE_FLAG_FIRST_PIPE_INSTANCE);
435 self
436 }
437
438 /// Indicates whether this server can accept remote clients or not.
439 pub fn accept_remote(&mut self, accept: bool) -> &mut Self {
440 flag(&mut self.dwPipeMode, !accept, PIPE_REJECT_REMOTE_CLIENTS);
441 self
442 }
443
444 /// Specifies the maximum number of instances of the server pipe that are
445 /// allowed.
446 ///
447 /// The first instance of a pipe can specify this value. A value of 255
448 /// indicates that there is no limit to the number of instances.
449 pub fn max_instances(&mut self, instances: u8) -> &mut Self {
450 self.nMaxInstances = instances as DWORD;
451 self
452 }
453
454 /// Specifies the number of bytes to reserver for the output buffer
455 pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
456 self.nOutBufferSize = buffer as DWORD;
457 self
458 }
459
460 /// Specifies the number of bytes to reserver for the input buffer
461 pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
462 self.nInBufferSize = buffer as DWORD;
463 self
464 }
465
466 /// Using the options in this builder, attempt to create a new named pipe.
467 ///
468 /// This function will call the `CreateNamedPipe` function and return the
469 /// result.
470 pub fn create(&mut self) -> io::Result<NamedPipe> {
471 unsafe { self.with_security_attributes(::std::ptr::null_mut()) }
472 }
473
474 /// Using the options in the builder and the provided security attributes, attempt to create a
475 /// new named pipe. This function has to be called with a valid pointer to a
476 /// `SECURITY_ATTRIBUTES` struct that will stay valid for the lifetime of this function or a
477 /// null pointer.
478 ///
479 /// This function will call the `CreateNamedPipe` function and return the
480 /// result.
481 pub unsafe fn with_security_attributes(&mut self, attrs: *mut SECURITY_ATTRIBUTES) -> io::Result<NamedPipe> {
482 let h = CreateNamedPipeW(self.name.as_ptr(),
483 self.dwOpenMode, self.dwPipeMode,
484 self.nMaxInstances, self.nOutBufferSize,
485 self.nInBufferSize, self.nDefaultTimeOut,
486 attrs);
487
488 if h == INVALID_HANDLE_VALUE {
489 Err(io::Error::last_os_error())
490 } else {
491 Ok(NamedPipe(Handle::new(h)))
492 }
493 }
494 }
495
#[cfg(test)]
mod tests {
    use std::fs::{File, OpenOptions};
    use std::io::prelude::*;
    use std::sync::mpsc::channel;
    use std::thread;
    use std::time::Duration;

    use rand::{thread_rng, Rng};

    use super::{anonymous, NamedPipe, NamedPipeBuilder};
    use iocp::CompletionPort;
    use Overlapped;

    /// Builds a pipe path under `\\.\pipe\` with a random 30-character name.
    fn name() -> String {
        let suffix: String = thread_rng().gen_ascii_chars().take(30).collect();
        format!(r"\\.\pipe\{}", suffix)
    }

    /// Bytes written to the write end of an anonymous pipe come out of the
    /// read end.
    #[test]
    fn anon() {
        let (mut rx_end, mut tx_end) = t!(anonymous(256));
        let sent = t!(tx_end.write(&[1, 2, 3]));
        assert_eq!(sent, 3);

        let mut buf = [0; 10];
        let got = t!(rx_end.read(&mut buf));
        assert_eq!(got, 3);
        assert_eq!(&buf[..3], &[1, 2, 3]);
    }

    /// A second "first instance" of the same name is rejected, while a
    /// non-first instance is accepted.
    #[test]
    fn named_not_first() {
        let pipe_name = name();
        let _first = t!(NamedPipe::new(&pipe_name));
        assert!(NamedPipe::new(&pipe_name).is_err());

        t!(NamedPipeBuilder::new(&pipe_name).first(false).create());
    }

    /// The server's blocking `connect` completes once a client opens the pipe.
    #[test]
    fn named_connect() {
        let pipe_name = name();
        let server = t!(NamedPipe::new(&pipe_name));

        let client = thread::spawn(move || {
            t!(File::open(pipe_name));
        });

        t!(server.connect());
        t!(server.disconnect());
        t!(client.join());
    }

    /// `NamedPipe::wait` succeeds while an instance is available, and the
    /// test expects a 1 ms wait to fail after the client has opened the pipe.
    #[test]
    fn named_wait() {
        let pipe_name = name();
        let server = t!(NamedPipe::new(&pipe_name));

        let (done_tx, done_rx) = channel();
        let client = thread::spawn(move || {
            t!(NamedPipe::wait(&pipe_name, None));
            t!(File::open(&pipe_name));
            assert!(NamedPipe::wait(&pipe_name, Some(Duration::from_millis(1))).is_err());
            t!(done_tx.send(()));
        });

        t!(server.connect());
        // Keep the server connected until the client has finished its checks.
        t!(done_rx.recv());
        t!(server.disconnect());
        t!(client.join());
    }

    /// An overlapped connect is reported through the completion port with
    /// the registered token and the same `OVERLAPPED` pointer.
    #[test]
    fn named_connect_overlapped() {
        let pipe_name = name();
        let server = t!(NamedPipe::new(&pipe_name));

        let client = thread::spawn(move || {
            t!(File::open(pipe_name));
        });

        let port = t!(CompletionPort::new(1));
        t!(port.add_handle(2, &server));

        let over = Overlapped::zero();
        unsafe {
            t!(server.connect_overlapped(over.raw()));
        }

        let status = t!(port.get(None));
        assert_eq!(status.bytes_transferred(), 0);
        assert_eq!(status.token(), 2);
        assert_eq!(status.overlapped(), over.raw());
        t!(client.join());
    }

    /// Round trip: the client sends three bytes, the server reads them and
    /// sends three bytes back using the blocking `Read`/`Write` impls.
    #[test]
    fn named_read_write() {
        let pipe_name = name();
        let mut server = t!(NamedPipe::new(&pipe_name));

        let client = thread::spawn(move || {
            let mut file = t!(OpenOptions::new().read(true).write(true).open(pipe_name));
            t!(file.write_all(&[1, 2, 3]));
            let mut reply = [0; 10];
            assert_eq!(t!(file.read(&mut reply)), 3);
            assert_eq!(&reply[..3], &[1, 2, 3]);
        });

        t!(server.connect());
        let mut request = [0; 10];
        assert_eq!(t!(server.read(&mut request)), 3);
        assert_eq!(&request[..3], &[1, 2, 3]);
        t!(server.write_all(&[1, 2, 3]));
        t!(server.flush());
        t!(server.disconnect());
        t!(client.join());
    }

    /// Runs the round-trip test five times in a row on one thread.
    #[test]
    fn named_read_write_multi() {
        for _round in 0..5 {
            named_read_write();
        }
    }

    /// Two different pipes are served one after the other from the same
    /// thread.
    #[test]
    fn named_read_write_multi_same_thread() {
        let first_name = name();
        let mut first = t!(NamedPipe::new(&first_name));
        let second_name = name();
        let mut second = t!(NamedPipe::new(&second_name));

        let client = thread::spawn(move || {
            let mut file = t!(OpenOptions::new().read(true).write(true).open(first_name));
            t!(file.write_all(&[1, 2, 3]));
            let mut reply = [0; 10];
            assert_eq!(t!(file.read(&mut reply)), 3);
            assert_eq!(&reply[..3], &[1, 2, 3]);

            // Shadowing keeps the first file open until the closure ends.
            let mut file = t!(OpenOptions::new().read(true).write(true).open(second_name));
            t!(file.write_all(&[1, 2, 3]));
            let mut reply = [0; 10];
            assert_eq!(t!(file.read(&mut reply)), 3);
            assert_eq!(&reply[..3], &[1, 2, 3]);
        });

        t!(first.connect());
        let mut request = [0; 10];
        assert_eq!(t!(first.read(&mut request)), 3);
        assert_eq!(&request[..3], &[1, 2, 3]);
        t!(first.write_all(&[1, 2, 3]));
        t!(first.flush());
        t!(first.disconnect());

        t!(second.connect());
        let mut request = [0; 10];
        assert_eq!(t!(second.read(&mut request)), 3);
        assert_eq!(&request[..3], &[1, 2, 3]);
        t!(second.write_all(&[1, 2, 3]));
        t!(second.flush());
        t!(second.disconnect());

        t!(client.join());
    }

    /// An overlapped read is reported through the completion port with the
    /// byte count, token and `OVERLAPPED` pointer, and fills the buffer.
    #[test]
    fn named_read_overlapped() {
        let pipe_name = name();
        let server = t!(NamedPipe::new(&pipe_name));

        let client = thread::spawn(move || {
            let mut file = t!(File::create(pipe_name));
            t!(file.write_all(&[1, 2, 3]));
        });

        let port = t!(CompletionPort::new(1));
        t!(port.add_handle(3, &server));
        t!(server.connect());

        let mut buf = [0; 10];
        let over = Overlapped::zero();
        unsafe {
            t!(server.read_overlapped(&mut buf, over.raw()));
        }
        let status = t!(port.get(None));
        assert_eq!(status.bytes_transferred(), 3);
        assert_eq!(status.token(), 3);
        assert_eq!(status.overlapped(), over.raw());
        assert_eq!(&buf[..3], &[1, 2, 3]);

        t!(client.join());
    }

    /// An overlapped write is reported through the completion port, and the
    /// client (connected via `super::connect`) receives the bytes.
    #[test]
    fn named_write_overlapped() {
        let pipe_name = name();
        let server = t!(NamedPipe::new(&pipe_name));

        let client = thread::spawn(move || {
            let mut file = t!(super::connect(pipe_name));
            let mut buf = [0; 10];
            assert_eq!(t!(file.read(&mut buf)), 3);
            assert_eq!(&buf[..3], &[1, 2, 3])
        });

        let port = t!(CompletionPort::new(1));
        t!(port.add_handle(3, &server));
        t!(server.connect());

        let over = Overlapped::zero();
        unsafe {
            t!(server.write_overlapped(&[1, 2, 3], over.raw()));
        }

        let status = t!(port.get(None));
        assert_eq!(status.bytes_transferred(), 3);
        assert_eq!(status.token(), 3);
        assert_eq!(status.overlapped(), over.raw());

        t!(client.join());
    }
}
717