1 use crate::{BusType, Error, Message, to_c_str, Watch};
2 use std::{ptr, str};
3 use std::ffi::CStr;
4 use std::os::raw::{c_void};
5
6 #[derive(Debug)]
7 pub struct ConnHandle(*mut ffi::DBusConnection);
8
9 unsafe impl Send for ConnHandle {}
10 unsafe impl Sync for ConnHandle {}
11
12 impl Drop for ConnHandle {
drop(&mut self)13 fn drop(&mut self) {
14 unsafe {
15 ffi::dbus_connection_close(self.0);
16 ffi::dbus_connection_unref(self.0);
17 }
18 }
19 }
20
21 /// Experimental rewrite of Connection [unstable / experimental]
22 ///
23 /// Slightly lower level, with better support for async operations.
24 /// Also, this struct is Send + Sync.
25 ///
26 /// Blocking operations should be clearly marked as such, although if you
27 /// try to access the connection from several threads at the same time,
28 /// blocking might occur due to an internal mutex inside the dbus library.
29 ///
30 /// This version avoids dbus_connection_dispatch, and thus avoids
31 /// callbacks from that function. Instead the same functionality needs to be
32 /// implemented by these bindings somehow - this is not done yet.
33 #[derive(Debug)]
34 pub struct TxRx {
35 handle: ConnHandle,
36 }
37
38 impl TxRx {
39 #[inline(always)]
conn(&self) -> *mut ffi::DBusConnection40 pub (crate) fn conn(&self) -> *mut ffi::DBusConnection {
41 self.handle.0
42 }
43
conn_from_ptr(ptr: *mut ffi::DBusConnection) -> Result<TxRx, Error>44 fn conn_from_ptr(ptr: *mut ffi::DBusConnection) -> Result<TxRx, Error> {
45 let handle = ConnHandle(ptr);
46
47 /* No, we don't want our app to suddenly quit if dbus goes down */
48 unsafe { ffi::dbus_connection_set_exit_on_disconnect(ptr, 0) };
49
50 let c = TxRx { handle };
51
52 Ok(c)
53 }
54
55
56 /// Creates a new D-Bus connection.
57 ///
58 /// Blocking: until the connection is up and running.
get_private(bus: BusType) -> Result<TxRx, Error>59 pub fn get_private(bus: BusType) -> Result<TxRx, Error> {
60 let mut e = Error::empty();
61 let conn = unsafe { ffi::dbus_bus_get_private(bus, e.get_mut()) };
62 if conn == ptr::null_mut() {
63 return Err(e)
64 }
65 Self::conn_from_ptr(conn)
66 }
67
68 /// Creates a new D-Bus connection to a remote address.
69 ///
70 /// Note: for all common cases (System / Session bus) you probably want "get_private" instead.
71 ///
72 /// Blocking: until the connection is established.
open_private(address: &str) -> Result<TxRx, Error>73 pub fn open_private(address: &str) -> Result<TxRx, Error> {
74 let mut e = Error::empty();
75 let conn = unsafe { ffi::dbus_connection_open_private(to_c_str(address).as_ptr(), e.get_mut()) };
76 if conn == ptr::null_mut() {
77 return Err(e)
78 }
79 Self::conn_from_ptr(conn)
80 }
81
82 /// Registers a new D-Bus connection with the bus.
83 ///
84 /// Note: `get_private` does this automatically, useful with `open_private`
85 ///
86 /// Blocking: until a "Hello" response is received from the server.
register(&mut self) -> Result<(), Error>87 pub fn register(&mut self) -> Result<(), Error> {
88 // This function needs to take &mut self, because it changes unique_name and unique_name takes a &self
89 let mut e = Error::empty();
90 if unsafe { ffi::dbus_bus_register(self.conn(), e.get_mut()) == 0 } {
91 Err(e)
92 } else {
93 Ok(())
94 }
95 }
96
97 /// Gets whether the connection is currently open.
is_connected(&self) -> bool98 pub fn is_connected(&self) -> bool {
99 unsafe { ffi::dbus_connection_get_is_connected(self.conn()) != 0 }
100 }
101
102 /// Get the connection's unique name.
103 ///
104 /// It's usually something like ":1.54"
unique_name(&self) -> Option<&str>105 pub fn unique_name(&self) -> Option<&str> {
106 let c = unsafe { ffi::dbus_bus_get_unique_name(self.conn()) };
107 if c == ptr::null_mut() { return None; }
108 let s = unsafe { CStr::from_ptr(c) };
109 str::from_utf8(s.to_bytes()).ok()
110 }
111
112
113 /// Puts a message into libdbus out queue. Use "flush" or "read_write" to make sure it is sent over the wire.
114 ///
115 /// Returns a serial number than can be used to match against a reply.
send(&self, msg: Message) -> Result<u32, ()>116 pub fn send(&self, msg: Message) -> Result<u32, ()> {
117 let mut serial = 0u32;
118 let r = unsafe { ffi::dbus_connection_send(self.conn(), msg.ptr(), &mut serial) };
119 if r == 0 { return Err(()); }
120 Ok(serial)
121 }
122
123 /// Flush the queue of outgoing messages.
124 ///
125 /// Blocking: until the outgoing queue is empty.
flush(&self)126 pub fn flush(&self) { unsafe { ffi::dbus_connection_flush(self.conn()) } }
127
128 /// Read and write to the connection.
129 ///
130 /// Incoming messages are put in the internal queue, outgoing messages are written.
131 ///
132 /// Blocking: If there are no messages, for up to timeout_ms milliseconds, or forever if timeout_ms is None.
133 /// For non-blocking behaviour, set timeout_ms to Some(0).
read_write(&self, timeout_ms: Option<i32>) -> Result<(), ()>134 pub fn read_write(&self, timeout_ms: Option<i32>) -> Result<(), ()> {
135 let t = timeout_ms.unwrap_or(-1);
136 if unsafe { ffi::dbus_connection_read_write(self.conn(), t) == 0 } {
137 Err(())
138 } else {
139 Ok(())
140 }
141 }
142
143 /// Removes a message from the incoming queue, or returns None if the queue is empty.
144 ///
145 /// Use "read_write" first, so that messages are put into the incoming queue.
146 /// For unhandled messages, please call MessageDispatcher::default_dispatch to return
147 /// default replies for method calls.
pop_message(&self) -> Option<Message>148 pub fn pop_message(&self) -> Option<Message> {
149 let mptr = unsafe { ffi::dbus_connection_pop_message(self.conn()) };
150 if mptr == ptr::null_mut() {
151 None
152 } else {
153 Some(Message::from_ptr(mptr, false))
154 }
155 }
156
157 /// Get an up-to-date list of file descriptors to watch.
158 ///
159 /// Might be changed into something that allows for callbacks when the watch list is changed.
watch_fds(&mut self) -> Result<Vec<Watch>, ()>160 pub fn watch_fds(&mut self) -> Result<Vec<Watch>, ()> {
161 extern "C" fn add_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) -> u32 {
162 unsafe {
163 let wlist: &mut Vec<Watch> = &mut *(data as *mut _);
164 wlist.push(Watch::from_raw(watch));
165 }
166 1
167 }
168 let mut r = vec!();
169 if unsafe { ffi::dbus_connection_set_watch_functions(self.conn(),
170 Some(add_watch_cb), None, None, &mut r as *mut _ as *mut _, None) } == 0 { return Err(()) }
171 assert!(unsafe { ffi::dbus_connection_set_watch_functions(self.conn(),
172 None, None, None, ptr::null_mut(), None) } != 0);
173 Ok(r)
174 }
175 }
176
/// Compile-time check that `TxRx` can be sent to and shared between threads.
#[test]
fn test_txrx_send_sync() {
    fn assert_send<T: Send>(_: &T) {}
    fn assert_sync<T: Sync>(_: &T) {}

    let conn = TxRx::get_private(BusType::Session).unwrap();
    assert_send(&conn);
    assert_sync(&conn);
}
185
/// Round trip against the session bus: asks the bus for `ListNames` and checks
/// that the reply contains this connection's own unique name.
#[test]
fn txrx_simple_test() {
    let mut c = TxRx::get_private(BusType::Session).unwrap();
    assert!(c.is_connected());
    let fds = c.watch_fds().unwrap();
    println!("{:?}", fds);
    assert!(!fds.is_empty());
    let m = Message::new_method_call("org.freedesktop.DBus", "/", "org.freedesktop.DBus", "ListNames").unwrap();
    let reply = c.send(m).unwrap();
    let my_name = c.unique_name().unwrap();
    loop {
        while let Some(mut msg) = c.pop_message() {
            println!("{:?}", msg);
            if msg.get_reply_serial() == Some(reply) {
                let r = msg.as_result().unwrap();
                // `crate::` resolves the same way under both 2015 and 2018 path rules.
                let z: crate::arg::Array<&str, _> = r.get1().unwrap();
                for n in z {
                    println!("{}", n);
                    if n == my_name { return; } // Hooray, we found ourselves!
                }
                panic!("unique name {} was not found in the ListNames reply", my_name);
            } else if let Some(r) = crate::MessageDispatcher::<()>::default_dispatch(&msg) {
                // Anything that is not our reply gets the default reply, if one applies.
                c.send(r).unwrap();
            }
        }
        c.read_write(Some(100)).unwrap();
    }
}
214
215