1 #[macro_use(quick_error)] extern crate quick_error;
2 
3 #[macro_use] extern crate log;
4 
5 extern crate byteorder;
6 extern crate futures;
7 extern crate libc;
8 extern crate multimap;
9 extern crate net2;
10 extern crate nix;
11 extern crate rand;
12 extern crate tokio_core as tokio;
13 
14 use futures::Future;
15 use futures::sync::mpsc;
16 use std::io;
17 use std::thread;
18 use std::sync::{Arc, RwLock};
19 use std::cell::RefCell;
20 use tokio::reactor::{Handle, Core};
21 
22 mod dns_parser;
23 use dns_parser::Name;
24 
25 mod address_family;
26 mod fsm;
27 mod services;
28 #[cfg(windows)]
29 #[path = "netwin.rs"]
30 mod net;
31 #[cfg(not(windows))]
32 mod net;
33
34 use address_family::{Inet, Inet6};
35 use services::{ServicesInner, Services, ServiceData};
36 use fsm::{Command, FSM};
37 
/// TTL attached to the unsolicited announcement that `Responder::register`
/// sends for a newly registered service (presumably seconds, as usual for DNS
/// record TTLs — confirm against the `fsm` module).
const DEFAULT_TTL: u32 = 60;
/// Port number 5353, the port assigned to multicast DNS. Not referenced in
/// this part of the file; presumably consumed by sibling modules.
const MDNS_PORT: u16 = 5353;
40 
41 pub struct Responder {
42     services: Services,
43     commands: RefCell<CommandSender>,
44     shutdown: Arc<Shutdown>,
45 }
46 
47 pub struct Service {
48     id: usize,
49     services: Services,
50     commands: CommandSender,
51     _shutdown: Arc<Shutdown>,
52 }
53 
54 type ResponderTask = Box<Future<Item=(), Error=io::Error> + Send>;
55 
56 impl Responder {
57     fn setup_core() -> io::Result<(Core, ResponderTask, Responder)> {
58         let core = Core::new()?;
59         let (responder, task) = Self::with_handle(&core.handle())?;
60         Ok((core, task, responder))
61     }
lzma_stream_footer_decode(lzma_stream_flags * options,const uint8_t * in)62 
63     pub fn new() -> io::Result<Responder> {
64         let (tx, rx) = std::sync::mpsc::sync_channel(0);
65         thread::Builder::new()
66             .name("mdns-responder".to_owned())
67             .spawn(move || {
68                 match Self::setup_core() {
69                     Ok((mut core, task, responder)) => {
70                         tx.send(Ok(responder)).expect("tx responder channel closed");
71                         core.run(task).expect("mdns thread failed");
72                     }
73                     Err(err) => {
74                         tx.send(Err(err)).expect("tx responder channel closed");
75                     }
76                 }
77             })?;
78 
79         rx.recv().expect("rx responder channel closed")
80     }
81 
82     pub fn spawn(handle: &Handle) -> io::Result<Responder> {
83         let (responder, task) = Responder::with_handle(handle)?;
84         handle.spawn(task.map_err(|e| {
85             warn!("mdns error {:?}", e);
86             ()
87         }));
88         Ok(responder)
89     }
90 
91     pub fn with_handle(handle: &Handle) -> io::Result<(Responder, ResponderTask)> {
92         let mut hostname = try!(net::gethostname());
93         if !hostname.ends_with(".local") {
94             hostname.push_str(".local");
95         }
96 
97         let services = Arc::new(RwLock::new(ServicesInner::new(hostname)));
98 
99         let v4 = FSM::<Inet>::new(handle, &services);
100         let v6 = FSM::<Inet6>::new(handle, &services);
101 
102         let (task, commands) : (ResponderTask, _) = match (v4, v6) {
103             (Ok((v4_task, v4_command)), Ok((v6_task, v6_command))) => {
104                 let task = v4_task.join(v6_task).map(|((),())| ());
105                 let task = Box::new(task);
106                 let commands = vec![v4_command, v6_command];
107                 (task, commands)
108             }
109 
110             (Ok((v4_task, v4_command)), Err(err)) => {
111                 warn!("Failed to register IPv6 receiver: {:?}", err);
112                 (Box::new(v4_task), vec![v4_command])
113             }
114 
115             (Err(err), _) => return Err(err),
116         };
117 
118         let commands = CommandSender(commands);
119         let responder = Responder {
120             services: services,
121             commands: RefCell::new(commands.clone()),
122             shutdown: Arc::new(Shutdown(commands)),
123         };
124 
125         Ok((responder, task))
126     }
127 }
128 
129 impl Responder {
130     pub fn register(&self, svc_type: String, svc_name: String, port: u16, txt: &[&str]) -> Service {
131         let txt = if txt.is_empty() {
132             vec![0]
133         } else {
134             txt.into_iter().flat_map(|entry| {
135                 let entry = entry.as_bytes();
136                 if entry.len() > 255 {
137                     panic!("{:?} is too long for a TXT record", entry);
138                 }
139                 std::iter::once(entry.len() as u8).chain(entry.into_iter().cloned())
140             }).collect()
141         };
142 
143         let svc = ServiceData {
144             typ: Name::from_str(format!("{}.local", svc_type)).unwrap(),
145             name: Name::from_str(format!("{}.{}.local", svc_name, svc_type)).unwrap(),
146             port: port,
147             txt: txt,
148         };
149 
150         self.commands.borrow_mut()
151             .send_unsolicited(svc.clone(), DEFAULT_TTL, true);
152 
153         let id = self.services
154             .write().unwrap()
155             .register(svc);
156 
157         Service {
158             id: id,
159             commands: self.commands.borrow().clone(),
160             services: self.services.clone(),
161             _shutdown: self.shutdown.clone(),
162         }
163     }
164 
165 }
166 
167 impl Drop for Service {
168     fn drop(&mut self) {
169         let svc = self.services
170             .write().unwrap()
171             .unregister(self.id);
172         self.commands.send_unsolicited(svc, 0, false);
173     }
174 }
175 
176 struct Shutdown(CommandSender);
177 impl Drop for Shutdown {
178     fn drop(&mut self) {
179         self.0.send_shutdown();
180         // TODO wait for tasks to shutdown
181     }
182 }
183 
184 #[derive(Clone)]
185 struct CommandSender(Vec<mpsc::UnboundedSender<Command>>);
186 impl CommandSender {
187     fn send(&mut self, cmd: Command) {
188         for tx in self.0.iter_mut() {
189             tx.unbounded_send(cmd.clone()).expect("responder died");
190         }
191     }
192 
193     fn send_unsolicited(&mut self, svc: ServiceData, ttl: u32, include_ip: bool) {
194         self.send(Command::SendUnsolicited {
195             svc: svc,
196             ttl: ttl,
197             include_ip: include_ip,
198         });
199     }
200 
201     fn send_shutdown(&mut self) {
202         self.send(Command::Shutdown);
203     }
204 }
205