1 // Copyright 2015-2018 Benjamin Fry <benjaminfry@me.com>
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7 
8 use std;
9 use std::io;
10 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
11 use std::pin::Pin;
12 use std::sync::Arc;
13 use std::task::{Context, Poll};
14 
15 use futures_util::stream::{Stream, StreamExt};
16 use futures_util::{future, future::Future, ready, FutureExt, TryFutureExt};
17 use lazy_static::lazy_static;
18 use log::{debug, trace};
19 use rand;
20 use rand::distributions::{uniform::Uniform, Distribution};
21 use socket2::{self, Socket};
22 use tokio::net::UdpSocket;
23 
24 use crate::multicast::MdnsQueryType;
25 use crate::udp::UdpStream;
26 use crate::xfer::SerialMessage;
27 use crate::BufStreamHandle;
28 
29 pub(crate) const MDNS_PORT: u16 = 5353;
30 lazy_static! {
31     /// mDNS ipv4 address https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xhtml
32     pub static ref MDNS_IPV4: SocketAddr = SocketAddr::new(Ipv4Addr::new(224,0,0,251).into(), MDNS_PORT);
33     /// link-local mDNS ipv6 address https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml
34     pub static ref MDNS_IPV6: SocketAddr = SocketAddr::new(Ipv6Addr::new(0xFF02, 0, 0, 0, 0, 0, 0, 0x00FB).into(), MDNS_PORT);
35 }
36 
37 /// A UDP stream of DNS binary packets
38 #[must_use = "futures do nothing unless polled"]
39 pub struct MdnsStream {
40     /// Multicast address used for mDNS queries
41     multicast_addr: SocketAddr,
42     /// This is used for sending and (directly) receiving messages
43     datagram: Option<UdpStream<UdpSocket>>,
44     // FIXME: like UdpStream, this Arc is unnecessary, only needed for temp async/await capture below
45     /// In one-shot multicast, this will not join the multicast group
46     multicast: Option<Arc<UdpSocket>>,
47     /// Receiving portion of the MdnsStream
48     rcving_mcast: Option<Pin<Box<dyn Future<Output = io::Result<SerialMessage>> + Send>>>,
49 }
50 
51 impl MdnsStream {
52     /// associates the socket to the well-known ipv4 multicast address
new_ipv4( mdns_query_type: MdnsQueryType, packet_ttl: Option<u32>, ipv4_if: Option<Ipv4Addr>, ) -> ( Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>, BufStreamHandle, )53     pub fn new_ipv4(
54         mdns_query_type: MdnsQueryType,
55         packet_ttl: Option<u32>,
56         ipv4_if: Option<Ipv4Addr>,
57     ) -> (
58         Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>,
59         BufStreamHandle,
60     ) {
61         Self::new(*MDNS_IPV4, mdns_query_type, packet_ttl, ipv4_if, None)
62     }
63 
64     /// associates the socket to the well-known ipv6 multicast address
new_ipv6( mdns_query_type: MdnsQueryType, packet_ttl: Option<u32>, ipv6_if: Option<u32>, ) -> ( Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>, BufStreamHandle, )65     pub fn new_ipv6(
66         mdns_query_type: MdnsQueryType,
67         packet_ttl: Option<u32>,
68         ipv6_if: Option<u32>,
69     ) -> (
70         Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>,
71         BufStreamHandle,
72     ) {
73         Self::new(*MDNS_IPV6, mdns_query_type, packet_ttl, None, ipv6_if)
74     }
75 
76     /// Returns the address of the multicast network in use
multicast_addr(&self) -> SocketAddr77     pub fn multicast_addr(&self) -> SocketAddr {
78         self.multicast_addr
79     }
80 
81     /// This method is available for specifying a custom Multicast address to use.
82     ///
83     /// In general this operates nearly identically to UDP, except that it automatically joins
84     ///  the default multicast DNS addresses. See https://tools.ietf.org/html/rfc6762#section-5
85     ///  for details.
86     ///
87     /// When sending ipv6 multicast packets, the interface being used is required,
88     ///  this will panic if the interface is not specified for all MdnsQueryType except Passive
89     ///  (which does not allow sending data)
90     ///
91     /// # Arguments
92     ///
93     /// * `multicast_addr` - address to use for multicast requests
94     /// * `mdns_query_type` - true if the querier using this socket will only perform standard DNS queries over multicast.
95     /// * `ipv4_if` - Address to bind to for sending multicast packets, defaults to `0.0.0.0` if not specified (not relevant for ipv6)
96     /// * `ipv6_if` - Interface index for the interface to be used when sending ipv6 packets.
97     ///
98     /// # Return
99     ///
100     /// a tuple of a Future Stream which will handle sending and receiving messages, and a
101     ///  handle which can be used to send messages into the stream.
new( multicast_addr: SocketAddr, mdns_query_type: MdnsQueryType, packet_ttl: Option<u32>, ipv4_if: Option<Ipv4Addr>, ipv6_if: Option<u32>, ) -> ( Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>, BufStreamHandle, )102     pub fn new(
103         multicast_addr: SocketAddr,
104         mdns_query_type: MdnsQueryType,
105         packet_ttl: Option<u32>,
106         ipv4_if: Option<Ipv4Addr>,
107         ipv6_if: Option<u32>,
108     ) -> (
109         Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>,
110         BufStreamHandle,
111     ) {
112         let (message_sender, outbound_messages) = BufStreamHandle::create();
113         let multicast_socket = match Self::join_multicast(&multicast_addr, mdns_query_type) {
114             Ok(socket) => socket,
115             Err(err) => return (Box::new(future::err(err)), message_sender),
116         };
117 
118         // TODO: allow the bind address to be specified...
119         // constructs a future for getting the next randomly bound port to a UdpSocket
120         let next_socket = Self::next_bound_local_address(
121             &multicast_addr,
122             mdns_query_type,
123             packet_ttl,
124             ipv4_if,
125             ipv6_if,
126         );
127 
128         // while 0 is meant to keep the packet on localhost, linux regards this as an error,
129         //   while macOS (BSD?) and Windows allow it.
130         if let Some(ttl) = packet_ttl {
131             assert!(ttl > 0, "TTL must be greater than 0");
132         }
133 
134         // This set of futures collapses the next udp socket into a stream which can be used for
135         //  sending and receiving udp packets.
136         let stream = {
137             Box::new(
138                 next_socket
139                     .map(move |socket| match socket {
140                         Ok(Some(socket)) => Ok(Some(UdpSocket::from_std(socket)?)),
141                         Ok(None) => Ok(None),
142                         Err(err) => Err(err),
143                     })
144                     .map_ok(move |socket: Option<_>| {
145                         let datagram: Option<_> =
146                             socket.map(|socket| UdpStream::from_parts(socket, outbound_messages));
147                         let multicast: Option<_> = multicast_socket.map(|multicast_socket| {
148                             Arc::new(UdpSocket::from_std(multicast_socket).expect("bad handle?"))
149                         });
150 
151                         MdnsStream {
152                             multicast_addr,
153                             datagram,
154                             multicast,
155                             rcving_mcast: None,
156                         }
157                     }),
158             )
159         };
160 
161         (stream, message_sender)
162     }
163 
164     /// On Windows, unlike all Unix variants, it is improper to bind to the multicast address
165     ///
166     /// see https://msdn.microsoft.com/en-us/library/windows/desktop/ms737550(v=vs.85).aspx
167     #[cfg(windows)]
bind_multicast(socket: &Socket, multicast_addr: &SocketAddr) -> io::Result<()>168     fn bind_multicast(socket: &Socket, multicast_addr: &SocketAddr) -> io::Result<()> {
169         let multicast_addr = match *multicast_addr {
170             SocketAddr::V4(addr) => SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), addr.port()),
171             SocketAddr::V6(addr) => {
172                 SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), addr.port())
173             }
174         };
175         socket.bind(&socket2::SockAddr::from(multicast_addr))
176     }
177 
178     /// On unixes we bind to the multicast address, which causes multicast packets to be filtered
179     #[cfg(unix)]
bind_multicast(socket: &Socket, multicast_addr: &SocketAddr) -> io::Result<()>180     fn bind_multicast(socket: &Socket, multicast_addr: &SocketAddr) -> io::Result<()> {
181         socket.bind(&socket2::SockAddr::from(*multicast_addr))
182     }
183 
184     /// Returns a socket joined to the multicast address
join_multicast( multicast_addr: &SocketAddr, mdns_query_type: MdnsQueryType, ) -> Result<Option<std::net::UdpSocket>, io::Error>185     fn join_multicast(
186         multicast_addr: &SocketAddr,
187         mdns_query_type: MdnsQueryType,
188     ) -> Result<Option<std::net::UdpSocket>, io::Error> {
189         if !mdns_query_type.join_multicast() {
190             return Ok(None);
191         }
192 
193         let ip_addr = multicast_addr.ip();
194         // it's an error to not use a proper mDNS address
195         if !ip_addr.is_multicast() {
196             return Err(io::Error::new(
197                 io::ErrorKind::Other,
198                 format!("expected multicast address for binding: {}", ip_addr),
199             ));
200         }
201 
202         // binding the UdpSocket to the multicast address tells the OS to filter all packets on this socket to just this
203         //   multicast address
204         // TODO: allow the binding interface to be specified
205         let socket = match ip_addr {
206             IpAddr::V4(ref mdns_v4) => {
207                 let socket = Socket::new(
208                     socket2::Domain::ipv4(),
209                     socket2::Type::dgram(),
210                     Some(socket2::Protocol::udp()),
211                 )?;
212                 socket.join_multicast_v4(mdns_v4, &Ipv4Addr::new(0, 0, 0, 0))?;
213                 socket
214             }
215             IpAddr::V6(ref mdns_v6) => {
216                 let socket = Socket::new(
217                     socket2::Domain::ipv6(),
218                     socket2::Type::dgram(),
219                     Some(socket2::Protocol::udp()),
220                 )?;
221 
222                 socket.set_only_v6(true)?;
223                 socket.join_multicast_v6(mdns_v6, 0)?;
224                 socket
225             }
226         };
227 
228         socket.set_nonblocking(true)?;
229         socket.set_reuse_address(true)?;
230         #[cfg(unix)] // this is currently restricted to Unix's in socket2
231         socket.set_reuse_port(true)?;
232         Self::bind_multicast(&socket, multicast_addr)?;
233 
234         debug!("joined {}", multicast_addr);
235         Ok(Some(socket.into_udp_socket()))
236     }
237 
238     /// Creates a future for randomly binding to a local socket address for client connections.
next_bound_local_address( multicast_addr: &SocketAddr, mdns_query_type: MdnsQueryType, packet_ttl: Option<u32>, ipv4_if: Option<Ipv4Addr>, ipv6_if: Option<u32>, ) -> NextRandomUdpSocket239     fn next_bound_local_address(
240         multicast_addr: &SocketAddr,
241         mdns_query_type: MdnsQueryType,
242         packet_ttl: Option<u32>,
243         ipv4_if: Option<Ipv4Addr>,
244         ipv6_if: Option<u32>,
245     ) -> NextRandomUdpSocket {
246         let bind_address: IpAddr = match *multicast_addr {
247             SocketAddr::V4(..) => IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
248             SocketAddr::V6(..) => IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
249         };
250 
251         NextRandomUdpSocket {
252             bind_address,
253             mdns_query_type,
254             packet_ttl,
255             ipv4_if,
256             ipv6_if,
257         }
258     }
259 }
260 
261 impl Stream for MdnsStream {
262     type Item = io::Result<SerialMessage>;
263 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>264     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
265         assert!(self.datagram.is_some() || self.multicast.is_some());
266 
267         // we poll the datagram socket first, if available, since it's a direct response or direct request
268         if let Some(ref mut datagram) = self.as_mut().datagram {
269             match datagram.poll_next_unpin(cx) {
270                 Poll::Ready(ready) => return Poll::Ready(ready),
271                 Poll::Pending => (), // drop through
272             }
273         }
274 
275         loop {
276             let msg = if let Some(ref mut receiving) = self.rcving_mcast {
277                 // TODO: should we drop this packet if it's not from the same src as dest?
278                 let msg = ready!(receiving.as_mut().poll_unpin(cx))?;
279 
280                 Some(Poll::Ready(Some(Ok(msg))))
281             } else {
282                 None
283             };
284 
285             self.rcving_mcast = None;
286 
287             if let Some(msg) = msg {
288                 return msg;
289             }
290 
291             // let socket = Arc::clone(socket);
292             if let Some(ref socket) = self.multicast {
293                 let socket = Arc::clone(socket);
294                 let receive_future = async {
295                     let socket = socket;
296                     let mut buf = [0u8; 2048];
297                     let (len, src) = socket.recv_from(&mut buf).await?;
298 
299                     Ok(SerialMessage::new(
300                         buf.iter().take(len).cloned().collect(),
301                         src,
302                     ))
303                 };
304 
305                 self.rcving_mcast = Some(Box::pin(receive_future.boxed()));
306             }
307         }
308     }
309 }
310 
311 #[must_use = "futures do nothing unless polled"]
312 struct NextRandomUdpSocket {
313     bind_address: IpAddr,
314     mdns_query_type: MdnsQueryType,
315     packet_ttl: Option<u32>,
316     ipv4_if: Option<Ipv4Addr>,
317     ipv6_if: Option<u32>,
318 }
319 
320 impl NextRandomUdpSocket {
prepare_sender(&self, socket: std::net::UdpSocket) -> io::Result<std::net::UdpSocket>321     fn prepare_sender(&self, socket: std::net::UdpSocket) -> io::Result<std::net::UdpSocket> {
322         let addr = socket.local_addr()?;
323         debug!("preparing sender on: {}", addr);
324 
325         let socket = Socket::from(socket);
326 
327         // TODO: TTL doesn't work on ipv6
328         match addr {
329             SocketAddr::V4(..) => {
330                 socket.set_multicast_loop_v4(true)?;
331                 socket.set_multicast_if_v4(
332                     &self.ipv4_if.unwrap_or_else(|| Ipv4Addr::new(0, 0, 0, 0)),
333                 )?;
334                 if let Some(ttl) = self.packet_ttl {
335                     socket.set_ttl(ttl)?;
336                     socket.set_multicast_ttl_v4(ttl)?;
337                 }
338             }
339             SocketAddr::V6(..) => {
340                 let ipv6_if = self.ipv6_if.unwrap_or_else(|| {
341                     panic!("for ipv6 multicasting the interface must be specified")
342                 });
343 
344                 socket.set_multicast_loop_v6(true)?;
345                 socket.set_multicast_if_v6(ipv6_if)?;
346                 if let Some(ttl) = self.packet_ttl {
347                     socket.set_unicast_hops_v6(ttl)?;
348                     socket.set_multicast_hops_v6(ttl)?;
349                 }
350             }
351         }
352 
353         Ok(socket.into_udp_socket())
354     }
355 }
356 
357 impl Future for NextRandomUdpSocket {
358     // TODO: clean this up, the RandomUdpSocket shouldnt' care about the query type
359     type Output = io::Result<Option<std::net::UdpSocket>>;
360 
361     /// polls until there is an available next random UDP port.
362     ///
363     /// if there is no port available after 10 attempts, returns NotReady
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>364     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
365         // non-one-shot, i.e. continuous, always use one of the well-known mdns ports and bind to the multicast addr
366         if !self.mdns_query_type.sender() {
367             debug!("skipping sending stream");
368             Poll::Ready(Ok(None))
369         } else if self.mdns_query_type.bind_on_5353() {
370             let addr = SocketAddr::new(self.bind_address, MDNS_PORT);
371             debug!("binding sending stream to {}", addr);
372             let socket = std::net::UdpSocket::bind(&addr)?;
373             let socket = self.prepare_sender(socket)?;
374 
375             Poll::Ready(Ok(Some(socket)))
376         } else {
377             // TODO: this is basically identical to UdpStream from here... share some code? (except for the port restriction)
378             // one-shot queries look very similar to UDP socket, but can't listen on 5353
379             let rand_port_range = Uniform::new_inclusive(1025_u16, u16::max_value());
380             let mut rand = rand::thread_rng();
381 
382             for attempt in 0..10 {
383                 let port = rand_port_range.sample(&mut rand); // the range is [0 ... u16::max]
384 
385                 // see one_shot usage info: https://tools.ietf.org/html/rfc6762#section-5
386                 //  the MDNS_PORT is used to signal to remote processes that this is capable of receiving multicast packets
387                 //  i.e. is joined to the multicast address.
388                 if port == MDNS_PORT {
389                     trace!("unlucky, got MDNS_PORT");
390                     continue;
391                 }
392 
393                 let addr = SocketAddr::new(self.bind_address, port);
394                 debug!("binding sending stream to {}", addr);
395 
396                 match std::net::UdpSocket::bind(&addr) {
397                     Ok(socket) => {
398                         let socket = self.prepare_sender(socket)?;
399                         return Poll::Ready(Ok(Some(socket)));
400                     }
401                     Err(err) => debug!("unable to bind port, attempt: {}: {}", attempt, err),
402                 }
403             }
404 
405             debug!("could not get next random port, delaying");
406 
407             // TODO: this replaced a task::current().notify, is it correct?
408             cx.waker().wake_by_ref();
409             Poll::Pending
410         }
411     }
412 }
413 
414 #[cfg(test)]
415 pub(crate) mod tests {
416     #![allow(clippy::dbg_macro, clippy::print_stdout)]
417 
418     use super::*;
419     use futures_util::future::Either;
420     use tokio::runtime;
421 
422     // TODO: is there a better way?
423     const BASE_TEST_PORT: u16 = 5379;
424 
425     lazy_static! {
426         /// 250 appears to be unused/unregistered
427         static ref TEST_MDNS_IPV4: IpAddr = Ipv4Addr::new(224,0,0,250).into();
428         /// FA appears to be unused/unregistered
429         static ref TEST_MDNS_IPV6: IpAddr = Ipv6Addr::new(0xFF02, 0, 0, 0, 0, 0, 0, 0x00FA).into();
430     }
431 
432     // one_shot tests are basically clones from the udp tests
433     #[test]
test_next_random_socket()434     fn test_next_random_socket() {
435         // use env_logger;
436         // env_logger::init();
437 
438         let io_loop = runtime::Runtime::new().unwrap();
439         let (stream, _) = MdnsStream::new(
440             SocketAddr::new(*TEST_MDNS_IPV4, BASE_TEST_PORT),
441             MdnsQueryType::OneShot,
442             Some(1),
443             None,
444             None,
445         );
446         let result = io_loop.block_on(stream);
447 
448         if let Err(error) = result {
449             println!("Random address error: {:#?}", error);
450             panic!("failed to get next random address");
451         }
452     }
453 
454     // FIXME: reenable after breakage in async/await
455     #[ignore]
456     #[test]
test_one_shot_mdns_ipv4()457     fn test_one_shot_mdns_ipv4() {
458         one_shot_mdns_test(SocketAddr::new(*TEST_MDNS_IPV4, BASE_TEST_PORT + 1));
459     }
460 
461     #[test]
462     #[ignore]
test_one_shot_mdns_ipv6()463     fn test_one_shot_mdns_ipv6() {
464         one_shot_mdns_test(SocketAddr::new(*TEST_MDNS_IPV6, BASE_TEST_PORT + 2));
465     }
466 
467     //   as there are probably unexpected responses coming on the standard addresses
one_shot_mdns_test(mdns_addr: SocketAddr)468     fn one_shot_mdns_test(mdns_addr: SocketAddr) {
469         use std::time::Duration;
470 
471         let client_done = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
472 
473         let test_bytes: &'static [u8; 8] = b"DEADBEEF";
474         let send_recv_times = 10;
475         let client_done_clone = client_done.clone();
476 
477         // an in and out server
478         let server_handle = std::thread::Builder::new()
479             .name("test_one_shot_mdns:server".to_string())
480             .spawn(move || {
481                 let server_loop = runtime::Runtime::new().unwrap();
482                 let mut timeout = future::lazy(|_| tokio::time::sleep(Duration::from_millis(100)))
483                     .flatten()
484                     .boxed();
485 
486                 // TTLs are 0 so that multicast test packets never leave the test host...
487                 // FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases...
488                 let (server_stream_future, mut server_sender) = MdnsStream::new(
489                     mdns_addr,
490                     MdnsQueryType::OneShotJoin,
491                     Some(1),
492                     None,
493                     Some(5),
494                 );
495 
496                 // For one-shot responses we are competing with a system mDNS responder, we will respond from a different port...
497                 let mut server_stream = server_loop
498                     .block_on(server_stream_future)
499                     .expect("could not create mDNS listener")
500                     .into_future();
501 
502                 for _ in 0..=send_recv_times {
503                     if client_done_clone.load(std::sync::atomic::Ordering::Relaxed) {
504                         return;
505                     }
506                     // wait for some bytes...
507                     match server_loop.block_on(
508                         future::lazy(|_| future::select(server_stream, timeout)).flatten(),
509                     ) {
510                         Either::Left((buffer_and_addr_stream_tmp, timeout_tmp)) => {
511                             let (buffer_and_addr, stream_tmp): (
512                                 Option<Result<SerialMessage, io::Error>>,
513                                 MdnsStream,
514                             ) = buffer_and_addr_stream_tmp;
515 
516                             server_stream = stream_tmp.into_future();
517                             timeout = timeout_tmp;
518                             let (buffer, addr) = buffer_and_addr
519                                 .expect("no msg received")
520                                 .expect("error receiving msg")
521                                 .into_parts();
522 
523                             assert_eq!(&buffer, test_bytes);
524                             //println!("server got data! {}", addr);
525 
526                             // bounce them right back...
527                             server_sender
528                                 .send(SerialMessage::new(test_bytes.to_vec(), addr))
529                                 .expect("could not send to client");
530                         }
531                         Either::Right(((), buffer_and_addr_stream_tmp)) => {
532                             server_stream = buffer_and_addr_stream_tmp;
533                             timeout =
534                                 future::lazy(|_| tokio::time::sleep(Duration::from_millis(100)))
535                                     .flatten()
536                                     .boxed();
537                         }
538                     }
539 
540                     // let the server turn for a bit... send the message
541                     server_loop.block_on(tokio::time::sleep(Duration::from_millis(100)));
542                 }
543             })
544             .unwrap();
545 
546         // setup the client, which is going to run on the testing thread...
547         let io_loop = runtime::Runtime::new().unwrap();
548 
549         // FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases...
550         let (stream, mut sender) =
551             MdnsStream::new(mdns_addr, MdnsQueryType::OneShot, Some(1), None, Some(5));
552         let mut stream = io_loop.block_on(stream).ok().unwrap().into_future();
553         let mut timeout = future::lazy(|_| tokio::time::sleep(Duration::from_millis(100)))
554             .flatten()
555             .boxed();
556         let mut successes = 0;
557 
558         for _ in 0..send_recv_times {
559             // test once
560             sender
561                 .send(SerialMessage::new(test_bytes.to_vec(), mdns_addr))
562                 .unwrap();
563 
564             println!("client sending data!");
565 
566             // TODO: this lazy isn't needed is it?
567             match io_loop.block_on(future::lazy(|_| future::select(stream, timeout)).flatten()) {
568                 Either::Left((buffer_and_addr_stream_tmp, timeout_tmp)) => {
569                     let (buffer_and_addr, stream_tmp) = buffer_and_addr_stream_tmp;
570                     stream = stream_tmp.into_future();
571                     timeout = timeout_tmp;
572 
573                     let (buffer, _addr) = buffer_and_addr
574                         .expect("no msg received")
575                         .expect("error receiving msg")
576                         .into_parts();
577                     println!("client got data!");
578 
579                     assert_eq!(&buffer, test_bytes);
580                     successes += 1;
581                 }
582                 Either::Right(((), buffer_and_addr_stream_tmp)) => {
583                     stream = buffer_and_addr_stream_tmp;
584                     timeout = future::lazy(|_| tokio::time::sleep(Duration::from_millis(100)))
585                         .flatten()
586                         .boxed();
587                 }
588             }
589         }
590 
591         client_done.store(true, std::sync::atomic::Ordering::Relaxed);
592         println!("successes: {}", successes);
593         assert!(successes >= 1);
594         server_handle.join().expect("server thread failed");
595     }
596 
597     // FIXME: reenable after breakage in async/await
598     #[ignore]
599     #[test]
test_passive_mdns()600     fn test_passive_mdns() {
601         passive_mdns_test(
602             MdnsQueryType::Passive,
603             SocketAddr::new(*TEST_MDNS_IPV4, BASE_TEST_PORT + 3),
604         )
605     }
606 
607     // FIXME: reenable after breakage in async/await
608     #[ignore]
609     #[test]
test_oneshot_join_mdns()610     fn test_oneshot_join_mdns() {
611         passive_mdns_test(
612             MdnsQueryType::OneShotJoin,
613             SocketAddr::new(*TEST_MDNS_IPV4, BASE_TEST_PORT + 4),
614         )
615     }
616 
617     //   as there are probably unexpected responses coming on the standard addresses
passive_mdns_test(mdns_query_type: MdnsQueryType, mdns_addr: SocketAddr)618     fn passive_mdns_test(mdns_query_type: MdnsQueryType, mdns_addr: SocketAddr) {
619         use std::time::Duration;
620 
621         let server_got_packet = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
622 
623         let test_bytes: &'static [u8; 8] = b"DEADBEEF";
624         let send_recv_times = 10;
625         let server_got_packet_clone = server_got_packet.clone();
626 
627         // an in and out server
628         let _server_handle = std::thread::Builder::new()
629             .name("test_one_shot_mdns:server".to_string())
630             .spawn(move || {
631                 let io_loop = runtime::Runtime::new().unwrap();
632                 let mut timeout = future::lazy(|_| tokio::time::sleep(Duration::from_millis(100)))
633                     .flatten()
634                     .boxed();
635 
636                 // TTLs are 0 so that multicast test packets never leave the test host...
637                 // FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases...
638                 let (server_stream_future, _server_sender) =
639                     MdnsStream::new(mdns_addr, mdns_query_type, Some(1), None, Some(5));
640 
641                 // For one-shot responses we are competing with a system mDNS responder, we will respond from a different port...
642                 let mut server_stream = io_loop
643                     .block_on(server_stream_future)
644                     .expect("could not create mDNS listener")
645                     .into_future();
646 
647                 for _ in 0..=send_recv_times {
648                     // wait for some bytes...
649                     match io_loop.block_on(
650                         future::lazy(|_| future::select(server_stream, timeout)).flatten(),
651                     ) {
652                         Either::Left((_buffer_and_addr_stream_tmp, _timeout_tmp)) => {
653                             // let (buffer_and_addr, stream_tmp) = buffer_and_addr_stream_tmp;
654 
655                             // server_stream = stream_tmp.into_future();
656                             // timeout = timeout_tmp;
657                             // let (buffer, addr) = buffer_and_addr.expect("no buffer received");
658 
659                             // assert_eq!(&buffer, test_bytes);
660                             // println!("server got data! {}", addr);
661 
662                             server_got_packet_clone
663                                 .store(true, std::sync::atomic::Ordering::Relaxed);
664                             return;
665                         }
666                         Either::Right(((), buffer_and_addr_stream_tmp)) => {
667                             server_stream = buffer_and_addr_stream_tmp;
668                             timeout =
669                                 future::lazy(|_| tokio::time::sleep(Duration::from_millis(100)))
670                                     .flatten()
671                                     .boxed();
672                         }
673                     }
674 
675                     // let the server turn for a bit... send the message
676                     io_loop.block_on(tokio::time::sleep(Duration::from_millis(100)));
677                 }
678             })
679             .unwrap();
680 
681         // setup the client, which is going to run on the testing thread...
682         let io_loop = runtime::Runtime::new().unwrap();
683         // FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases...
684         let (stream, mut sender) =
685             MdnsStream::new(mdns_addr, MdnsQueryType::OneShot, Some(1), None, Some(5));
686         let mut stream = io_loop.block_on(stream).ok().unwrap().into_future();
687         let mut timeout = future::lazy(|_| tokio::time::sleep(Duration::from_millis(100)))
688             .flatten()
689             .boxed();
690 
691         for _ in 0..send_recv_times {
692             // test once
693             sender
694                 .send(SerialMessage::new(test_bytes.to_vec(), mdns_addr))
695                 .unwrap();
696 
697             println!("client sending data!");
698 
699             // TODO: this lazy is probably unnecessary?
700             let run_result =
701                 io_loop.block_on(future::lazy(|_| future::select(stream, timeout)).flatten());
702 
703             if server_got_packet.load(std::sync::atomic::Ordering::Relaxed) {
704                 return;
705             }
706 
707             match run_result {
708                 Either::Left((buffer_and_addr_stream_tmp, timeout_tmp)) => {
709                     let (_buffer_and_addr, stream_tmp) = buffer_and_addr_stream_tmp;
710                     stream = stream_tmp.into_future();
711                     timeout = timeout_tmp;
712                 }
713                 Either::Right(((), buffer_and_addr_stream_tmp)) => {
714                     stream = buffer_and_addr_stream_tmp;
715                     timeout = future::lazy(|_| tokio::time::sleep(Duration::from_millis(100)))
716                         .flatten()
717                         .boxed();
718                 }
719             }
720         }
721 
722         panic!("server never got packet.");
723     }
724 }
725