1 // Copyright 2015-2018 Benjamin Fry <benjaminfry@me.com> 2 // 3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5 // http://opensource.org/licenses/MIT>, at your option. This file may not be 6 // copied, modified, or distributed except according to those terms. 7 8 use std; 9 use std::io; 10 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; 11 use std::pin::Pin; 12 use std::sync::Arc; 13 use std::task::{Context, Poll}; 14 15 use futures_util::stream::{Stream, StreamExt}; 16 use futures_util::{future, future::Future, ready, FutureExt, TryFutureExt}; 17 use lazy_static::lazy_static; 18 use log::{debug, trace}; 19 use rand; 20 use rand::distributions::{uniform::Uniform, Distribution}; 21 use socket2::{self, Socket}; 22 use tokio::net::UdpSocket; 23 24 use crate::multicast::MdnsQueryType; 25 use crate::udp::UdpStream; 26 use crate::xfer::SerialMessage; 27 use crate::BufStreamHandle; 28 29 pub(crate) const MDNS_PORT: u16 = 5353; 30 lazy_static! { 31 /// mDNS ipv4 address https://www.iana.org/assignments/multicast-addresses/multicast-addresses.xhtml 32 pub static ref MDNS_IPV4: SocketAddr = SocketAddr::new(Ipv4Addr::new(224,0,0,251).into(), MDNS_PORT); 33 /// link-local mDNS ipv6 address https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml 34 pub static ref MDNS_IPV6: SocketAddr = SocketAddr::new(Ipv6Addr::new(0xFF02, 0, 0, 0, 0, 0, 0, 0x00FB).into(), MDNS_PORT); 35 } 36 37 /// A UDP stream of DNS binary packets 38 #[must_use = "futures do nothing unless polled"] 39 pub struct MdnsStream { 40 /// Multicast address used for mDNS queries 41 multicast_addr: SocketAddr, 42 /// This is used for sending and (directly) receiving messages 43 datagram: Option<UdpStream<UdpSocket>>, 44 // FIXME: like UdpStream, this Arc is unnecessary, only needed for temp async/await capture below 45 /// In one-shot multicast, this will not join the multicast group 46 multicast: Option<Arc<UdpSocket>>, 47 /// Receiving portion of the MdnsStream 48 rcving_mcast: Option<Pin<Box<dyn Future<Output = io::Result<SerialMessage>> + Send>>>, 49 } 50 51 impl MdnsStream { 52 /// associates the socket to the well-known ipv4 multicast address new_ipv4( mdns_query_type: MdnsQueryType, packet_ttl: Option<u32>, ipv4_if: Option<Ipv4Addr>, ) -> ( Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>, BufStreamHandle, )53 pub fn new_ipv4( 54 mdns_query_type: MdnsQueryType, 55 packet_ttl: Option<u32>, 56 ipv4_if: Option<Ipv4Addr>, 57 ) -> ( 58 Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>, 59 BufStreamHandle, 60 ) { 61 Self::new(*MDNS_IPV4, mdns_query_type, packet_ttl, ipv4_if, None) 62 } 63 64 /// associates the socket to the well-known ipv6 multicast address new_ipv6( mdns_query_type: MdnsQueryType, packet_ttl: Option<u32>, ipv6_if: Option<u32>, ) -> ( Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>, BufStreamHandle, )65 pub fn new_ipv6( 66 mdns_query_type: MdnsQueryType, 67 packet_ttl: Option<u32>, 68 ipv6_if: Option<u32>, 69 ) -> ( 70 Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>, 71 BufStreamHandle, 72 ) { 73 Self::new(*MDNS_IPV6, mdns_query_type, packet_ttl, None, ipv6_if) 74 } 75 76 /// Returns the address of the multicast network in use multicast_addr(&self) -> SocketAddr77 pub fn multicast_addr(&self) -> SocketAddr { 78 self.multicast_addr 79 } 80 81 /// This method is available for specifying a custom Multicast address to use. 82 /// 83 /// In general this operates nearly identically to UDP, except that it automatically joins 84 /// the default multicast DNS addresses. See https://tools.ietf.org/html/rfc6762#section-5 85 /// for details. 86 /// 87 /// When sending ipv6 multicast packets, the interface being used is required, 88 /// this will panic if the interface is not specified for all MdnsQueryType except Passive 89 /// (which does not allow sending data) 90 /// 91 /// # Arguments 92 /// 93 /// * `multicast_addr` - address to use for multicast requests 94 /// * `mdns_query_type` - true if the querier using this socket will only perform standard DNS queries over multicast. 95 /// * `ipv4_if` - Address to bind to for sending multicast packets, defaults to `0.0.0.0` if not specified (not relevant for ipv6) 96 /// * `ipv6_if` - Interface index for the interface to be used when sending ipv6 packets. 97 /// 98 /// # Return 99 /// 100 /// a tuple of a Future Stream which will handle sending and receiving messages, and a 101 /// handle which can be used to send messages into the stream. new( multicast_addr: SocketAddr, mdns_query_type: MdnsQueryType, packet_ttl: Option<u32>, ipv4_if: Option<Ipv4Addr>, ipv6_if: Option<u32>, ) -> ( Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>, BufStreamHandle, )102 pub fn new( 103 multicast_addr: SocketAddr, 104 mdns_query_type: MdnsQueryType, 105 packet_ttl: Option<u32>, 106 ipv4_if: Option<Ipv4Addr>, 107 ipv6_if: Option<u32>, 108 ) -> ( 109 Box<dyn Future<Output = Result<MdnsStream, io::Error>> + Send + Unpin>, 110 BufStreamHandle, 111 ) { 112 let (message_sender, outbound_messages) = BufStreamHandle::create(); 113 let multicast_socket = match Self::join_multicast(&multicast_addr, mdns_query_type) { 114 Ok(socket) => socket, 115 Err(err) => return (Box::new(future::err(err)), message_sender), 116 }; 117 118 // TODO: allow the bind address to be specified... 119 // constructs a future for getting the next randomly bound port to a UdpSocket 120 let next_socket = Self::next_bound_local_address( 121 &multicast_addr, 122 mdns_query_type, 123 packet_ttl, 124 ipv4_if, 125 ipv6_if, 126 ); 127 128 // while 0 is meant to keep the packet on localhost, linux regards this as an error, 129 // while macOS (BSD?) and Windows allow it. 130 if let Some(ttl) = packet_ttl { 131 assert!(ttl > 0, "TTL must be greater than 0"); 132 } 133 134 // This set of futures collapses the next udp socket into a stream which can be used for 135 // sending and receiving udp packets. 136 let stream = { 137 Box::new( 138 next_socket 139 .map(move |socket| match socket { 140 Ok(Some(socket)) => Ok(Some(UdpSocket::from_std(socket)?)), 141 Ok(None) => Ok(None), 142 Err(err) => Err(err), 143 }) 144 .map_ok(move |socket: Option<_>| { 145 let datagram: Option<_> = 146 socket.map(|socket| UdpStream::from_parts(socket, outbound_messages)); 147 let multicast: Option<_> = multicast_socket.map(|multicast_socket| { 148 Arc::new(UdpSocket::from_std(multicast_socket).expect("bad handle?")) 149 }); 150 151 MdnsStream { 152 multicast_addr, 153 datagram, 154 multicast, 155 rcving_mcast: None, 156 } 157 }), 158 ) 159 }; 160 161 (stream, message_sender) 162 } 163 164 /// On Windows, unlike all Unix variants, it is improper to bind to the multicast address 165 /// 166 /// see https://msdn.microsoft.com/en-us/library/windows/desktop/ms737550(v=vs.85).aspx 167 #[cfg(windows)] bind_multicast(socket: &Socket, multicast_addr: &SocketAddr) -> io::Result<()>168 fn bind_multicast(socket: &Socket, multicast_addr: &SocketAddr) -> io::Result<()> { 169 let multicast_addr = match *multicast_addr { 170 SocketAddr::V4(addr) => SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), addr.port()), 171 SocketAddr::V6(addr) => { 172 SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), addr.port()) 173 } 174 }; 175 socket.bind(&socket2::SockAddr::from(multicast_addr)) 176 } 177 178 /// On unixes we bind to the multicast address, which causes multicast packets to be filtered 179 #[cfg(unix)] bind_multicast(socket: &Socket, multicast_addr: &SocketAddr) -> io::Result<()>180 fn bind_multicast(socket: &Socket, multicast_addr: &SocketAddr) -> io::Result<()> { 181 socket.bind(&socket2::SockAddr::from(*multicast_addr)) 182 } 183 184 /// Returns a socket joined to the multicast address join_multicast( multicast_addr: &SocketAddr, mdns_query_type: MdnsQueryType, ) -> Result<Option<std::net::UdpSocket>, io::Error>185 fn join_multicast( 186 multicast_addr: &SocketAddr, 187 mdns_query_type: MdnsQueryType, 188 ) -> Result<Option<std::net::UdpSocket>, io::Error> { 189 if !mdns_query_type.join_multicast() { 190 return Ok(None); 191 } 192 193 let ip_addr = multicast_addr.ip(); 194 // it's an error to not use a proper mDNS address 195 if !ip_addr.is_multicast() { 196 return Err(io::Error::new( 197 io::ErrorKind::Other, 198 format!("expected multicast address for binding: {}", ip_addr), 199 )); 200 } 201 202 // binding the UdpSocket to the multicast address tells the OS to filter all packets on this socket to just this 203 // multicast address 204 // TODO: allow the binding interface to be specified 205 let socket = match ip_addr { 206 IpAddr::V4(ref mdns_v4) => { 207 let socket = Socket::new( 208 socket2::Domain::ipv4(), 209 socket2::Type::dgram(), 210 Some(socket2::Protocol::udp()), 211 )?; 212 socket.join_multicast_v4(mdns_v4, &Ipv4Addr::new(0, 0, 0, 0))?; 213 socket 214 } 215 IpAddr::V6(ref mdns_v6) => { 216 let socket = Socket::new( 217 socket2::Domain::ipv6(), 218 socket2::Type::dgram(), 219 Some(socket2::Protocol::udp()), 220 )?; 221 222 socket.set_only_v6(true)?; 223 socket.join_multicast_v6(mdns_v6, 0)?; 224 socket 225 } 226 }; 227 228 socket.set_nonblocking(true)?; 229 socket.set_reuse_address(true)?; 230 #[cfg(unix)] // this is currently restricted to Unix's in socket2 231 socket.set_reuse_port(true)?; 232 Self::bind_multicast(&socket, multicast_addr)?; 233 234 debug!("joined {}", multicast_addr); 235 Ok(Some(socket.into_udp_socket())) 236 } 237 238 /// Creates a future for randomly binding to a local socket address for client connections. next_bound_local_address( multicast_addr: &SocketAddr, mdns_query_type: MdnsQueryType, packet_ttl: Option<u32>, ipv4_if: Option<Ipv4Addr>, ipv6_if: Option<u32>, ) -> NextRandomUdpSocket239 fn next_bound_local_address( 240 multicast_addr: &SocketAddr, 241 mdns_query_type: MdnsQueryType, 242 packet_ttl: Option<u32>, 243 ipv4_if: Option<Ipv4Addr>, 244 ipv6_if: Option<u32>, 245 ) -> NextRandomUdpSocket { 246 let bind_address: IpAddr = match *multicast_addr { 247 SocketAddr::V4(..) => IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 248 SocketAddr::V6(..) => IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 249 }; 250 251 NextRandomUdpSocket { 252 bind_address, 253 mdns_query_type, 254 packet_ttl, 255 ipv4_if, 256 ipv6_if, 257 } 258 } 259 } 260 261 impl Stream for MdnsStream { 262 type Item = io::Result<SerialMessage>; 263 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>264 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 265 assert!(self.datagram.is_some() || self.multicast.is_some()); 266 267 // we poll the datagram socket first, if available, since it's a direct response or direct request 268 if let Some(ref mut datagram) = self.as_mut().datagram { 269 match datagram.poll_next_unpin(cx) { 270 Poll::Ready(ready) => return Poll::Ready(ready), 271 Poll::Pending => (), // drop through 272 } 273 } 274 275 loop { 276 let msg = if let Some(ref mut receiving) = self.rcving_mcast { 277 // TODO: should we drop this packet if it's not from the same src as dest? 278 let msg = ready!(receiving.as_mut().poll_unpin(cx))?; 279 280 Some(Poll::Ready(Some(Ok(msg)))) 281 } else { 282 None 283 }; 284 285 self.rcving_mcast = None; 286 287 if let Some(msg) = msg { 288 return msg; 289 } 290 291 // let socket = Arc::clone(socket); 292 if let Some(ref socket) = self.multicast { 293 let socket = Arc::clone(socket); 294 let receive_future = async { 295 let socket = socket; 296 let mut buf = [0u8; 2048]; 297 let (len, src) = socket.recv_from(&mut buf).await?; 298 299 Ok(SerialMessage::new( 300 buf.iter().take(len).cloned().collect(), 301 src, 302 )) 303 }; 304 305 self.rcving_mcast = Some(Box::pin(receive_future.boxed())); 306 } 307 } 308 } 309 } 310 311 #[must_use = "futures do nothing unless polled"] 312 struct NextRandomUdpSocket { 313 bind_address: IpAddr, 314 mdns_query_type: MdnsQueryType, 315 packet_ttl: Option<u32>, 316 ipv4_if: Option<Ipv4Addr>, 317 ipv6_if: Option<u32>, 318 } 319 320 impl NextRandomUdpSocket { prepare_sender(&self, socket: std::net::UdpSocket) -> io::Result<std::net::UdpSocket>321 fn prepare_sender(&self, socket: std::net::UdpSocket) -> io::Result<std::net::UdpSocket> { 322 let addr = socket.local_addr()?; 323 debug!("preparing sender on: {}", addr); 324 325 let socket = Socket::from(socket); 326 327 // TODO: TTL doesn't work on ipv6 328 match addr { 329 SocketAddr::V4(..) => { 330 socket.set_multicast_loop_v4(true)?; 331 socket.set_multicast_if_v4( 332 &self.ipv4_if.unwrap_or_else(|| Ipv4Addr::new(0, 0, 0, 0)), 333 )?; 334 if let Some(ttl) = self.packet_ttl { 335 socket.set_ttl(ttl)?; 336 socket.set_multicast_ttl_v4(ttl)?; 337 } 338 } 339 SocketAddr::V6(..) => { 340 let ipv6_if = self.ipv6_if.unwrap_or_else(|| { 341 panic!("for ipv6 multicasting the interface must be specified") 342 }); 343 344 socket.set_multicast_loop_v6(true)?; 345 socket.set_multicast_if_v6(ipv6_if)?; 346 if let Some(ttl) = self.packet_ttl { 347 socket.set_unicast_hops_v6(ttl)?; 348 socket.set_multicast_hops_v6(ttl)?; 349 } 350 } 351 } 352 353 Ok(socket.into_udp_socket()) 354 } 355 } 356 357 impl Future for NextRandomUdpSocket { 358 // TODO: clean this up, the RandomUdpSocket shouldnt' care about the query type 359 type Output = io::Result<Option<std::net::UdpSocket>>; 360 361 /// polls until there is an available next random UDP port. 362 /// 363 /// if there is no port available after 10 attempts, returns NotReady poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>364 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 365 // non-one-shot, i.e. continuous, always use one of the well-known mdns ports and bind to the multicast addr 366 if !self.mdns_query_type.sender() { 367 debug!("skipping sending stream"); 368 Poll::Ready(Ok(None)) 369 } else if self.mdns_query_type.bind_on_5353() { 370 let addr = SocketAddr::new(self.bind_address, MDNS_PORT); 371 debug!("binding sending stream to {}", addr); 372 let socket = std::net::UdpSocket::bind(&addr)?; 373 let socket = self.prepare_sender(socket)?; 374 375 Poll::Ready(Ok(Some(socket))) 376 } else { 377 // TODO: this is basically identical to UdpStream from here... share some code? (except for the port restriction) 378 // one-shot queries look very similar to UDP socket, but can't listen on 5353 379 let rand_port_range = Uniform::new_inclusive(1025_u16, u16::max_value()); 380 let mut rand = rand::thread_rng(); 381 382 for attempt in 0..10 { 383 let port = rand_port_range.sample(&mut rand); // the range is [0 ... u16::max] 384 385 // see one_shot usage info: https://tools.ietf.org/html/rfc6762#section-5 386 // the MDNS_PORT is used to signal to remote processes that this is capable of receiving multicast packets 387 // i.e. is joined to the multicast address. 388 if port == MDNS_PORT { 389 trace!("unlucky, got MDNS_PORT"); 390 continue; 391 } 392 393 let addr = SocketAddr::new(self.bind_address, port); 394 debug!("binding sending stream to {}", addr); 395 396 match std::net::UdpSocket::bind(&addr) { 397 Ok(socket) => { 398 let socket = self.prepare_sender(socket)?; 399 return Poll::Ready(Ok(Some(socket))); 400 } 401 Err(err) => debug!("unable to bind port, attempt: {}: {}", attempt, err), 402 } 403 } 404 405 debug!("could not get next random port, delaying"); 406 407 // TODO: this replaced a task::current().notify, is it correct? 408 cx.waker().wake_by_ref(); 409 Poll::Pending 410 } 411 } 412 } 413 414 #[cfg(test)] 415 pub(crate) mod tests { 416 #![allow(clippy::dbg_macro, clippy::print_stdout)] 417 418 use super::*; 419 use futures_util::future::Either; 420 use tokio::runtime; 421 422 // TODO: is there a better way? 423 const BASE_TEST_PORT: u16 = 5379; 424 425 lazy_static! { 426 /// 250 appears to be unused/unregistered 427 static ref TEST_MDNS_IPV4: IpAddr = Ipv4Addr::new(224,0,0,250).into(); 428 /// FA appears to be unused/unregistered 429 static ref TEST_MDNS_IPV6: IpAddr = Ipv6Addr::new(0xFF02, 0, 0, 0, 0, 0, 0, 0x00FA).into(); 430 } 431 432 // one_shot tests are basically clones from the udp tests 433 #[test] test_next_random_socket()434 fn test_next_random_socket() { 435 // use env_logger; 436 // env_logger::init(); 437 438 let io_loop = runtime::Runtime::new().unwrap(); 439 let (stream, _) = MdnsStream::new( 440 SocketAddr::new(*TEST_MDNS_IPV4, BASE_TEST_PORT), 441 MdnsQueryType::OneShot, 442 Some(1), 443 None, 444 None, 445 ); 446 let result = io_loop.block_on(stream); 447 448 if let Err(error) = result { 449 println!("Random address error: {:#?}", error); 450 panic!("failed to get next random address"); 451 } 452 } 453 454 // FIXME: reenable after breakage in async/await 455 #[ignore] 456 #[test] test_one_shot_mdns_ipv4()457 fn test_one_shot_mdns_ipv4() { 458 one_shot_mdns_test(SocketAddr::new(*TEST_MDNS_IPV4, BASE_TEST_PORT + 1)); 459 } 460 461 #[test] 462 #[ignore] test_one_shot_mdns_ipv6()463 fn test_one_shot_mdns_ipv6() { 464 one_shot_mdns_test(SocketAddr::new(*TEST_MDNS_IPV6, BASE_TEST_PORT + 2)); 465 } 466 467 // as there are probably unexpected responses coming on the standard addresses one_shot_mdns_test(mdns_addr: SocketAddr)468 fn one_shot_mdns_test(mdns_addr: SocketAddr) { 469 use std::time::Duration; 470 471 let client_done = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); 472 473 let test_bytes: &'static [u8; 8] = b"DEADBEEF"; 474 let send_recv_times = 10; 475 let client_done_clone = client_done.clone(); 476 477 // an in and out server 478 let server_handle = std::thread::Builder::new() 479 .name("test_one_shot_mdns:server".to_string()) 480 .spawn(move || { 481 let server_loop = runtime::Runtime::new().unwrap(); 482 let mut timeout = future::lazy(|_| tokio::time::sleep(Duration::from_millis(100))) 483 .flatten() 484 .boxed(); 485 486 // TTLs are 0 so that multicast test packets never leave the test host... 487 // FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases... 488 let (server_stream_future, mut server_sender) = MdnsStream::new( 489 mdns_addr, 490 MdnsQueryType::OneShotJoin, 491 Some(1), 492 None, 493 Some(5), 494 ); 495 496 // For one-shot responses we are competing with a system mDNS responder, we will respond from a different port... 497 let mut server_stream = server_loop 498 .block_on(server_stream_future) 499 .expect("could not create mDNS listener") 500 .into_future(); 501 502 for _ in 0..=send_recv_times { 503 if client_done_clone.load(std::sync::atomic::Ordering::Relaxed) { 504 return; 505 } 506 // wait for some bytes... 507 match server_loop.block_on( 508 future::lazy(|_| future::select(server_stream, timeout)).flatten(), 509 ) { 510 Either::Left((buffer_and_addr_stream_tmp, timeout_tmp)) => { 511 let (buffer_and_addr, stream_tmp): ( 512 Option<Result<SerialMessage, io::Error>>, 513 MdnsStream, 514 ) = buffer_and_addr_stream_tmp; 515 516 server_stream = stream_tmp.into_future(); 517 timeout = timeout_tmp; 518 let (buffer, addr) = buffer_and_addr 519 .expect("no msg received") 520 .expect("error receiving msg") 521 .into_parts(); 522 523 assert_eq!(&buffer, test_bytes); 524 //println!("server got data! {}", addr); 525 526 // bounce them right back... 527 server_sender 528 .send(SerialMessage::new(test_bytes.to_vec(), addr)) 529 .expect("could not send to client"); 530 } 531 Either::Right(((), buffer_and_addr_stream_tmp)) => { 532 server_stream = buffer_and_addr_stream_tmp; 533 timeout = 534 future::lazy(|_| tokio::time::sleep(Duration::from_millis(100))) 535 .flatten() 536 .boxed(); 537 } 538 } 539 540 // let the server turn for a bit... send the message 541 server_loop.block_on(tokio::time::sleep(Duration::from_millis(100))); 542 } 543 }) 544 .unwrap(); 545 546 // setup the client, which is going to run on the testing thread... 547 let io_loop = runtime::Runtime::new().unwrap(); 548 549 // FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases... 550 let (stream, mut sender) = 551 MdnsStream::new(mdns_addr, MdnsQueryType::OneShot, Some(1), None, Some(5)); 552 let mut stream = io_loop.block_on(stream).ok().unwrap().into_future(); 553 let mut timeout = future::lazy(|_| tokio::time::sleep(Duration::from_millis(100))) 554 .flatten() 555 .boxed(); 556 let mut successes = 0; 557 558 for _ in 0..send_recv_times { 559 // test once 560 sender 561 .send(SerialMessage::new(test_bytes.to_vec(), mdns_addr)) 562 .unwrap(); 563 564 println!("client sending data!"); 565 566 // TODO: this lazy isn't needed is it? 567 match io_loop.block_on(future::lazy(|_| future::select(stream, timeout)).flatten()) { 568 Either::Left((buffer_and_addr_stream_tmp, timeout_tmp)) => { 569 let (buffer_and_addr, stream_tmp) = buffer_and_addr_stream_tmp; 570 stream = stream_tmp.into_future(); 571 timeout = timeout_tmp; 572 573 let (buffer, _addr) = buffer_and_addr 574 .expect("no msg received") 575 .expect("error receiving msg") 576 .into_parts(); 577 println!("client got data!"); 578 579 assert_eq!(&buffer, test_bytes); 580 successes += 1; 581 } 582 Either::Right(((), buffer_and_addr_stream_tmp)) => { 583 stream = buffer_and_addr_stream_tmp; 584 timeout = future::lazy(|_| tokio::time::sleep(Duration::from_millis(100))) 585 .flatten() 586 .boxed(); 587 } 588 } 589 } 590 591 client_done.store(true, std::sync::atomic::Ordering::Relaxed); 592 println!("successes: {}", successes); 593 assert!(successes >= 1); 594 server_handle.join().expect("server thread failed"); 595 } 596 597 // FIXME: reenable after breakage in async/await 598 #[ignore] 599 #[test] test_passive_mdns()600 fn test_passive_mdns() { 601 passive_mdns_test( 602 MdnsQueryType::Passive, 603 SocketAddr::new(*TEST_MDNS_IPV4, BASE_TEST_PORT + 3), 604 ) 605 } 606 607 // FIXME: reenable after breakage in async/await 608 #[ignore] 609 #[test] test_oneshot_join_mdns()610 fn test_oneshot_join_mdns() { 611 passive_mdns_test( 612 MdnsQueryType::OneShotJoin, 613 SocketAddr::new(*TEST_MDNS_IPV4, BASE_TEST_PORT + 4), 614 ) 615 } 616 617 // as there are probably unexpected responses coming on the standard addresses passive_mdns_test(mdns_query_type: MdnsQueryType, mdns_addr: SocketAddr)618 fn passive_mdns_test(mdns_query_type: MdnsQueryType, mdns_addr: SocketAddr) { 619 use std::time::Duration; 620 621 let server_got_packet = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); 622 623 let test_bytes: &'static [u8; 8] = b"DEADBEEF"; 624 let send_recv_times = 10; 625 let server_got_packet_clone = server_got_packet.clone(); 626 627 // an in and out server 628 let _server_handle = std::thread::Builder::new() 629 .name("test_one_shot_mdns:server".to_string()) 630 .spawn(move || { 631 let io_loop = runtime::Runtime::new().unwrap(); 632 let mut timeout = future::lazy(|_| tokio::time::sleep(Duration::from_millis(100))) 633 .flatten() 634 .boxed(); 635 636 // TTLs are 0 so that multicast test packets never leave the test host... 637 // FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases... 638 let (server_stream_future, _server_sender) = 639 MdnsStream::new(mdns_addr, mdns_query_type, Some(1), None, Some(5)); 640 641 // For one-shot responses we are competing with a system mDNS responder, we will respond from a different port... 642 let mut server_stream = io_loop 643 .block_on(server_stream_future) 644 .expect("could not create mDNS listener") 645 .into_future(); 646 647 for _ in 0..=send_recv_times { 648 // wait for some bytes... 649 match io_loop.block_on( 650 future::lazy(|_| future::select(server_stream, timeout)).flatten(), 651 ) { 652 Either::Left((_buffer_and_addr_stream_tmp, _timeout_tmp)) => { 653 // let (buffer_and_addr, stream_tmp) = buffer_and_addr_stream_tmp; 654 655 // server_stream = stream_tmp.into_future(); 656 // timeout = timeout_tmp; 657 // let (buffer, addr) = buffer_and_addr.expect("no buffer received"); 658 659 // assert_eq!(&buffer, test_bytes); 660 // println!("server got data! {}", addr); 661 662 server_got_packet_clone 663 .store(true, std::sync::atomic::Ordering::Relaxed); 664 return; 665 } 666 Either::Right(((), buffer_and_addr_stream_tmp)) => { 667 server_stream = buffer_and_addr_stream_tmp; 668 timeout = 669 future::lazy(|_| tokio::time::sleep(Duration::from_millis(100))) 670 .flatten() 671 .boxed(); 672 } 673 } 674 675 // let the server turn for a bit... send the message 676 io_loop.block_on(tokio::time::sleep(Duration::from_millis(100))); 677 } 678 }) 679 .unwrap(); 680 681 // setup the client, which is going to run on the testing thread... 682 let io_loop = runtime::Runtime::new().unwrap(); 683 // FIXME: this is hardcoded to index 5 for ipv6, which isn't going to be correct in most cases... 684 let (stream, mut sender) = 685 MdnsStream::new(mdns_addr, MdnsQueryType::OneShot, Some(1), None, Some(5)); 686 let mut stream = io_loop.block_on(stream).ok().unwrap().into_future(); 687 let mut timeout = future::lazy(|_| tokio::time::sleep(Duration::from_millis(100))) 688 .flatten() 689 .boxed(); 690 691 for _ in 0..send_recv_times { 692 // test once 693 sender 694 .send(SerialMessage::new(test_bytes.to_vec(), mdns_addr)) 695 .unwrap(); 696 697 println!("client sending data!"); 698 699 // TODO: this lazy is probably unnecessary? 700 let run_result = 701 io_loop.block_on(future::lazy(|_| future::select(stream, timeout)).flatten()); 702 703 if server_got_packet.load(std::sync::atomic::Ordering::Relaxed) { 704 return; 705 } 706 707 match run_result { 708 Either::Left((buffer_and_addr_stream_tmp, timeout_tmp)) => { 709 let (_buffer_and_addr, stream_tmp) = buffer_and_addr_stream_tmp; 710 stream = stream_tmp.into_future(); 711 timeout = timeout_tmp; 712 } 713 Either::Right(((), buffer_and_addr_stream_tmp)) => { 714 stream = buffer_and_addr_stream_tmp; 715 timeout = future::lazy(|_| tokio::time::sleep(Duration::from_millis(100))) 716 .flatten() 717 .boxed(); 718 } 719 } 720 } 721 722 panic!("server never got packet."); 723 } 724 } 725