1 #[macro_use(quick_error)] extern crate quick_error;
2
3 #[macro_use] extern crate log;
4
5 extern crate byteorder;
6 extern crate futures;
7 extern crate libc;
8 extern crate multimap;
9 extern crate net2;
10 extern crate nix;
11 extern crate rand;
12 extern crate tokio_core as tokio;
13
14 use futures::Future;
15 use futures::sync::mpsc;
16 use std::io;
17 use std::thread;
18 use std::sync::{Arc, RwLock};
stream_flags_decode(lzma_stream_flags * options,const uint8_t * in)19 use std::cell::RefCell;
20 use tokio::reactor::{Handle, Core};
21
22 mod dns_parser;
23 use dns_parser::Name;
24
25 mod address_family;
26 mod fsm;
27 mod services;
28 #[cfg(windows)]
29 #[path = "netwin.rs"]
30 mod net;
31 #[cfg(not(windows))]
32 mod net;
lzma_stream_header_decode(lzma_stream_flags * options,const uint8_t * in)33
34 use address_family::{Inet, Inet6};
35 use services::{ServicesInner, Services, ServiceData};
36 use fsm::{Command, FSM};
37
38 const DEFAULT_TTL : u32 = 60;
39 const MDNS_PORT : u16 = 5353;
40
41 pub struct Responder {
42 services: Services,
43 commands: RefCell<CommandSender>,
44 shutdown: Arc<Shutdown>,
45 }
46
47 pub struct Service {
48 id: usize,
49 services: Services,
50 commands: CommandSender,
51 _shutdown: Arc<Shutdown>,
52 }
53
54 type ResponderTask = Box<Future<Item=(), Error=io::Error> + Send>;
55
56 impl Responder {
57 fn setup_core() -> io::Result<(Core, ResponderTask, Responder)> {
58 let core = Core::new()?;
59 let (responder, task) = Self::with_handle(&core.handle())?;
60 Ok((core, task, responder))
61 }
lzma_stream_footer_decode(lzma_stream_flags * options,const uint8_t * in)62
63 pub fn new() -> io::Result<Responder> {
64 let (tx, rx) = std::sync::mpsc::sync_channel(0);
65 thread::Builder::new()
66 .name("mdns-responder".to_owned())
67 .spawn(move || {
68 match Self::setup_core() {
69 Ok((mut core, task, responder)) => {
70 tx.send(Ok(responder)).expect("tx responder channel closed");
71 core.run(task).expect("mdns thread failed");
72 }
73 Err(err) => {
74 tx.send(Err(err)).expect("tx responder channel closed");
75 }
76 }
77 })?;
78
79 rx.recv().expect("rx responder channel closed")
80 }
81
82 pub fn spawn(handle: &Handle) -> io::Result<Responder> {
83 let (responder, task) = Responder::with_handle(handle)?;
84 handle.spawn(task.map_err(|e| {
85 warn!("mdns error {:?}", e);
86 ()
87 }));
88 Ok(responder)
89 }
90
91 pub fn with_handle(handle: &Handle) -> io::Result<(Responder, ResponderTask)> {
92 let mut hostname = try!(net::gethostname());
93 if !hostname.ends_with(".local") {
94 hostname.push_str(".local");
95 }
96
97 let services = Arc::new(RwLock::new(ServicesInner::new(hostname)));
98
99 let v4 = FSM::<Inet>::new(handle, &services);
100 let v6 = FSM::<Inet6>::new(handle, &services);
101
102 let (task, commands) : (ResponderTask, _) = match (v4, v6) {
103 (Ok((v4_task, v4_command)), Ok((v6_task, v6_command))) => {
104 let task = v4_task.join(v6_task).map(|((),())| ());
105 let task = Box::new(task);
106 let commands = vec![v4_command, v6_command];
107 (task, commands)
108 }
109
110 (Ok((v4_task, v4_command)), Err(err)) => {
111 warn!("Failed to register IPv6 receiver: {:?}", err);
112 (Box::new(v4_task), vec![v4_command])
113 }
114
115 (Err(err), _) => return Err(err),
116 };
117
118 let commands = CommandSender(commands);
119 let responder = Responder {
120 services: services,
121 commands: RefCell::new(commands.clone()),
122 shutdown: Arc::new(Shutdown(commands)),
123 };
124
125 Ok((responder, task))
126 }
127 }
128
129 impl Responder {
130 pub fn register(&self, svc_type: String, svc_name: String, port: u16, txt: &[&str]) -> Service {
131 let txt = if txt.is_empty() {
132 vec![0]
133 } else {
134 txt.into_iter().flat_map(|entry| {
135 let entry = entry.as_bytes();
136 if entry.len() > 255 {
137 panic!("{:?} is too long for a TXT record", entry);
138 }
139 std::iter::once(entry.len() as u8).chain(entry.into_iter().cloned())
140 }).collect()
141 };
142
143 let svc = ServiceData {
144 typ: Name::from_str(format!("{}.local", svc_type)).unwrap(),
145 name: Name::from_str(format!("{}.{}.local", svc_name, svc_type)).unwrap(),
146 port: port,
147 txt: txt,
148 };
149
150 self.commands.borrow_mut()
151 .send_unsolicited(svc.clone(), DEFAULT_TTL, true);
152
153 let id = self.services
154 .write().unwrap()
155 .register(svc);
156
157 Service {
158 id: id,
159 commands: self.commands.borrow().clone(),
160 services: self.services.clone(),
161 _shutdown: self.shutdown.clone(),
162 }
163 }
164
165 }
166
167 impl Drop for Service {
168 fn drop(&mut self) {
169 let svc = self.services
170 .write().unwrap()
171 .unregister(self.id);
172 self.commands.send_unsolicited(svc, 0, false);
173 }
174 }
175
176 struct Shutdown(CommandSender);
177 impl Drop for Shutdown {
178 fn drop(&mut self) {
179 self.0.send_shutdown();
180 // TODO wait for tasks to shutdown
181 }
182 }
183
184 #[derive(Clone)]
185 struct CommandSender(Vec<mpsc::UnboundedSender<Command>>);
186 impl CommandSender {
187 fn send(&mut self, cmd: Command) {
188 for tx in self.0.iter_mut() {
189 tx.unbounded_send(cmd.clone()).expect("responder died");
190 }
191 }
192
193 fn send_unsolicited(&mut self, svc: ServiceData, ttl: u32, include_ip: bool) {
194 self.send(Command::SendUnsolicited {
195 svc: svc,
196 ttl: ttl,
197 include_ip: include_ip,
198 });
199 }
200
201 fn send_shutdown(&mut self) {
202 self.send(Command::Shutdown);
203 }
204 }
205