1 #[macro_use(quick_error)] 2 extern crate quick_error; 3 4 #[macro_use] 5 extern crate log; 6 7 extern crate byteorder; 8 extern crate futures; 9 extern crate get_if_addrs; 10 extern crate hostname; 11 extern crate multimap; 12 extern crate net2; 13 extern crate rand; 14 extern crate tokio_core as tokio; 15 16 use futures::sync::mpsc; 17 use futures::Future; 18 use std::cell::RefCell; 19 use std::io; 20 use std::sync::{Arc, RwLock}; 21 use std::thread; 22 use tokio::reactor::{Core, Handle}; 23 24 mod dns_parser; 25 use dns_parser::Name; 26 27 mod address_family; 28 mod fsm; 29 mod services; 30 31 use address_family::{Inet, Inet6}; 32 use fsm::{Command, FSM}; 33 use services::{ServiceData, Services, ServicesInner}; 34 35 const DEFAULT_TTL: u32 = 60; 36 const MDNS_PORT: u16 = 5353; 37 38 pub struct Responder { 39 services: Services, 40 commands: RefCell<CommandSender>, 41 shutdown: Arc<Shutdown>, 42 } 43 44 pub struct Service { 45 id: usize, 46 services: Services, 47 commands: CommandSender, 48 _shutdown: Arc<Shutdown>, 49 } 50 51 type ResponderTask = Box<Future<Item = (), Error = io::Error> + Send>; 52 53 impl Responder { setup_core() -> io::Result<(Core, ResponderTask, Responder)>54 fn setup_core() -> io::Result<(Core, ResponderTask, Responder)> { 55 let core = Core::new()?; 56 let (responder, task) = Self::with_handle(&core.handle())?; 57 Ok((core, task, responder)) 58 } 59 new() -> io::Result<Responder>60 pub fn new() -> io::Result<Responder> { 61 let (tx, rx) = std::sync::mpsc::sync_channel(0); 62 thread::Builder::new() 63 .name("mdns-responder".to_owned()) 64 .spawn(move || match Self::setup_core() { 65 Ok((mut core, task, responder)) => { 66 tx.send(Ok(responder)).expect("tx responder channel closed"); 67 core.run(task).expect("mdns thread failed"); 68 } 69 Err(err) => { 70 tx.send(Err(err)).expect("tx responder channel closed"); 71 } 72 })?; 73 74 rx.recv().expect("rx responder channel closed") 75 } 76 spawn(handle: &Handle) -> io::Result<Responder>77 pub fn spawn(handle: &Handle) -> io::Result<Responder> { 78 let (responder, task) = Responder::with_handle(handle)?; 79 handle.spawn(task.map_err(|e| { 80 warn!("mdns error {:?}", e); 81 () 82 })); 83 Ok(responder) 84 } 85 with_handle(handle: &Handle) -> io::Result<(Responder, ResponderTask)>86 pub fn with_handle(handle: &Handle) -> io::Result<(Responder, ResponderTask)> { 87 let mut hostname = match hostname::get() { 88 Ok(s) => match s.into_string() { 89 Ok(s) => s, 90 Err(_) => { 91 return Err(std::io::Error::new( 92 std::io::ErrorKind::InvalidData, 93 "Hostname not valid unicode", 94 )) 95 } 96 }, 97 Err(err) => return Err(err), 98 }; 99 if !hostname.ends_with(".local") { 100 hostname.push_str(".local"); 101 } 102 103 let services = Arc::new(RwLock::new(ServicesInner::new(hostname))); 104 105 let v4 = FSM::<Inet>::new(handle, &services); 106 let v6 = FSM::<Inet6>::new(handle, &services); 107 108 let (task, commands): (ResponderTask, _) = match (v4, v6) { 109 (Ok((v4_task, v4_command)), Ok((v6_task, v6_command))) => { 110 let task = v4_task.join(v6_task).map(|((), ())| ()); 111 let task = Box::new(task); 112 let commands = vec![v4_command, v6_command]; 113 (task, commands) 114 } 115 116 (Ok((v4_task, v4_command)), Err(err)) => { 117 warn!("Failed to register IPv6 receiver: {:?}", err); 118 (Box::new(v4_task), vec![v4_command]) 119 } 120 121 (Err(err), _) => return Err(err), 122 }; 123 124 let commands = CommandSender(commands); 125 let responder = Responder { 126 services: services, 127 commands: RefCell::new(commands.clone()), 128 shutdown: Arc::new(Shutdown(commands)), 129 }; 130 131 Ok((responder, task)) 132 } 133 } 134 135 impl Responder { register(&self, svc_type: String, svc_name: String, port: u16, txt: &[&str]) -> Service136 pub fn register(&self, svc_type: String, svc_name: String, port: u16, txt: &[&str]) -> Service { 137 let txt = if txt.is_empty() { 138 vec![0] 139 } else { 140 txt.into_iter() 141 .flat_map(|entry| { 142 let entry = entry.as_bytes(); 143 if entry.len() > 255 { 144 panic!("{:?} is too long for a TXT record", entry); 145 } 146 std::iter::once(entry.len() as u8).chain(entry.into_iter().cloned()) 147 }) 148 .collect() 149 }; 150 151 let svc = ServiceData { 152 typ: Name::from_str(format!("{}.local", svc_type)).unwrap(), 153 name: Name::from_str(format!("{}.{}.local", svc_name, svc_type)).unwrap(), 154 port: port, 155 txt: txt, 156 }; 157 158 self.commands 159 .borrow_mut() 160 .send_unsolicited(svc.clone(), DEFAULT_TTL, true); 161 162 let id = self.services.write().unwrap().register(svc); 163 164 Service { 165 id: id, 166 commands: self.commands.borrow().clone(), 167 services: self.services.clone(), 168 _shutdown: self.shutdown.clone(), 169 } 170 } 171 } 172 173 impl Drop for Service { drop(&mut self)174 fn drop(&mut self) { 175 let svc = self.services.write().unwrap().unregister(self.id); 176 self.commands.send_unsolicited(svc, 0, false); 177 } 178 } 179 180 struct Shutdown(CommandSender); 181 182 impl Drop for Shutdown { drop(&mut self)183 fn drop(&mut self) { 184 self.0.send_shutdown(); 185 // TODO wait for tasks to shutdown 186 } 187 } 188 189 #[derive(Clone)] 190 struct CommandSender(Vec<mpsc::UnboundedSender<Command>>); 191 impl CommandSender { send(&mut self, cmd: Command)192 fn send(&mut self, cmd: Command) { 193 for tx in self.0.iter_mut() { 194 tx.unbounded_send(cmd.clone()).expect("responder died"); 195 } 196 } 197 send_unsolicited(&mut self, svc: ServiceData, ttl: u32, include_ip: bool)198 fn send_unsolicited(&mut self, svc: ServiceData, ttl: u32, include_ip: bool) { 199 self.send(Command::SendUnsolicited { 200 svc: svc, 201 ttl: ttl, 202 include_ip: include_ip, 203 }); 204 } 205 send_shutdown(&mut self)206 fn send_shutdown(&mut self) { 207 self.send(Command::Shutdown); 208 } 209 } 210