1 use super::{Error, ffi, to_c_str, c_str_to_slice, Watch, Message, MessageType, BusName, Path, ConnPath};
2 use super::{RequestNameReply, ReleaseNameReply, BusType};
3 use super::watch::WatchList;
4 use std::{fmt, mem, ptr, thread, panic, ops};
5 use std::collections::VecDeque;
6 use std::cell::{Cell, RefCell};
7 use std::os::unix::io::RawFd;
8 use std::os::raw::{c_void, c_char, c_int, c_uint};
9 
10 /// The type of function to use for replacing the message callback.
11 ///
12 /// See the documentation for Connection::replace_message_callback for more information.
13 pub type MessageCallback = Box<FnMut(&Connection, Message) -> bool + 'static>;
14 
15 #[repr(C)]
16 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
17 /// Flags to use for Connection::register_name.
18 ///
19 /// More than one flag can be specified, if so just add their values.
20 pub enum DBusNameFlag {
21     /// Allow another service to become the primary owner if requested
22     AllowReplacement = ffi::DBUS_NAME_FLAG_ALLOW_REPLACEMENT as isize,
23     /// Request to replace the current primary owner
24     ReplaceExisting = ffi::DBUS_NAME_FLAG_REPLACE_EXISTING as isize,
25     /// If we can not become the primary owner do not place us in the queue
26     DoNotQueue = ffi::DBUS_NAME_FLAG_DO_NOT_QUEUE as isize,
27 }
28 
29 impl DBusNameFlag {
30     /// u32 value of flag.
value(self) -> u3231     pub fn value(self) -> u32 { self as u32 }
32 }
33 
34 /// When listening for incoming events on the D-Bus, this enum will tell you what type
35 /// of incoming event has happened.
36 #[derive(Debug)]
37 pub enum ConnectionItem {
38     /// No event between now and timeout
39     Nothing,
40     /// Incoming method call
41     MethodCall(Message),
42     /// Incoming signal
43     Signal(Message),
44     /// Incoming method return, including method return errors (mostly used for Async I/O)
45     MethodReturn(Message),
46 }
47 
48 impl From<Message> for ConnectionItem {
from(m: Message) -> Self49     fn from(m: Message) -> Self {
50         let mtype = m.msg_type();
51         match mtype {
52             MessageType::Signal => ConnectionItem::Signal(m),
53             MessageType::MethodReturn => ConnectionItem::MethodReturn(m),
54             MessageType::Error => ConnectionItem::MethodReturn(m),
55             MessageType::MethodCall => ConnectionItem::MethodCall(m),
56             _ => panic!("unknown message type {:?} received from D-Bus", mtype),
57         }
58     }
59 }
60 
61 
62 /// ConnectionItem iterator
63 pub struct ConnectionItems<'a> {
64     c: &'a Connection,
65     timeout_ms: Option<i32>,
66     end_on_timeout: bool,
67     handlers: MsgHandlerList,
68 }
69 
70 impl<'a> ConnectionItems<'a> {
71     /// Builder method that adds a new msg handler.
72     ///
73     /// Note: Likely to changed/refactored/removed in next release
with<H: 'static + MsgHandler>(mut self, h: H) -> Self74     pub fn with<H: 'static + MsgHandler>(mut self, h: H) -> Self {
75         self.handlers.push(Box::new(h)); self
76     }
77 
78     // Returns true if processed, false if not
process_handlers(&mut self, ci: &ConnectionItem) -> bool79     fn process_handlers(&mut self, ci: &ConnectionItem) -> bool {
80         let m = match *ci {
81             ConnectionItem::MethodReturn(ref msg) => msg,
82             ConnectionItem::Signal(ref msg) => msg,
83             ConnectionItem::MethodCall(ref msg) => msg,
84             ConnectionItem::Nothing => return false,
85         };
86 
87         msghandler_process(&mut self.handlers, m, &self.c)
88     }
89 
90     /// Access and modify message handlers
91     ///
92     /// Note: Likely to changed/refactored/removed in next release
msg_handlers(&mut self) -> &mut Vec<Box<MsgHandler>>93     pub fn msg_handlers(&mut self) -> &mut Vec<Box<MsgHandler>> { &mut self.handlers }
94 
95     /// Creates a new ConnectionItems iterator
96     ///
97     /// For io_timeout, setting None means the fds will not be read/written. I e, only pending
98     /// items in libdbus's internal queue will be processed.
99     ///
100     /// For end_on_timeout, setting false will means that the iterator will never finish (unless
101     /// the D-Bus server goes down). Instead, ConnectionItem::Nothing will be returned in case no
102     /// items are in queue.
new(conn: &'a Connection, io_timeout: Option<i32>, end_on_timeout: bool) -> Self103     pub fn new(conn: &'a Connection, io_timeout: Option<i32>, end_on_timeout: bool) -> Self {
104         ConnectionItems {
105             c: conn,
106             timeout_ms: io_timeout,
107             end_on_timeout: end_on_timeout,
108             handlers: Vec::new(),
109         }
110     }
111 }
112 
113 impl<'a> Iterator for ConnectionItems<'a> {
114     type Item = ConnectionItem;
next(&mut self) -> Option<ConnectionItem>115     fn next(&mut self) -> Option<ConnectionItem> {
116         loop {
117             if self.c.i.filter_cb.borrow().is_none() { panic!("ConnectionItems::next called recursively or with a MessageCallback set to None"); }
118             let i: Option<ConnectionItem> = self.c.next_msg().map(|x| x.into());
119             if let Some(ci) = i {
120                 if !self.process_handlers(&ci) { return Some(ci); }
121             }
122 
123             if let Some(t) = self.timeout_ms {
124 		let r = unsafe { ffi::dbus_connection_read_write_dispatch(self.c.conn(), t as c_int) };
125 		self.c.check_panic();
126 		if !self.c.i.pending_items.borrow().is_empty() { continue };
127 		if r == 0 { return None; }
128             }
129 
130             let r = unsafe { ffi::dbus_connection_dispatch(self.c.conn()) };
131             self.c.check_panic();
132 
133             if !self.c.i.pending_items.borrow().is_empty() { continue };
134             if r == ffi::DBusDispatchStatus::DataRemains { continue };
135             if r == ffi::DBusDispatchStatus::Complete { return if self.end_on_timeout { None } else { Some(ConnectionItem::Nothing) } };
136             panic!("dbus_connection_dispatch failed");
137         }
138     }
139 }
140 
141 /// Iterator over incoming messages on a connection.
142 #[derive(Debug, Clone)]
143 pub struct ConnMsgs<C> {
144     /// The connection or some reference to it.
145     pub conn: C,
146     /// How many ms dbus should block, waiting for incoming messages until timing out.
147     ///
148     /// If set to None, the dbus library will not read/write from file descriptors at all.
149     /// Instead the iterator will end when there's nothing currently in the queue.
150     pub timeout_ms: Option<u32>,
151 }
152 
153 impl<C: ops::Deref<Target = Connection>> Iterator for ConnMsgs<C> {
154     type Item = Message;
next(&mut self) -> Option<Self::Item>155     fn next(&mut self) -> Option<Self::Item> {
156 
157         loop {
158             let iconn = &self.conn.i;
159             if iconn.filter_cb.borrow().is_none() { panic!("ConnMsgs::next called recursively or with a MessageCallback set to None"); }
160             let i = self.conn.next_msg();
161             if let Some(ci) = i { return Some(ci); }
162 
163             if let Some(t) = self.timeout_ms {
164 		let r = unsafe { ffi::dbus_connection_read_write_dispatch(self.conn.conn(), t as c_int) };
165 		self.conn.check_panic();
166 		if !iconn.pending_items.borrow().is_empty() { continue };
167 		if r == 0 { return None; }
168             }
169 
170             let r = unsafe { ffi::dbus_connection_dispatch(self.conn.conn()) };
171             self.conn.check_panic();
172 
173             if !iconn.pending_items.borrow().is_empty() { continue };
174             if r == ffi::DBusDispatchStatus::DataRemains { continue };
175             if r == ffi::DBusDispatchStatus::Complete { return None }
176             panic!("dbus_connection_dispatch failed");
177         }
178     }
179 }
180 
181 /* Since we register callbacks with userdata pointers,
182    we need to make sure the connection pointer does not move around.
183    Hence this extra indirection. */
184 struct IConnection {
185     conn: Cell<*mut ffi::DBusConnection>,
186     pending_items: RefCell<VecDeque<Message>>,
187     watches: Option<Box<WatchList>>,
188     handlers: RefCell<MsgHandlerList>,
189 
190     filter_cb: RefCell<Option<MessageCallback>>,
191     filter_cb_panic: RefCell<thread::Result<()>>,
192 }
193 
194 /// A D-Bus connection. Start here if you want to get on the D-Bus!
195 pub struct Connection {
196     i: Box<IConnection>,
197 }
198 
conn_handle(c: &Connection) -> *mut ffi::DBusConnection199 pub fn conn_handle(c: &Connection) -> *mut ffi::DBusConnection {
200     c.i.conn.get()
201 }
202 
filter_message_cb(conn: *mut ffi::DBusConnection, msg: *mut ffi::DBusMessage, user_data: *mut c_void) -> ffi::DBusHandlerResult203 extern "C" fn filter_message_cb(conn: *mut ffi::DBusConnection, msg: *mut ffi::DBusMessage,
204     user_data: *mut c_void) -> ffi::DBusHandlerResult {
205 
206     let i: &IConnection = unsafe { mem::transmute(user_data) };
207     let connref: panic::AssertUnwindSafe<&Connection> = unsafe { mem::transmute(&i) };
208     if i.conn.get() != conn || i.filter_cb_panic.try_borrow().is_err() {
209         // This should never happen, but let's be extra sure
210         // process::abort(); ??
211         return ffi::DBusHandlerResult::Handled;
212     }
213     if i.filter_cb_panic.borrow().is_err() {
214         // We're in panic mode. Let's quit this ASAP
215         return ffi::DBusHandlerResult::Handled;
216     }
217 
218     let fcb = panic::AssertUnwindSafe(&i.filter_cb);
219     let r = panic::catch_unwind(|| {
220         let m = Message::from_ptr(msg, true);
221         let mut cb = fcb.borrow_mut().take().unwrap(); // Take the callback out while we call it.
222         let r = cb(connref.0, m);
223         let mut cb2 = fcb.borrow_mut(); // If the filter callback has not been replaced, put it back in.
224         if cb2.is_none() { *cb2 = Some(cb) };
225         r
226     });
227 
228     match r {
229         Ok(false) => ffi::DBusHandlerResult::NotYetHandled,
230         Ok(true) => ffi::DBusHandlerResult::Handled,
231         Err(e) => {
232             *i.filter_cb_panic.borrow_mut() = Err(e);
233             ffi::DBusHandlerResult::Handled
234         }
235     }
236 }
237 
default_filter_callback(c: &Connection, m: Message) -> bool238 fn default_filter_callback(c: &Connection, m: Message) -> bool {
239     let b = m.msg_type() == MessageType::Signal;
240     c.i.pending_items.borrow_mut().push_back(m);
241     b
242 }
243 
object_path_message_cb(_conn: *mut ffi::DBusConnection, _msg: *mut ffi::DBusMessage, _user_data: *mut c_void) -> ffi::DBusHandlerResult244 extern "C" fn object_path_message_cb(_conn: *mut ffi::DBusConnection, _msg: *mut ffi::DBusMessage,
245     _user_data: *mut c_void) -> ffi::DBusHandlerResult {
246     /* Already pushed in filter_message_cb, so we just set the handled flag here to disable the
247        "default" handler. */
248     ffi::DBusHandlerResult::Handled
249 }
250 
251 impl Connection {
252     #[inline(always)]
conn(&self) -> *mut ffi::DBusConnection253     fn conn(&self) -> *mut ffi::DBusConnection {
254         self.i.conn.get()
255     }
256 
conn_from_ptr(conn: *mut ffi::DBusConnection) -> Result<Connection, Error>257     fn conn_from_ptr(conn: *mut ffi::DBusConnection) -> Result<Connection, Error> {
258         let mut c = Connection { i: Box::new(IConnection {
259             conn: Cell::new(conn),
260             pending_items: RefCell::new(VecDeque::new()),
261             watches: None,
262             handlers: RefCell::new(vec!()),
263             filter_cb: RefCell::new(Some(Box::new(default_filter_callback))),
264             filter_cb_panic: RefCell::new(Ok(())),
265         })};
266 
267         /* No, we don't want our app to suddenly quit if dbus goes down */
268         unsafe { ffi::dbus_connection_set_exit_on_disconnect(conn, 0) };
269         assert!(unsafe {
270             ffi::dbus_connection_add_filter(c.conn(), Some(filter_message_cb), mem::transmute(&*c.i), None)
271         } != 0);
272 
273         c.i.watches = Some(WatchList::new(&c, Box::new(|_| {})));
274         Ok(c)
275     }
276 
277     /// Creates a new D-Bus connection.
get_private(bus: BusType) -> Result<Connection, Error>278     pub fn get_private(bus: BusType) -> Result<Connection, Error> {
279         let mut e = Error::empty();
280         let conn = unsafe { ffi::dbus_bus_get_private(bus, e.get_mut()) };
281         if conn == ptr::null_mut() {
282             return Err(e)
283         }
284         Self::conn_from_ptr(conn)
285     }
286 
287     /// Creates a new D-Bus connection to a remote address.
288     ///
289     /// Note: for all common cases (System / Session bus) you probably want "get_private" instead.
open_private(address: &str) -> Result<Connection, Error>290     pub fn open_private(address: &str) -> Result<Connection, Error> {
291         let mut e = Error::empty();
292         let conn = unsafe { ffi::dbus_connection_open_private(to_c_str(address).as_ptr(), e.get_mut()) };
293         if conn == ptr::null_mut() {
294             return Err(e)
295         }
296         Self::conn_from_ptr(conn)
297     }
298 
299     /// Registers a new D-Bus connection with the bus.
300     ///
301     /// Note: `get_private` does this automatically, useful with `open_private`
register(&self) -> Result<(), Error>302     pub fn register(&self) -> Result<(), Error> {
303         let mut e = Error::empty();
304         if unsafe { ffi::dbus_bus_register(self.conn(), e.get_mut()) == 0 } {
305             Err(e)
306         } else {
307             Ok(())
308         }
309     }
310 
311     /// Gets whether the connection is currently open.
is_connected(&self) -> bool312     pub fn is_connected(&self) -> bool {
313         unsafe { ffi::dbus_connection_get_is_connected(self.conn()) != 0 }
314     }
315 
316     /// Sends a message over the D-Bus and waits for a reply.
317     /// This is usually used for method calls.
send_with_reply_and_block(&self, msg: Message, timeout_ms: i32) -> Result<Message, Error>318     pub fn send_with_reply_and_block(&self, msg: Message, timeout_ms: i32) -> Result<Message, Error> {
319         let mut e = Error::empty();
320         let response = unsafe {
321             ffi::dbus_connection_send_with_reply_and_block(self.conn(), msg.ptr(),
322                 timeout_ms as c_int, e.get_mut())
323         };
324         if response == ptr::null_mut() {
325             return Err(e);
326         }
327         Ok(Message::from_ptr(response, false))
328     }
329 
330     /// Sends a message over the D-Bus without waiting. Useful for sending signals and method call replies.
send(&self, msg: Message) -> Result<u32,()>331     pub fn send(&self, msg: Message) -> Result<u32,()> {
332         let mut serial = 0u32;
333         let r = unsafe { ffi::dbus_connection_send(self.conn(), msg.ptr(), &mut serial) };
334         if r == 0 { return Err(()); }
335         unsafe { ffi::dbus_connection_flush(self.conn()) };
336         Ok(serial)
337     }
338 
339     /// Sends a message over the D-Bus, returning a MessageReply.
340     ///
341     /// Call add_handler on the result to start waiting for reply. This should be done before next call to `incoming` or `iter`.
send_with_reply<'a, F: FnOnce(Result<&Message, Error>) + 'a>(&self, msg: Message, f: F) -> Result<MessageReply<F>, ()>342     pub fn send_with_reply<'a, F: FnOnce(Result<&Message, Error>) + 'a>(&self, msg: Message, f: F) -> Result<MessageReply<F>, ()> {
343         let serial = self.send(msg)?;
344         Ok(MessageReply(Some(f), serial))
345     }
346 
347     /// Adds a message handler to the connection.
348     ///
349     /// # Example
350     ///
351     /// ```
352     /// use std::{cell, rc};
353     /// use dbus::{Connection, Message, BusType};
354     ///
355     /// let c = Connection::get_private(BusType::Session).unwrap();
356     /// let m = Message::new_method_call("org.freedesktop.DBus", "/", "org.freedesktop.DBus", "ListNames").unwrap();
357     ///
358     /// let done: rc::Rc<cell::Cell<bool>> = Default::default();
359     /// let done2 = done.clone();
360     /// c.add_handler(c.send_with_reply(m, move |reply| {
361     ///     let v: Vec<&str> = reply.unwrap().read1().unwrap();
362     ///     println!("The names on the D-Bus are: {:?}", v);
363     ///     done2.set(true);
364     /// }).unwrap());
365     /// while !done.get() { c.incoming(100).next(); }
366     /// ```
add_handler<H: MsgHandler + 'static>(&self, h: H)367     pub fn add_handler<H: MsgHandler + 'static>(&self, h: H) {
368         let h = Box::new(h);
369         self.i.handlers.borrow_mut().push(h);
370     }
371 
372     /// Removes a MsgHandler from the connection.
373     ///
374     /// If there are many MsgHandlers, it is not specified which one will be returned.
375     ///
376     /// There might be more methods added later on, which give better ways to deal
377     /// with the list of MsgHandler currently on the connection. If this would help you,
378     /// please [file an issue](https://github.com/diwic/dbus-rs/issues).
extract_handler(&self) -> Option<Box<MsgHandler>>379     pub fn extract_handler(&self) -> Option<Box<MsgHandler>> {
380         self.i.handlers.borrow_mut().pop()
381     }
382 
383     /// Get the connection's unique name.
unique_name(&self) -> String384     pub fn unique_name(&self) -> String {
385         let c = unsafe { ffi::dbus_bus_get_unique_name(self.conn()) };
386         c_str_to_slice(&c).unwrap_or("").to_string()
387     }
388 
389     /// Check if there are new incoming events
390     ///
391     /// If there are no incoming events, ConnectionItems::Nothing will be returned.
392     /// See ConnectionItems::new if you want to customize this behaviour.
iter(&self, timeout_ms: i32) -> ConnectionItems393     pub fn iter(&self, timeout_ms: i32) -> ConnectionItems {
394         ConnectionItems::new(self, Some(timeout_ms), false)
395     }
396 
397     /// Check if there are new incoming events
398     ///
399     /// Supersedes "iter".
incoming(&self, timeout_ms: u32) -> ConnMsgs<&Self>400     pub fn incoming(&self, timeout_ms: u32) -> ConnMsgs<&Self> {
401         ConnMsgs { conn: &self, timeout_ms: Some(timeout_ms) }
402     }
403 
404     /// Register an object path.
register_object_path(&self, path: &str) -> Result<(), Error>405     pub fn register_object_path(&self, path: &str) -> Result<(), Error> {
406         let mut e = Error::empty();
407         let p = to_c_str(path);
408         let vtable = ffi::DBusObjectPathVTable {
409             unregister_function: None,
410             message_function: Some(object_path_message_cb),
411             dbus_internal_pad1: None,
412             dbus_internal_pad2: None,
413             dbus_internal_pad3: None,
414             dbus_internal_pad4: None,
415         };
416         let r = unsafe {
417             let user_data: *mut c_void = mem::transmute(&*self.i);
418             ffi::dbus_connection_try_register_object_path(self.conn(), p.as_ptr(), &vtable, user_data, e.get_mut())
419         };
420         if r == 0 { Err(e) } else { Ok(()) }
421     }
422 
423     /// Unregister an object path.
unregister_object_path(&self, path: &str)424     pub fn unregister_object_path(&self, path: &str) {
425         let p = to_c_str(path);
426         let r = unsafe { ffi::dbus_connection_unregister_object_path(self.conn(), p.as_ptr()) };
427         if r == 0 { panic!("Out of memory"); }
428     }
429 
430     /// List registered object paths.
list_registered_object_paths(&self, path: &str) -> Vec<String>431     pub fn list_registered_object_paths(&self, path: &str) -> Vec<String> {
432         let p = to_c_str(path);
433         let mut clist: *mut *mut c_char = ptr::null_mut();
434         let r = unsafe { ffi::dbus_connection_list_registered(self.conn(), p.as_ptr(), &mut clist) };
435         if r == 0 { panic!("Out of memory"); }
436         let mut v = Vec::new();
437         let mut i = 0;
438         loop {
439             let s = unsafe {
440                 let citer = clist.offset(i);
441                 if *citer == ptr::null_mut() { break };
442                 mem::transmute(citer)
443             };
444             v.push(format!("{}", c_str_to_slice(s).unwrap()));
445             i += 1;
446         }
447         unsafe { ffi::dbus_free_string_array(clist) };
448         v
449     }
450 
451     /// Register a name.
register_name(&self, name: &str, flags: u32) -> Result<RequestNameReply, Error>452     pub fn register_name(&self, name: &str, flags: u32) -> Result<RequestNameReply, Error> {
453         let mut e = Error::empty();
454         let n = to_c_str(name);
455         let r = unsafe { ffi::dbus_bus_request_name(self.conn(), n.as_ptr(), flags, e.get_mut()) };
456         if r == -1 { Err(e) } else { Ok(unsafe { mem::transmute(r) }) }
457     }
458 
459     /// Release a name.
release_name(&self, name: &str) -> Result<ReleaseNameReply, Error>460     pub fn release_name(&self, name: &str) -> Result<ReleaseNameReply, Error> {
461         let mut e = Error::empty();
462         let n = to_c_str(name);
463         let r = unsafe { ffi::dbus_bus_release_name(self.conn(), n.as_ptr(), e.get_mut()) };
464         if r == -1 { Err(e) } else { Ok(unsafe { mem::transmute(r) }) }
465     }
466 
467     /// Add a match rule to match messages on the message bus.
468     ///
469     /// See the `unity_focused_window` example for how to use this to catch signals.
470     /// (The syntax of the "rule" string is specified in the [D-Bus specification](https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-routing-match-rules).)
add_match(&self, rule: &str) -> Result<(), Error>471     pub fn add_match(&self, rule: &str) -> Result<(), Error> {
472         let mut e = Error::empty();
473         let n = to_c_str(rule);
474         unsafe { ffi::dbus_bus_add_match(self.conn(), n.as_ptr(), e.get_mut()) };
475         if e.name().is_some() { Err(e) } else { Ok(()) }
476     }
477 
478     /// Remove a match rule to match messages on the message bus.
remove_match(&self, rule: &str) -> Result<(), Error>479     pub fn remove_match(&self, rule: &str) -> Result<(), Error> {
480         let mut e = Error::empty();
481         let n = to_c_str(rule);
482         unsafe { ffi::dbus_bus_remove_match(self.conn(), n.as_ptr(), e.get_mut()) };
483         if e.name().is_some() { Err(e) } else { Ok(()) }
484     }
485 
486     /// Async I/O: Get an up-to-date list of file descriptors to watch.
487     ///
488     /// See the `Watch` struct for an example.
watch_fds(&self) -> Vec<Watch>489     pub fn watch_fds(&self) -> Vec<Watch> {
490         self.i.watches.as_ref().unwrap().get_enabled_fds()
491     }
492 
493     /// Async I/O: Call this function whenever you detected an event on the Fd,
494     /// Flags are a set of WatchEvent bits.
495     /// The returned iterator will return pending items only, never block for new events.
496     ///
497     /// See the `Watch` struct for an example.
watch_handle(&self, fd: RawFd, flags: c_uint) -> ConnectionItems498     pub fn watch_handle(&self, fd: RawFd, flags: c_uint) -> ConnectionItems {
499         self.i.watches.as_ref().unwrap().watch_handle(fd, flags);
500         ConnectionItems::new(self, None, true)
501     }
502 
503 
504     /// Create a convenience struct for easier calling of many methods on the same destination and path.
with_path<'a, D: Into<BusName<'a>>, P: Into<Path<'a>>>(&'a self, dest: D, path: P, timeout_ms: i32) -> ConnPath<'a, &'a Connection>505     pub fn with_path<'a, D: Into<BusName<'a>>, P: Into<Path<'a>>>(&'a self, dest: D, path: P, timeout_ms: i32) ->
506         ConnPath<'a, &'a Connection> {
507         ConnPath { conn: self, dest: dest.into(), path: path.into(), timeout: timeout_ms }
508     }
509 
510     /// Replace the default message callback. Returns the previously set callback.
511     ///
512     /// By default, when you call ConnectionItems::next, all relevant incoming messages
513     /// are returned through the ConnectionItems iterator, and
514     /// irrelevant messages are passed on to libdbus's default handler.
515     /// If you need to customize this behaviour (i e, to handle all incoming messages yourself),
516     /// you can set this message callback yourself. A few caveats apply:
517     ///
518     /// Return true from the callback to disable libdbus's internal handling of the message, or
519     /// false to allow it. In other words, true and false correspond to
520     /// `DBUS_HANDLER_RESULT_HANDLED` and `DBUS_HANDLER_RESULT_NOT_YET_HANDLED` respectively.
521     ///
522     /// Be sure to call the previously set callback from inside your callback,
523     /// if you want, e.g. ConnectionItems::next to yield the message.
524     ///
525     /// You can unset the message callback (might be useful to satisfy the borrow checker), but
526     /// you will get a panic if you call ConnectionItems::next while the message callback is unset.
527     /// The message callback will be temporary unset while inside a message callback, so calling
528     /// ConnectionItems::next recursively will also result in a panic.
529     ///
530     /// If your message callback panics, ConnectionItems::next will panic, too.
531     ///
532     /// # Examples
533     ///
534     /// Replace the default callback with our own:
535     ///
536     /// ```ignore
537     /// use dbus::{Connection, BusType};
538     /// let c = Connection::get_private(BusType::Session).unwrap();
539     /// // Set our callback
540     /// c.replace_message_callback(Some(Box::new(move |conn, msg| {
541     ///     println!("Got message: {:?}", msg.get_items());
542     ///     // Let libdbus handle some things by default,
543     ///     // like "nonexistent object" error replies to method calls
544     ///     false
545     /// })));
546     ///
547     /// for _ in c.iter(1000) {
548     ///    // Only `ConnectionItem::Nothing` would be ever yielded here.
549     /// }
550     /// ```
551     ///
552     /// Chain our callback to filter out some messages before `iter().next()`:
553     ///
554     /// ```
555     /// use dbus::{Connection, BusType, MessageType};
556     /// let c = Connection::get_private(BusType::Session).unwrap();
557     /// // Take the previously set callback
558     /// let mut old_cb = c.replace_message_callback(None).unwrap();
559     /// // Set our callback
560     /// c.replace_message_callback(Some(Box::new(move |conn, msg| {
561     ///     // Handle all signals on the spot
562     ///     if msg.msg_type() == MessageType::Signal {
563     ///         println!("Got signal: {:?}", msg.get_items());
564     ///         // Stop all further processing of the message
565     ///         return true;
566     ///     }
567     ///     // Delegate the rest of the messages to the previous callback
568     ///     // in chain, e.g. to have them yielded by `iter().next()`
569     ///     old_cb(conn, msg)
570     /// })));
571     ///
572     /// # if false {
573     /// for _ in c.iter(1000) {
574     ///    // `ConnectionItem::Signal` would never be yielded here.
575     /// }
576     /// # }
577     /// ```
replace_message_callback(&self, f: Option<MessageCallback>) -> Option<MessageCallback>578     pub fn replace_message_callback(&self, f: Option<MessageCallback>) -> Option<MessageCallback> {
579         mem::replace(&mut *self.i.filter_cb.borrow_mut(), f)
580     }
581 
582     /// Sets a callback to be called if a file descriptor status changes.
583     ///
584     /// For async I/O. In rare cases, the number of fds to poll for read/write can change.
585     /// If this ever happens, you'll get a callback. The watch changed is provided as a parameter.
586     ///
587     /// In rare cases this might not even happen in the thread calling anything on the connection,
588     /// so the callback needs to be `Send`.
589     /// A mutex is held during the callback. If you try to call set_watch_callback from a callback,
590     /// you will deadlock.
591     ///
592     /// (Previously, this was instead put in a ConnectionItem queue, but this was not working correctly.
593     /// see https://github.com/diwic/dbus-rs/issues/99 for additional info.)
set_watch_callback(&self, f: Box<Fn(Watch) + Send>)594     pub fn set_watch_callback(&self, f: Box<Fn(Watch) + Send>) { self.i.watches.as_ref().unwrap().set_on_update(f); }
595 
check_panic(&self)596     fn check_panic(&self) {
597         let p = mem::replace(&mut *self.i.filter_cb_panic.borrow_mut(), Ok(()));
598         if let Err(perr) = p { panic::resume_unwind(perr); }
599     }
600 
next_msg(&self) -> Option<Message>601     fn next_msg(&self) -> Option<Message> {
602         while let Some(msg) = self.i.pending_items.borrow_mut().pop_front() {
603             let mut v: MsgHandlerList = mem::replace(&mut *self.i.handlers.borrow_mut(), vec!());
604             let b = msghandler_process(&mut v, &msg, self);
605             let mut v2 = self.i.handlers.borrow_mut();
606             v.append(&mut *v2);
607             *v2 = v;
608             if !b { return Some(msg) };
609         };
610         None
611     }
612 
613 }
614 
615 impl Drop for Connection {
drop(&mut self)616     fn drop(&mut self) {
617         unsafe {
618             ffi::dbus_connection_close(self.conn());
619             ffi::dbus_connection_unref(self.conn());
620         }
621     }
622 }
623 
624 impl fmt::Debug for Connection {
fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error>625     fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
626         write!(f, "D-Bus Connection({})", self.unique_name())
627     }
628 }
629 
630 #[derive(Clone, Debug)]
631 /// Type of messages to be handled by a MsgHandler.
632 ///
633 /// Note: More variants can be added in the future; but unless you're writing your own D-Bus engine
634 /// you should not have to match on these anyway.
635 pub enum MsgHandlerType {
636     /// Handle all messages
637     All,
638     /// Handle only messages of a specific type
639     MsgType(MessageType),
640     /// Handle only method replies with this serial number
641     Reply(u32),
642 }
643 
644 impl MsgHandlerType {
matches_msg(&self, m: &Message) -> bool645     fn matches_msg(&self, m: &Message) -> bool {
646         match *self {
647             MsgHandlerType::All => true,
648             MsgHandlerType::MsgType(t) => m.msg_type() == t,
649             MsgHandlerType::Reply(serial) => {
650                 let t = m.msg_type();
651                 ((t == MessageType::MethodReturn) || (t == MessageType::Error)) && (m.get_reply_serial() == Some(serial))
652             }
653         }
654     }
655 }
656 
657 /// A trait for handling incoming messages.
658 pub trait MsgHandler {
659     /// Type of messages for which the handler will be called
660     ///
661     /// Note: The return value of this function might be cached, so it must return the same value all the time.
handler_type(&self) -> MsgHandlerType662     fn handler_type(&self) -> MsgHandlerType;
663 
664     /// Function to be called if the message matches the MsgHandlerType
handle_msg(&mut self, _msg: &Message) -> Option<MsgHandlerResult>665     fn handle_msg(&mut self, _msg: &Message) -> Option<MsgHandlerResult> { None }
666 }
667 
668 /// The result from MsgHandler::handle.
669 #[derive(Debug, Default)]
670 pub struct MsgHandlerResult {
671     /// Indicates that the message has been dealt with and should not be processed further.
672     pub handled: bool,
673     /// Indicates that this MsgHandler no longer wants to receive messages and should be removed.
674     pub done: bool,
675     /// Messages to send (e g, a reply to a method call)
676     pub reply: Vec<Message>,
677 }
678 
679 type MsgHandlerList = Vec<Box<MsgHandler>>;
680 
msghandler_process(v: &mut MsgHandlerList, m: &Message, c: &Connection) -> bool681 fn msghandler_process(v: &mut MsgHandlerList, m: &Message, c: &Connection) -> bool {
682     let mut ii: isize = -1;
683     loop {
684         ii += 1;
685         let i = ii as usize;
686         if i >= v.len() { return false };
687 
688         if !v[i].handler_type().matches_msg(m) { continue; }
689         if let Some(r) = v[i].handle_msg(m) {
690             for msg in r.reply.into_iter() { c.send(msg).unwrap(); }
691             if r.done { v.remove(i); ii -= 1; }
692             if r.handled { return true; }
693         }
694     }
695 }
696 
697 /// The struct returned from `Connection::send_and_reply`.
698 ///
699 /// It implements the `MsgHandler` trait so you can use `Connection::add_handler`.
700 pub struct MessageReply<F>(Option<F>, u32);
701 
702 impl<'a, F: FnOnce(Result<&Message, Error>) + 'a> MsgHandler for MessageReply<F> {
handler_type(&self) -> MsgHandlerType703     fn handler_type(&self) -> MsgHandlerType { MsgHandlerType::Reply(self.1) }
handle_msg(&mut self, msg: &Message) -> Option<MsgHandlerResult>704     fn handle_msg(&mut self, msg: &Message) -> Option<MsgHandlerResult> {
705         let e = match msg.msg_type() {
706             MessageType::MethodReturn => Ok(msg),
707             MessageType::Error => Err(msg.set_error_from_msg().unwrap_err()),
708             _ => unreachable!(),
709         };
710         debug_assert_eq!(msg.get_reply_serial(), Some(self.1));
711         self.0.take().unwrap()(e);
712         return Some(MsgHandlerResult { handled: true, done: true, reply: Vec::new() })
713     }
714 }
715 
716 
717 #[test]
message_reply()718 fn message_reply() {
719     use std::{cell, rc};
720     let c = Connection::get_private(BusType::Session).unwrap();
721     assert!(c.is_connected());
722     let m = Message::new_method_call("org.freedesktop.DBus", "/", "org.freedesktop.DBus", "ListNames").unwrap();
723     let quit = rc::Rc::new(cell::Cell::new(false));
724     let quit2 = quit.clone();
725     let reply = c.send_with_reply(m, move |result| {
726         let r = result.unwrap();
727         let _: ::arg::Array<&str, _>  = r.get1().unwrap();
728         quit2.set(true);
729     }).unwrap();
730     for _ in c.iter(1000).with(reply) { if quit.get() { return; } }
731     assert!(false);
732 }
733 
734