1 #[macro_use(quick_error)]
2 extern crate quick_error;
3 
4 #[macro_use]
5 extern crate log;
6 
7 extern crate byteorder;
8 extern crate futures;
9 extern crate get_if_addrs;
10 extern crate hostname;
11 extern crate multimap;
12 extern crate net2;
13 extern crate rand;
14 extern crate tokio_core as tokio;
15 
16 use futures::sync::mpsc;
17 use futures::Future;
18 use std::cell::RefCell;
19 use std::io;
20 use std::sync::{Arc, RwLock};
21 use std::thread;
22 use tokio::reactor::{Core, Handle};
23 
24 mod dns_parser;
25 use dns_parser::Name;
26 
27 mod address_family;
28 mod fsm;
29 mod services;
30 
31 use address_family::{Inet, Inet6};
32 use fsm::{Command, FSM};
33 use services::{ServiceData, Services, ServicesInner};
34 
35 const DEFAULT_TTL: u32 = 60;
36 const MDNS_PORT: u16 = 5353;
37 
/// Handle used to register services with the mDNS responder task.
///
/// Obtained from `Responder::new`, `Responder::spawn` or `Responder::with_handle`.
pub struct Responder {
    /// Service registry; the same `Arc` is handed to the state machines at construction and
    /// cloned into every `Service` returned by `register`.
    services: Services,
    /// Command channels to the state machines. Wrapped in a `RefCell` because sending needs
    /// `&mut` access while `register` only takes `&self`.
    commands: RefCell<CommandSender>,
    /// Shutdown guard shared with every `Service`; `Command::Shutdown` is sent when the last
    /// clone of this `Arc` is dropped.
    shutdown: Arc<Shutdown>,
}
43 
/// Handle to a single registered service, returned by `Responder::register`.
///
/// Dropping it unregisters the service and sends an unsolicited command for it with a TTL of 0.
pub struct Service {
    /// Identifier returned by the registry's `register` call; passed to `unregister` on drop.
    id: usize,
    /// Registry the service was registered in.
    services: Services,
    /// Command channels used to send the final unsolicited command on drop.
    commands: CommandSender,
    /// Keeps the shared shutdown guard alive for as long as this service exists.
    _shutdown: Arc<Shutdown>,
}
50 
51 type ResponderTask = Box<Future<Item = (), Error = io::Error> + Send>;
52 
53 impl Responder {
setup_core() -> io::Result<(Core, ResponderTask, Responder)>54     fn setup_core() -> io::Result<(Core, ResponderTask, Responder)> {
55         let core = Core::new()?;
56         let (responder, task) = Self::with_handle(&core.handle())?;
57         Ok((core, task, responder))
58     }
59 
new() -> io::Result<Responder>60     pub fn new() -> io::Result<Responder> {
61         let (tx, rx) = std::sync::mpsc::sync_channel(0);
62         thread::Builder::new()
63             .name("mdns-responder".to_owned())
64             .spawn(move || match Self::setup_core() {
65                 Ok((mut core, task, responder)) => {
66                     tx.send(Ok(responder)).expect("tx responder channel closed");
67                     core.run(task).expect("mdns thread failed");
68                 }
69                 Err(err) => {
70                     tx.send(Err(err)).expect("tx responder channel closed");
71                 }
72             })?;
73 
74         rx.recv().expect("rx responder channel closed")
75     }
76 
spawn(handle: &Handle) -> io::Result<Responder>77     pub fn spawn(handle: &Handle) -> io::Result<Responder> {
78         let (responder, task) = Responder::with_handle(handle)?;
79         handle.spawn(task.map_err(|e| {
80             warn!("mdns error {:?}", e);
81             ()
82         }));
83         Ok(responder)
84     }
85 
with_handle(handle: &Handle) -> io::Result<(Responder, ResponderTask)>86     pub fn with_handle(handle: &Handle) -> io::Result<(Responder, ResponderTask)> {
87         let mut hostname = match hostname::get() {
88             Ok(s) => match s.into_string() {
89                 Ok(s) => s,
90                 Err(_) => {
91                     return Err(std::io::Error::new(
92                         std::io::ErrorKind::InvalidData,
93                         "Hostname not valid unicode",
94                     ))
95                 }
96             },
97             Err(err) => return Err(err),
98         };
99         if !hostname.ends_with(".local") {
100             hostname.push_str(".local");
101         }
102 
103         let services = Arc::new(RwLock::new(ServicesInner::new(hostname)));
104 
105         let v4 = FSM::<Inet>::new(handle, &services);
106         let v6 = FSM::<Inet6>::new(handle, &services);
107 
108         let (task, commands): (ResponderTask, _) = match (v4, v6) {
109             (Ok((v4_task, v4_command)), Ok((v6_task, v6_command))) => {
110                 let task = v4_task.join(v6_task).map(|((), ())| ());
111                 let task = Box::new(task);
112                 let commands = vec![v4_command, v6_command];
113                 (task, commands)
114             }
115 
116             (Ok((v4_task, v4_command)), Err(err)) => {
117                 warn!("Failed to register IPv6 receiver: {:?}", err);
118                 (Box::new(v4_task), vec![v4_command])
119             }
120 
121             (Err(err), _) => return Err(err),
122         };
123 
124         let commands = CommandSender(commands);
125         let responder = Responder {
126             services: services,
127             commands: RefCell::new(commands.clone()),
128             shutdown: Arc::new(Shutdown(commands)),
129         };
130 
131         Ok((responder, task))
132     }
133 }
134 
135 impl Responder {
register(&self, svc_type: String, svc_name: String, port: u16, txt: &[&str]) -> Service136     pub fn register(&self, svc_type: String, svc_name: String, port: u16, txt: &[&str]) -> Service {
137         let txt = if txt.is_empty() {
138             vec![0]
139         } else {
140             txt.into_iter()
141                 .flat_map(|entry| {
142                     let entry = entry.as_bytes();
143                     if entry.len() > 255 {
144                         panic!("{:?} is too long for a TXT record", entry);
145                     }
146                     std::iter::once(entry.len() as u8).chain(entry.into_iter().cloned())
147                 })
148                 .collect()
149         };
150 
151         let svc = ServiceData {
152             typ: Name::from_str(format!("{}.local", svc_type)).unwrap(),
153             name: Name::from_str(format!("{}.{}.local", svc_name, svc_type)).unwrap(),
154             port: port,
155             txt: txt,
156         };
157 
158         self.commands
159             .borrow_mut()
160             .send_unsolicited(svc.clone(), DEFAULT_TTL, true);
161 
162         let id = self.services.write().unwrap().register(svc);
163 
164         Service {
165             id: id,
166             commands: self.commands.borrow().clone(),
167             services: self.services.clone(),
168             _shutdown: self.shutdown.clone(),
169         }
170     }
171 }
172 
173 impl Drop for Service {
drop(&mut self)174     fn drop(&mut self) {
175         let svc = self.services.write().unwrap().unregister(self.id);
176         self.commands.send_unsolicited(svc, 0, false);
177     }
178 }
179 
/// Guard that sends `Command::Shutdown` over the wrapped sender when it is dropped.
struct Shutdown(CommandSender);
181 
182 impl Drop for Shutdown {
drop(&mut self)183     fn drop(&mut self) {
184         self.0.send_shutdown();
185         // TODO wait for tasks to shutdown
186     }
187 }
188 
/// Set of unbounded command channels, one per state machine created in
/// `Responder::with_handle`; every command is delivered to all of them.
#[derive(Clone)]
struct CommandSender(Vec<mpsc::UnboundedSender<Command>>);
191 impl CommandSender {
send(&mut self, cmd: Command)192     fn send(&mut self, cmd: Command) {
193         for tx in self.0.iter_mut() {
194             tx.unbounded_send(cmd.clone()).expect("responder died");
195         }
196     }
197 
send_unsolicited(&mut self, svc: ServiceData, ttl: u32, include_ip: bool)198     fn send_unsolicited(&mut self, svc: ServiceData, ttl: u32, include_ip: bool) {
199         self.send(Command::SendUnsolicited {
200             svc: svc,
201             ttl: ttl,
202             include_ip: include_ip,
203         });
204     }
205 
send_shutdown(&mut self)206     fn send_shutdown(&mut self) {
207         self.send(Command::Shutdown);
208     }
209 }
210