1 //! D-Bus bindings for Rust
2 //!
3 //! [D-Bus](http://dbus.freedesktop.org/) is a message bus, and is mainly used in Linux
4 //! for communication between processes. It is present by default on almost every
5 //! Linux distribution out there, and runs in two instances - one per session, and one
6 //! system-wide.
7 //!
8 //! See the examples directory for some examples to get you started.
9 
10 #![warn(missing_docs)]
11 
12 extern crate libc;
13 
14 pub use ffi::DBusBusType as BusType;
15 pub use ffi::DBusNameFlag as NameFlag;
16 pub use ffi::DBusRequestNameReply as RequestNameReply;
17 pub use ffi::DBusReleaseNameReply as ReleaseNameReply;
18 pub use ffi::DBusMessageType as MessageType;
19 pub use ffi::DBusWatchEvent as WatchEvent;
20 
21 pub use message::{Message, MessageItem, FromMessageItem, OwnedFd, ArrayError};
22 pub use prop::PropHandler;
23 pub use prop::Props;
24 pub use watch::Watch;
25 
26 /// A TypeSig describes the type of a MessageItem.
27 pub type TypeSig<'a> = std::borrow::Cow<'a, str>;
28 
29 use std::ffi::{CString, CStr};
30 use std::ptr::{self};
31 use std::collections::LinkedList;
32 use std::cell::{Cell, RefCell};
33 use std::mem;
34 use std::os::unix::io::RawFd;
35 
36 #[allow(missing_docs)]
37 mod ffi;
38 mod message;
39 mod prop;
40 mod objpath;
41 mod watch;
42 
43 mod strings;
44 pub use strings::{Signature, Path, Interface, Member, ErrorName, BusName};
45 
46 /// Contains functionality for the "server" of a D-Bus object. A remote application can
47 /// introspect this object and call methods on it.
48 /// Deprecated - use the `tree` module instead.
49 pub mod obj {
50     pub use objpath::{ObjectPath, Interface, Property, Signal, Argument};
51     pub use objpath::{Method, MethodHandler, MethodResult};
52     pub use objpath::{PropertyROHandler, PropertyRWHandler, PropertyWOHandler, PropertyGetResult, PropertySetResult};
53 }
54 
55 mod methoddisp;
56 
57 /// Contains functionality for dispatching methods on a D-Bus "server".
58 /// Supersedes the `obj` module. Properties are somewhat still WIP,
59 /// but should in any case be better than `obj` already.
60 ///
61 /// # Example
62 /// ```
63 /// use dbus::{tree, Connection, BusType};
64 /// let f = tree::Factory::new_fn();
65 /// /* Add a method returning "Thanks!" on interface "com.example.dbus.rs"
66 ///    on object path "/example". */
67 /// let t = f.tree().add(f.object_path("/example").introspectable()
68 ///     .add(f.interface("com.example.dbus.rs")
69 ///         .add_m(f.method("CallMe", |m,_,_| {
70 ///             Ok(vec!(m.method_return().append("Thanks!"))) }
71 ///         ).out_arg("s"))
72 /// ));
73 ///
74 /// let c = Connection::get_private(BusType::Session).unwrap();
75 /// t.set_registered(&c, true).unwrap();
76 /// /* Run forever */
77 /// // for _ in t.run(&c, c.iter(1000)) {}
78 /// ```
79 
80 pub mod tree {
81     pub use methoddisp::{Factory, Tree, TreeServer, ObjectPath, Interface, Signal};
82     pub use methoddisp::{Property, EmitsChangedSignal, Access};
83     pub use methoddisp::{Method, MethodErr, MethodResult, Argument};
84     pub use methoddisp::{MethodFn, MethodFnMut, MethodSync};
85 }
86 
87 static INITDBUS: std::sync::Once = std::sync::ONCE_INIT;
88 
init_dbus()89 fn init_dbus() {
90     INITDBUS.call_once(|| {
91         if unsafe { ffi::dbus_threads_init_default() } == 0 {
92             panic!("Out of memory when trying to initialize D-Bus library!");
93         }
94     });
95 }
96 
97 /// D-Bus Error wrapper
98 pub struct Error {
99     e: ffi::DBusError,
100 }
101 
102 unsafe impl Send for Error {}
103 
c_str_to_slice(c: & *const libc::c_char) -> Option<&str>104 fn c_str_to_slice(c: & *const libc::c_char) -> Option<&str> {
105     if *c == ptr::null() { None }
106     else { std::str::from_utf8( unsafe { CStr::from_ptr(*c).to_bytes() }).ok() }
107 }
108 
to_c_str(n: &str) -> CString109 fn to_c_str(n: &str) -> CString { CString::new(n.as_bytes()).unwrap() }
110 
111 impl Error {
112 
113     /// Create a new custom D-Bus Error.
new_custom(name: &str, message: &str) -> Error114     pub fn new_custom(name: &str, message: &str) -> Error {
115         let n = to_c_str(name);
116         let m = to_c_str(&message.replace("%","%%"));
117         let mut e = Error::empty();
118 
119         unsafe { ffi::dbus_set_error(e.get_mut(), n.as_ptr(), m.as_ptr()) };
120         e
121     }
122 
empty() -> Error123     fn empty() -> Error {
124         init_dbus();
125         let mut e = ffi::DBusError {
126             name: ptr::null(),
127             message: ptr::null(),
128             dummy: 0,
129             padding1: ptr::null()
130         };
131         unsafe { ffi::dbus_error_init(&mut e); }
132         Error{ e: e }
133     }
134 
135     /// Error name/type, e g 'org.freedesktop.DBus.Error.Failed'
name(&self) -> Option<&str>136     pub fn name(&self) -> Option<&str> {
137         c_str_to_slice(&self.e.name)
138     }
139 
140     /// Custom message, e g 'Could not find a matching object path'
message(&self) -> Option<&str>141     pub fn message(&self) -> Option<&str> {
142         c_str_to_slice(&self.e.message)
143     }
144 
get_mut(&mut self) -> &mut ffi::DBusError145     fn get_mut(&mut self) -> &mut ffi::DBusError { &mut self.e }
146 }
147 
148 impl Drop for Error {
drop(&mut self)149     fn drop(&mut self) {
150         unsafe { ffi::dbus_error_free(&mut self.e); }
151     }
152 }
153 
154 impl std::fmt::Debug for Error {
fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error>155     fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
156         write!(f, "D-Bus error: {} ({})", self.message().unwrap_or(""),
157             self.name().unwrap_or(""))
158     }
159 }
160 
161 impl std::error::Error for Error {
description(&self) -> &str162     fn description(&self) -> &str { "D-Bus error" }
163 }
164 
165 impl std::fmt::Display for Error {
fmt(&self, f: &mut std::fmt::Formatter) -> Result<(),std::fmt::Error>166     fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(),std::fmt::Error> {
167         if let Some(x) = self.message() {
168              write!(f, "{:?}", x.to_string())
169         } else { Ok(()) }
170     }
171 }
172 
173 /// When listening for incoming events on the D-Bus, this enum will tell you what type
174 /// of incoming event has happened.
175 #[derive(Debug)]
176 pub enum ConnectionItem {
177     /// No event between now and timeout
178     Nothing,
179     /// Incoming method call
180     MethodCall(Message),
181     /// Incoming signal
182     Signal(Message),
183     /// Incoming method return (mostly used for Async I/O)
184     MethodReturn(Message),
185     /// Indicates whether a file descriptor should be monitored or not.
186     /// Unless you're doing Async I/O, you can simply ignore this variant.
187     WatchFd(Watch),
188 }
189 
190 /// ConnectionItem iterator
191 pub struct ConnectionItems<'a> {
192     c: &'a Connection,
193     timeout_ms: Option<i32>,
194 }
195 
196 impl<'a> Iterator for ConnectionItems<'a> {
197     type Item = ConnectionItem;
next(&mut self) -> Option<ConnectionItem>198     fn next(&mut self) -> Option<ConnectionItem> {
199         loop {
200             let i = self.c.i.pending_items.borrow_mut().pop_front();
201             if i.is_some() { return i; }
202 
203             match self.timeout_ms {
204                 Some(t) => {
205                     let r = unsafe { ffi::dbus_connection_read_write_dispatch(self.c.conn(), t as libc::c_int) };
206                     if !self.c.i.pending_items.borrow().is_empty() { continue };
207                     if r == 0 { return None; }
208                     return Some(ConnectionItem::Nothing);
209                 }
210                 None => {
211                     let r = unsafe { ffi::dbus_connection_dispatch(self.c.conn()) };
212                     if !self.c.i.pending_items.borrow().is_empty() { continue };
213                     if r == ffi::DBusDispatchStatus::DataRemains { continue };
214                     if r == ffi::DBusDispatchStatus::Complete { return None };
215                     panic!("dbus_connection_dispatch failed");
216                 }
217             }
218         }
219     }
220 }
221 
222 /* Since we register callbacks with userdata pointers,
223    we need to make sure the connection pointer does not move around.
224    Hence this extra indirection. */
225 struct IConnection {
226     conn: Cell<*mut ffi::DBusConnection>,
227     pending_items: RefCell<LinkedList<ConnectionItem>>,
228     watches: Option<Box<watch::WatchList>>,
229 }
230 
231 /// A D-Bus connection. Start here if you want to get on the D-Bus!
232 pub struct Connection {
233     i: Box<IConnection>,
234 }
235 
filter_message_cb(conn: *mut ffi::DBusConnection, msg: *mut ffi::DBusMessage, user_data: *mut libc::c_void) -> ffi::DBusHandlerResult236 extern "C" fn filter_message_cb(conn: *mut ffi::DBusConnection, msg: *mut ffi::DBusMessage,
237     user_data: *mut libc::c_void) -> ffi::DBusHandlerResult {
238 
239     let m = message::message_from_ptr(msg, true);
240     let i: &IConnection = unsafe { mem::transmute(user_data) };
241     assert!(i.conn.get() == conn);
242 
243     let mtype: ffi::DBusMessageType = unsafe { mem::transmute(ffi::dbus_message_get_type(msg)) };
244     let r = match mtype {
245         ffi::DBusMessageType::Signal => {
246             i.pending_items.borrow_mut().push_back(ConnectionItem::Signal(m));
247             ffi::DBusHandlerResult::Handled
248         }
249         ffi::DBusMessageType::MethodReturn => {
250             i.pending_items.borrow_mut().push_back(ConnectionItem::MethodReturn(m));
251             ffi::DBusHandlerResult::NotYetHandled
252         }
253         _ => ffi::DBusHandlerResult::NotYetHandled,
254     };
255 
256     r
257 }
258 
object_path_message_cb(conn: *mut ffi::DBusConnection, msg: *mut ffi::DBusMessage, user_data: *mut libc::c_void) -> ffi::DBusHandlerResult259 extern "C" fn object_path_message_cb(conn: *mut ffi::DBusConnection, msg: *mut ffi::DBusMessage,
260     user_data: *mut libc::c_void) -> ffi::DBusHandlerResult {
261 
262     let m = message::message_from_ptr(msg, true);
263     let i: &IConnection = unsafe { mem::transmute(user_data) };
264     assert!(i.conn.get() == conn);
265     i.pending_items.borrow_mut().push_back(ConnectionItem::MethodCall(m));
266     ffi::DBusHandlerResult::Handled
267 }
268 
269 impl Connection {
270 
271     #[inline(always)]
conn(&self) -> *mut ffi::DBusConnection272     fn conn(&self) -> *mut ffi::DBusConnection {
273         self.i.conn.get()
274     }
275 
276     /// Creates a new D-Bus connection.
get_private(bus: BusType) -> Result<Connection, Error>277     pub fn get_private(bus: BusType) -> Result<Connection, Error> {
278         let mut e = Error::empty();
279         let conn = unsafe { ffi::dbus_bus_get_private(bus, e.get_mut()) };
280         if conn == ptr::null_mut() {
281             return Err(e)
282         }
283         let mut c = Connection { i: Box::new(IConnection {
284             conn: Cell::new(conn),
285             pending_items: RefCell::new(LinkedList::new()),
286             watches: None,
287         })};
288 
289         /* No, we don't want our app to suddenly quit if dbus goes down */
290         unsafe { ffi::dbus_connection_set_exit_on_disconnect(conn, 0) };
291         assert!(unsafe {
292             ffi::dbus_connection_add_filter(c.conn(), Some(filter_message_cb as ffi::DBusCallback), mem::transmute(&*c.i), None)
293         } != 0);
294 
295         let iconn: *const IConnection = &*c.i;
296         c.i.watches = Some(watch::WatchList::new(&c, Box::new(move |w| {
297             let i: &IConnection = unsafe { mem::transmute(iconn) };
298             i.pending_items.borrow_mut().push_back(ConnectionItem::WatchFd(w));
299         })));
300 
301         Ok(c)
302     }
303 
304     /// Sends a message over the D-Bus and waits for a reply.
305     /// This is usually used for method calls.
send_with_reply_and_block(&self, msg: Message, timeout_ms: i32) -> Result<Message, Error>306     pub fn send_with_reply_and_block(&self, msg: Message, timeout_ms: i32) -> Result<Message, Error> {
307         let mut e = Error::empty();
308         let response = unsafe {
309             ffi::dbus_connection_send_with_reply_and_block(self.conn(), message::get_message_ptr(&msg),
310                 timeout_ms as libc::c_int, e.get_mut())
311         };
312         if response == ptr::null_mut() {
313             return Err(e);
314         }
315         Ok(message::message_from_ptr(response, false))
316     }
317 
318     /// Sends a message over the D-Bus without waiting. Useful for sending signals and method call replies.
send(&self, msg: Message) -> Result<u32,()>319     pub fn send(&self, msg: Message) -> Result<u32,()> {
320         let mut serial = 0u32;
321         let r = unsafe { ffi::dbus_connection_send(self.conn(), message::get_message_ptr(&msg), &mut serial) };
322         if r == 0 { return Err(()); }
323         unsafe { ffi::dbus_connection_flush(self.conn()) };
324         Ok(serial)
325     }
326 
327     /// Get the connection's unique name.
unique_name(&self) -> String328     pub fn unique_name(&self) -> String {
329         let c = unsafe { ffi::dbus_bus_get_unique_name(self.conn()) };
330         c_str_to_slice(&c).unwrap_or("").to_string()
331     }
332 
333     /// Check if there are new incoming events
iter(&self, timeout_ms: i32) -> ConnectionItems334     pub fn iter(&self, timeout_ms: i32) -> ConnectionItems {
335         ConnectionItems {
336             c: self,
337             timeout_ms: Some(timeout_ms),
338         }
339     }
340 
341     /// Register an object path.
register_object_path(&self, path: &str) -> Result<(), Error>342     pub fn register_object_path(&self, path: &str) -> Result<(), Error> {
343         let mut e = Error::empty();
344         let p = to_c_str(path);
345         let vtable = ffi::DBusObjectPathVTable {
346             unregister_function: None,
347             message_function: Some(object_path_message_cb as ffi::DBusCallback),
348             dbus_internal_pad1: None,
349             dbus_internal_pad2: None,
350             dbus_internal_pad3: None,
351             dbus_internal_pad4: None,
352         };
353         let r = unsafe {
354             let user_data: *mut libc::c_void = mem::transmute(&*self.i);
355             ffi::dbus_connection_try_register_object_path(self.conn(), p.as_ptr(), &vtable, user_data, e.get_mut())
356         };
357         if r == 0 { Err(e) } else { Ok(()) }
358     }
359 
360     /// Unregister an object path.
unregister_object_path(&self, path: &str)361     pub fn unregister_object_path(&self, path: &str) {
362         let p = to_c_str(path);
363         let r = unsafe { ffi::dbus_connection_unregister_object_path(self.conn(), p.as_ptr()) };
364         if r == 0 { panic!("Out of memory"); }
365     }
366 
367     /// List registered object paths.
list_registered_object_paths(&self, path: &str) -> Vec<String>368     pub fn list_registered_object_paths(&self, path: &str) -> Vec<String> {
369         let p = to_c_str(path);
370         let mut clist: *mut *mut libc::c_char = ptr::null_mut();
371         let r = unsafe { ffi::dbus_connection_list_registered(self.conn(), p.as_ptr(), &mut clist) };
372         if r == 0 { panic!("Out of memory"); }
373         let mut v = Vec::new();
374         let mut i = 0;
375         loop {
376             let s = unsafe {
377                 let citer = clist.offset(i);
378                 if *citer == ptr::null_mut() { break };
379                 mem::transmute(citer)
380             };
381             v.push(format!("{}", c_str_to_slice(s).unwrap()));
382             i += 1;
383         }
384         unsafe { ffi::dbus_free_string_array(clist) };
385         v
386     }
387 
388     /// Register a name.
register_name(&self, name: &str, flags: u32) -> Result<RequestNameReply, Error>389     pub fn register_name(&self, name: &str, flags: u32) -> Result<RequestNameReply, Error> {
390         let mut e = Error::empty();
391         let n = to_c_str(name);
392         let r = unsafe { ffi::dbus_bus_request_name(self.conn(), n.as_ptr(), flags, e.get_mut()) };
393         if r == -1 { Err(e) } else { Ok(unsafe { mem::transmute(r) }) }
394     }
395 
396     /// Release a name.
release_name(&self, name: &str) -> Result<ReleaseNameReply, Error>397     pub fn release_name(&self, name: &str) -> Result<ReleaseNameReply, Error> {
398         let mut e = Error::empty();
399         let n = to_c_str(name);
400         let r = unsafe { ffi::dbus_bus_release_name(self.conn(), n.as_ptr(), e.get_mut()) };
401         if r == -1 { Err(e) } else { Ok(unsafe { mem::transmute(r) }) }
402     }
403 
404     /// Add a match rule to match messages on the message bus.
add_match(&self, rule: &str) -> Result<(), Error>405     pub fn add_match(&self, rule: &str) -> Result<(), Error> {
406         let mut e = Error::empty();
407         let n = to_c_str(rule);
408         unsafe { ffi::dbus_bus_add_match(self.conn(), n.as_ptr(), e.get_mut()) };
409         if e.name().is_some() { Err(e) } else { Ok(()) }
410     }
411 
412     /// Remove a match rule to match messages on the message bus.
remove_match(&self, rule: &str) -> Result<(), Error>413     pub fn remove_match(&self, rule: &str) -> Result<(), Error> {
414         let mut e = Error::empty();
415         let n = to_c_str(rule);
416         unsafe { ffi::dbus_bus_remove_match(self.conn(), n.as_ptr(), e.get_mut()) };
417         if e.name().is_some() { Err(e) } else { Ok(()) }
418     }
419 
420     /// Async I/O: Get an up-to-date list of file descriptors to watch.
watch_fds(&self) -> Vec<Watch>421     pub fn watch_fds(&self) -> Vec<Watch> {
422         self.i.watches.as_ref().unwrap().get_enabled_fds()
423     }
424 
425     /// Async I/O: Call this function whenever you detected an event on the Fd,
426     /// Flags are a set of WatchEvent bits.
427     /// The returned iterator will return pending items only, never block for new events.
watch_handle(&self, fd: RawFd, flags: libc::c_uint) -> ConnectionItems428     pub fn watch_handle(&self, fd: RawFd, flags: libc::c_uint) -> ConnectionItems {
429         self.i.watches.as_ref().unwrap().watch_handle(fd, flags);
430         ConnectionItems { c: self, timeout_ms: None }
431     }
432 
433 }
434 
435 impl Drop for Connection {
drop(&mut self)436     fn drop(&mut self) {
437         unsafe {
438             ffi::dbus_connection_close(self.conn());
439             ffi::dbus_connection_unref(self.conn());
440         }
441     }
442 }
443 
444 impl std::fmt::Debug for Connection {
fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error>445     fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
446         write!(f, "D-Bus Connection({})", self.unique_name())
447     }
448 }
449 
450 #[cfg(test)]
451 mod test {
452     use super::{Connection, Message, BusType, MessageItem, ConnectionItem, NameFlag,
453         RequestNameReply, ReleaseNameReply};
454 
455     #[test]
connection()456     fn connection() {
457         let c = Connection::get_private(BusType::Session).unwrap();
458         let n = c.unique_name();
459         assert!(n.starts_with(":1."));
460         println!("Connected to DBus, unique name: {}", n);
461     }
462 
463     #[test]
invalid_message()464     fn invalid_message() {
465         let c = Connection::get_private(BusType::Session).unwrap();
466         let m = Message::new_method_call("foo.bar", "/", "foo.bar", "FooBar").unwrap();
467         let e = c.send_with_reply_and_block(m, 2000).err().unwrap();
468         assert!(e.name().unwrap() == "org.freedesktop.DBus.Error.ServiceUnknown");
469     }
470 
471     #[test]
message_listnames()472     fn message_listnames() {
473         let c = Connection::get_private(BusType::Session).unwrap();
474         let m = Message::method_call(&"org.freedesktop.DBus".into(), &"/".into(),
475             &"org.freedesktop.DBus".into(), &"ListNames".into());
476         let r = c.send_with_reply_and_block(m, 2000).unwrap();
477         let reply = r.get_items();
478         println!("{:?}", reply);
479     }
480 
481     #[test]
message_namehasowner()482     fn message_namehasowner() {
483         let c = Connection::get_private(BusType::Session).unwrap();
484         let mut m = Message::new_method_call("org.freedesktop.DBus", "/", "org.freedesktop.DBus", "NameHasOwner").unwrap();
485         m.append_items(&[MessageItem::Str("org.freedesktop.DBus".to_string())]);
486         let r = c.send_with_reply_and_block(m, 2000).unwrap();
487         let reply = r.get_items();
488         println!("{:?}", reply);
489         assert_eq!(reply, vec!(MessageItem::Bool(true)));
490     }
491 
492     #[test]
object_path()493     fn object_path() {
494         use  std::sync::mpsc;
495         let (tx, rx) = mpsc::channel();
496         let thread = ::std::thread::spawn(move || {
497             let c = Connection::get_private(BusType::Session).unwrap();
498             c.register_object_path("/hello").unwrap();
499             // println!("Waiting...");
500             tx.send(c.unique_name()).unwrap();
501             for n in c.iter(1000) {
502                 // println!("Found message... ({})", n);
503                 match n {
504                     ConnectionItem::MethodCall(ref m) => {
505                         let reply = Message::new_method_return(m).unwrap();
506                         c.send(reply).unwrap();
507                         break;
508                     }
509                     _ => {}
510                 }
511             }
512             c.unregister_object_path("/hello");
513         });
514 
515         let c = Connection::get_private(BusType::Session).unwrap();
516         let n = rx.recv().unwrap();
517         let m = Message::new_method_call(&n, "/hello", "com.example.hello", "Hello").unwrap();
518         println!("Sending...");
519         let r = c.send_with_reply_and_block(m, 8000).unwrap();
520         let reply = r.get_items();
521         println!("{:?}", reply);
522         thread.join().unwrap();
523 
524     }
525 
526     #[test]
register_name()527     fn register_name() {
528         let c = Connection::get_private(BusType::Session).unwrap();
529         let n = format!("com.example.hello.test.register_name");
530         assert_eq!(c.register_name(&n, NameFlag::ReplaceExisting as u32).unwrap(), RequestNameReply::PrimaryOwner);
531         assert_eq!(c.release_name(&n).unwrap(), ReleaseNameReply::Released);
532     }
533 
534     #[test]
signal()535     fn signal() {
536         let c = Connection::get_private(BusType::Session).unwrap();
537         let iface = "com.example.signaltest";
538         let mstr = format!("interface='{}',member='ThisIsASignal'", iface);
539         c.add_match(&mstr).unwrap();
540         let m = Message::new_signal("/mysignal", iface, "ThisIsASignal").unwrap();
541         let uname = c.unique_name();
542         c.send(m).unwrap();
543         for n in c.iter(1000) {
544             match n {
545                 ConnectionItem::Signal(s) => {
546                     let (_, p, i, m) = s.headers();
547                     match (&*p.unwrap(), &*i.unwrap(), &*m.unwrap()) {
548                         ("/mysignal", "com.example.signaltest", "ThisIsASignal") => {
549                             assert_eq!(s.sender().unwrap(), uname);
550                             break;
551                         },
552                         (_, _, _) => println!("Other signal: {:?}", s.headers()),
553                     }
554                 }
555                 _ => {},
556             }
557         }
558         c.remove_match(&mstr).unwrap();
559     }
560 
561     #[test]
watch()562     fn watch() {
563         let c = Connection::get_private(BusType::Session).unwrap();
564         let mut d = c.watch_fds();
565         assert!(d.len() > 0);
566         println!("Fds to watch: {:?}", d);
567         for n in c.iter(1000) {
568             match n {
569                 ConnectionItem::WatchFd(w) => {
570                     assert!(w.readable() || w.writable());
571                     assert!(d.contains(&w));
572                     d.retain(|x| *x != w);
573                     if d.len() == 0 { break };
574                 }
575                 _ => {},
576             }
577         }
578     }
579 }
580