1 use async_broadcast::{broadcast, InactiveReceiver, Sender as Broadcaster}; 2 use async_channel::{bounded, Receiver, Sender}; 3 use async_executor::Executor; 4 use async_lock::Mutex; 5 use async_task::Task; 6 use event_listener::EventListener; 7 use once_cell::sync::OnceCell; 8 use ordered_stream::{OrderedFuture, OrderedStream, PollResult}; 9 use static_assertions::assert_impl_all; 10 use std::{ 11 collections::{HashMap, HashSet}, 12 convert::TryInto, 13 io::{self, ErrorKind}, 14 ops::Deref, 15 pin::Pin, 16 sync::{ 17 self, 18 atomic::{AtomicU32, Ordering::SeqCst}, 19 Arc, Weak, 20 }, 21 task::{Context, Poll}, 22 }; 23 use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName}; 24 use zvariant::ObjectPath; 25 26 use futures_core::{ready, Future}; 27 use futures_sink::Sink; 28 use futures_util::{ 29 future::{select, Either}, 30 sink::SinkExt, 31 StreamExt, TryFutureExt, 32 }; 33 34 use crate::{ 35 blocking, fdo, 36 raw::{Connection as RawConnection, Socket}, 37 Authenticated, CacheProperties, ConnectionBuilder, DBusError, Error, Guid, Message, 38 MessageStream, MessageType, ObjectServer, Result, 39 }; 40 41 const DEFAULT_MAX_QUEUED: usize = 64; 42 43 /// Inner state shared by Connection and WeakConnection 44 #[derive(Debug)] 45 pub(crate) struct ConnectionInner { 46 server_guid: Guid, 47 #[cfg(unix)] 48 cap_unix_fd: bool, 49 bus_conn: bool, 50 unique_name: OnceCell<OwnedUniqueName>, 51 registered_names: Mutex<HashSet<WellKnownName<'static>>>, 52 53 raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>, 54 55 // Serial number for next outgoing message 56 serial: AtomicU32, 57 58 // Our executor 59 executor: Arc<Executor<'static>>, 60 61 // Message receiver task 62 #[allow(unused)] 63 msg_receiver_task: Task<()>, 64 65 signal_matches: Mutex<HashMap<String, u64>>, 66 67 object_server: OnceCell<blocking::ObjectServer>, 68 object_server_dispatch_task: OnceCell<Task<()>>, 69 } 70 71 // FIXME: Should really use [`AsyncDrop`] for `ConnectionInner` when we've something like that to 72 // cancel `msg_receiver_task` manually to ensure task is gone before the connection is. Same 73 // goes for the registered well-known names. 74 // 75 // [`AsyncDrop`]: https://github.com/rust-lang/wg-async-foundations/issues/65 76 77 #[derive(Debug)] 78 struct MessageReceiverTask { 79 raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>, 80 81 // Message broadcaster. 82 msg_sender: Broadcaster<Arc<Message>>, 83 84 // Sender side of the error channel 85 error_sender: Sender<Error>, 86 } 87 88 impl MessageReceiverTask { new( raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>, msg_sender: Broadcaster<Arc<Message>>, error_sender: Sender<Error>, ) -> Arc<Self>89 fn new( 90 raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>, 91 msg_sender: Broadcaster<Arc<Message>>, 92 error_sender: Sender<Error>, 93 ) -> Arc<Self> { 94 Arc::new(Self { 95 raw_conn, 96 msg_sender, 97 error_sender, 98 }) 99 } 100 spawn(self: Arc<Self>, executor: &Executor<'_>) -> Task<()>101 fn spawn(self: Arc<Self>, executor: &Executor<'_>) -> Task<()> { 102 executor.spawn(async move { 103 self.receive_msg().await; 104 }) 105 } 106 107 // Keep receiving messages and put them on the queue. receive_msg(self: Arc<Self>)108 async fn receive_msg(self: Arc<Self>) { 109 loop { 110 let receive_msg = ReceiveMessage { 111 raw_conn: &self.raw_conn, 112 }; 113 let msg = match receive_msg.await { 114 Ok(msg) => msg, 115 Err(e) => { 116 // Ignore errors when sending the error; this happens if the channel is 117 // being dropped. 118 // 119 // This can be logged when we have logging, though that's mostly useless: it 120 // only happens when the receive_msg task is running at the time the last 121 // Connection is dropped. 122 let _ = self.error_sender.send(e).await; 123 self.msg_sender.close(); 124 self.error_sender.close(); 125 return; 126 } 127 }; 128 129 let msg = Arc::new(msg); 130 if self.msg_sender.broadcast(msg.clone()).await.is_err() { 131 // An error would be due to the channel being closed, which only happens when the 132 // connection is dropped, so just stop the task. See comment above about logging. 133 return; 134 } 135 } 136 } 137 } 138 139 /// A D-Bus connection. 140 /// 141 /// A connection to a D-Bus bus, or a direct peer. 142 /// 143 /// Once created, the connection is authenticated and negotiated and messages can be sent or 144 /// received, such as [method calls] or [signals]. 145 /// 146 /// For higher-level message handling (typed functions, introspection, documentation reasons etc), 147 /// it is recommended to wrap the low-level D-Bus messages into Rust functions with the 148 /// [`dbus_proxy`] and [`dbus_interface`] macros instead of doing it directly on a `Connection`. 149 /// 150 /// Typically, a connection is made to the session bus with [`Connection::session`], or to the 151 /// system bus with [`Connection::system`]. Then the connection is used with [`crate::Proxy`] 152 /// instances or the on-demand [`ObjectServer`] instance that can be accessed through 153 /// [`Connection::object_server`]. 154 /// 155 /// `Connection` implements [`Clone`] and cloning it is a very cheap operation, as the underlying 156 /// data is not cloned. This makes it very convenient to share the connection between different 157 /// parts of your code. `Connection` also implements [`std::marker::Sync`] and[`std::marker::Send`] 158 /// so you can send and share a connection instance across threads as well. 159 /// 160 /// `Connection` keeps an internal queue of incoming message. The maximum capacity of this queue 161 /// is configurable through the [`set_max_queued`] method. The default size is 64. When the queue is 162 /// full, no more messages can be received until room is created for more. This is why it's 163 /// important to ensure that all [`crate::MessageStream`] and [`crate::blocking::MessageIterator`] 164 /// instances are continuously polled and iterated on, respectively. 165 /// 166 /// For sending messages you can either use [`Connection::send_message`] method or make use of the 167 /// [`Sink`] implementation. For latter, you might find [`SinkExt`] API very useful. Keep in mind 168 /// that [`Connection`] will not manage the serial numbers (cookies) on the messages for you when 169 /// they are sent through the [`Sink`] implementation. You can manually assign unique serial numbers 170 /// to them using the [`Connection::assign_serial_num`] method before sending them off, if needed. 171 /// Having said that, the [`Sink`] is mainly useful for sending out signals, as they do not expect 172 /// a reply, and serial numbers are not very useful for signals either for the same reason. 173 /// 174 /// Since you do not need exclusive access to a `zbus::Connection` to send messages on the bus, 175 /// [`Sink`] is also implemented on `&Connection`. 176 /// 177 /// # Caveats 178 /// 179 /// At the moment, a simultaneous [flush request] from multiple tasks/threads could 180 /// potentially create a busy loop, thus wasting CPU time. This limitation may be removed in the 181 /// future. 182 /// 183 /// [flush request]: https://docs.rs/futures/0.3.15/futures/sink/trait.SinkExt.html#method.flush 184 /// 185 /// [method calls]: struct.Connection.html#method.call_method 186 /// [signals]: struct.Connection.html#method.emit_signal 187 /// [`dbus_proxy`]: attr.dbus_proxy.html 188 /// [`dbus_interface`]: attr.dbus_interface.html 189 /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html 190 /// [`set_max_queued`]: struct.Connection.html#method.set_max_queued 191 /// 192 /// ### Examples 193 /// 194 /// #### Get the session bus ID 195 /// 196 /// ``` 197 ///# use zvariant::Type; 198 ///# 199 ///# async_io::block_on(async { 200 /// use zbus::Connection; 201 /// 202 /// let mut connection = Connection::session().await?; 203 /// 204 /// let reply = connection 205 /// .call_method( 206 /// Some("org.freedesktop.DBus"), 207 /// "/org/freedesktop/DBus", 208 /// Some("org.freedesktop.DBus"), 209 /// "GetId", 210 /// &(), 211 /// ) 212 /// .await?; 213 /// 214 /// let id: &str = reply.body()?; 215 /// println!("Unique ID of the bus: {}", id); 216 ///# Ok::<(), zbus::Error>(()) 217 ///# }); 218 /// ``` 219 /// 220 /// #### Monitoring all messages 221 /// 222 /// Let's eavesdrop on the session bus using the [Monitor] interface: 223 /// 224 /// ```rust,no_run 225 ///# async_io::block_on(async { 226 /// use futures_util::stream::TryStreamExt; 227 /// use zbus::{Connection, MessageStream}; 228 /// 229 /// let connection = Connection::session().await?; 230 /// 231 /// connection 232 /// .call_method( 233 /// Some("org.freedesktop.DBus"), 234 /// "/org/freedesktop/DBus", 235 /// Some("org.freedesktop.DBus.Monitoring"), 236 /// "BecomeMonitor", 237 /// &(&[] as &[&str], 0u32), 238 /// ) 239 /// .await?; 240 /// 241 /// let mut stream = MessageStream::from(connection); 242 /// while let Some(msg) = stream.try_next().await? { 243 /// println!("Got message: {}", msg); 244 /// } 245 /// 246 ///# Ok::<(), zbus::Error>(()) 247 ///# }); 248 /// ``` 249 /// 250 /// This should print something like: 251 /// 252 /// ```console 253 /// Got message: Signal NameAcquired from org.freedesktop.DBus 254 /// Got message: Signal NameLost from org.freedesktop.DBus 255 /// Got message: Method call GetConnectionUnixProcessID from :1.1324 256 /// Got message: Error org.freedesktop.DBus.Error.NameHasNoOwner: 257 /// Could not get PID of name ':1.1332': no such name from org.freedesktop.DBus 258 /// Got message: Method call AddMatch from :1.918 259 /// Got message: Method return from org.freedesktop.DBus 260 /// ``` 261 /// 262 /// [Monitor]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-become-monitor 263 #[derive(Clone, Debug)] 264 pub struct Connection { 265 pub(crate) inner: Arc<ConnectionInner>, 266 267 pub(crate) msg_receiver: InactiveReceiver<Arc<Message>>, 268 269 // Receiver side of the error channel 270 pub(crate) error_receiver: Receiver<Error>, 271 } 272 273 assert_impl_all!(Connection: Send, Sync, Unpin); 274 275 /// A method call whose completion can be awaited or joined with other streams. 276 /// 277 /// This is useful for cache population method calls, where joining the [`JoinableStream`] with 278 /// an update signal stream can be used to ensure that cache updates are not overwritten by a cache 279 /// population whose task is scheduled later. 280 #[derive(Debug)] 281 pub(crate) struct PendingMethodCall { 282 stream: Option<MessageStream>, 283 serial: u32, 284 } 285 286 impl Future for PendingMethodCall { 287 type Output = Result<Arc<Message>>; 288 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>289 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 290 self.poll_before(cx, None).map(|ret| { 291 ret.map(|(_, r)| r).unwrap_or_else(|| { 292 Err(crate::Error::Io(io::Error::new( 293 ErrorKind::BrokenPipe, 294 "socket closed", 295 ))) 296 }) 297 }) 298 } 299 } 300 301 impl OrderedFuture for PendingMethodCall { 302 type Output = Result<Arc<Message>>; 303 type Ordering = zbus::MessageSequence; 304 poll_before( self: Pin<&mut Self>, cx: &mut Context<'_>, before: Option<&Self::Ordering>, ) -> Poll<Option<(Self::Ordering, Self::Output)>>305 fn poll_before( 306 self: Pin<&mut Self>, 307 cx: &mut Context<'_>, 308 before: Option<&Self::Ordering>, 309 ) -> Poll<Option<(Self::Ordering, Self::Output)>> { 310 let this = self.get_mut(); 311 if let Some(stream) = &mut this.stream { 312 loop { 313 match Pin::new(&mut *stream).poll_next_before(cx, before) { 314 Poll::Ready(PollResult::Item { 315 data: Ok(msg), 316 ordering, 317 }) => { 318 if msg.reply_serial() != Some(this.serial) { 319 continue; 320 } 321 let res = match msg.message_type() { 322 MessageType::Error => Err(msg.into()), 323 MessageType::MethodReturn => Ok(msg), 324 _ => continue, 325 }; 326 this.stream = None; 327 return Poll::Ready(Some((ordering, res))); 328 } 329 Poll::Ready(PollResult::Item { 330 data: Err(e), 331 ordering, 332 }) => { 333 return Poll::Ready(Some((ordering, Err(e)))); 334 } 335 336 Poll::Ready(PollResult::NoneBefore) => { 337 return Poll::Ready(None); 338 } 339 Poll::Ready(PollResult::Terminated) => { 340 return Poll::Ready(None); 341 } 342 Poll::Pending => return Poll::Pending, 343 } 344 } 345 } 346 Poll::Ready(None) 347 } 348 } 349 350 impl Connection { 351 /// Send `msg` to the peer. 352 /// 353 /// Unlike our [`Sink`] implementation, this method sets a unique (to this connection) serial 354 /// number on the message before sending it off, for you. 355 /// 356 /// On successfully sending off `msg`, the assigned serial number is returned. send_message(&self, mut msg: Message) -> Result<u32>357 pub async fn send_message(&self, mut msg: Message) -> Result<u32> { 358 let serial = self.assign_serial_num(&mut msg)?; 359 360 (&*self).send(msg).await?; 361 362 Ok(serial) 363 } 364 365 /// Send a method call. 366 /// 367 /// Create a method-call message, send it over the connection, then wait for the reply. 368 /// 369 /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus 370 /// error replies are returned as [`Error::MethodError`]. call_method<'d, 'p, 'i, 'm, D, P, I, M, B>( &self, destination: Option<D>, path: P, interface: Option<I>, method_name: M, body: &B, ) -> Result<Arc<Message>> where D: TryInto<BusName<'d>>, P: TryInto<ObjectPath<'p>>, I: TryInto<InterfaceName<'i>>, M: TryInto<MemberName<'m>>, D::Error: Into<Error>, P::Error: Into<Error>, I::Error: Into<Error>, M::Error: Into<Error>, B: serde::ser::Serialize + zvariant::DynamicType,371 pub async fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>( 372 &self, 373 destination: Option<D>, 374 path: P, 375 interface: Option<I>, 376 method_name: M, 377 body: &B, 378 ) -> Result<Arc<Message>> 379 where 380 D: TryInto<BusName<'d>>, 381 P: TryInto<ObjectPath<'p>>, 382 I: TryInto<InterfaceName<'i>>, 383 M: TryInto<MemberName<'m>>, 384 D::Error: Into<Error>, 385 P::Error: Into<Error>, 386 I::Error: Into<Error>, 387 M::Error: Into<Error>, 388 B: serde::ser::Serialize + zvariant::DynamicType, 389 { 390 let m = Message::method( 391 self.unique_name(), 392 destination, 393 path, 394 interface, 395 method_name, 396 body, 397 )?; 398 self.call_method_raw(m).await?.await 399 } 400 401 /// Send a method call. 402 /// 403 /// Send the given message, which must be a method call, over the connection and return an 404 /// object that allows the reply to be retrieved. Typically you'd want to use 405 /// [`Connection::call_method`] instead. call_method_raw(&self, msg: Message) -> Result<PendingMethodCall>406 pub(crate) async fn call_method_raw(&self, msg: Message) -> Result<PendingMethodCall> { 407 debug_assert_eq!(msg.message_type(), MessageType::MethodCall); 408 409 let stream = Some(MessageStream::from(self.clone())); 410 let serial = self.send_message(msg).await?; 411 412 Ok(PendingMethodCall { stream, serial }) 413 } 414 415 /// Emit a signal. 416 /// 417 /// Create a signal message, and send it over the connection. emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>( &self, destination: Option<D>, path: P, interface: I, signal_name: M, body: &B, ) -> Result<()> where D: TryInto<BusName<'d>>, P: TryInto<ObjectPath<'p>>, I: TryInto<InterfaceName<'i>>, M: TryInto<MemberName<'m>>, D::Error: Into<Error>, P::Error: Into<Error>, I::Error: Into<Error>, M::Error: Into<Error>, B: serde::ser::Serialize + zvariant::DynamicType,418 pub async fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>( 419 &self, 420 destination: Option<D>, 421 path: P, 422 interface: I, 423 signal_name: M, 424 body: &B, 425 ) -> Result<()> 426 where 427 D: TryInto<BusName<'d>>, 428 P: TryInto<ObjectPath<'p>>, 429 I: TryInto<InterfaceName<'i>>, 430 M: TryInto<MemberName<'m>>, 431 D::Error: Into<Error>, 432 P::Error: Into<Error>, 433 I::Error: Into<Error>, 434 M::Error: Into<Error>, 435 B: serde::ser::Serialize + zvariant::DynamicType, 436 { 437 let m = Message::signal( 438 self.unique_name(), 439 destination, 440 path, 441 interface, 442 signal_name, 443 body, 444 )?; 445 446 self.send_message(m).await.map(|_| ()) 447 } 448 449 /// Reply to a message. 450 /// 451 /// Given an existing message (likely a method call), send a reply back to the caller with the 452 /// given `body`. 453 /// 454 /// Returns the message serial number. reply<B>(&self, call: &Message, body: &B) -> Result<u32> where B: serde::ser::Serialize + zvariant::DynamicType,455 pub async fn reply<B>(&self, call: &Message, body: &B) -> Result<u32> 456 where 457 B: serde::ser::Serialize + zvariant::DynamicType, 458 { 459 let m = Message::method_reply(self.unique_name(), call, body)?; 460 self.send_message(m).await 461 } 462 463 /// Reply an error to a message. 464 /// 465 /// Given an existing message (likely a method call), send an error reply back to the caller 466 /// with the given `error_name` and `body`. 467 /// 468 /// Returns the message serial number. reply_error<'e, E, B>( &self, call: &Message, error_name: E, body: &B, ) -> Result<u32> where B: serde::ser::Serialize + zvariant::DynamicType, E: TryInto<ErrorName<'e>>, E::Error: Into<Error>,469 pub async fn reply_error<'e, E, B>( 470 &self, 471 call: &Message, 472 error_name: E, 473 body: &B, 474 ) -> Result<u32> 475 where 476 B: serde::ser::Serialize + zvariant::DynamicType, 477 E: TryInto<ErrorName<'e>>, 478 E::Error: Into<Error>, 479 { 480 let m = Message::method_error(self.unique_name(), call, error_name, body)?; 481 self.send_message(m).await 482 } 483 484 /// Reply an error to a message. 485 /// 486 /// Given an existing message (likely a method call), send an error reply back to the caller 487 /// using one of the standard interface reply types. 488 /// 489 /// Returns the message serial number. reply_dbus_error( &self, call: &zbus::MessageHeader<'_>, err: impl DBusError, ) -> Result<u32>490 pub async fn reply_dbus_error( 491 &self, 492 call: &zbus::MessageHeader<'_>, 493 err: impl DBusError, 494 ) -> Result<u32> { 495 let m = err.create_reply(call); 496 self.send_message(m?).await 497 } 498 499 /// Register a well-known name for this service on the bus. 500 /// 501 /// You can request multiple names for the same `ObjectServer`. Use [`Connection::release_name`] 502 /// for deregistering names registered through this method. 503 /// 504 /// Note that exclusive ownership without queueing is requested (using 505 /// [`fdo::RequestNameFlags::ReplaceExisting`] and [`fdo::RequestNameFlags::DoNotQueue`] flags) 506 /// since that is the most typical case. If that is not what you want, you should use 507 /// [`fdo::DBusProxy::request_name`] instead (but make sure then that name is requested 508 /// **after** you've setup your service implementation with the `ObjectServer`). 509 /// 510 /// # Errors 511 /// 512 /// Fails with `zbus::Error::NameTaken` if the name is already owned by another peer. request_name<'w, W>(&self, well_known_name: W) -> Result<()> where W: TryInto<WellKnownName<'w>>, W::Error: Into<Error>,513 pub async fn request_name<'w, W>(&self, well_known_name: W) -> Result<()> 514 where 515 W: TryInto<WellKnownName<'w>>, 516 W::Error: Into<Error>, 517 { 518 let well_known_name = well_known_name.try_into().map_err(Into::into)?; 519 let mut names = self.inner.registered_names.lock().await; 520 521 if !names.contains(&well_known_name) { 522 // Ensure ObjectServer and its msg stream exists and reading before registering any 523 // names. Otherwise we get issue#68 (that we warn the user about in the docs of this 524 // method). 525 self.object_server(); 526 527 let reply = fdo::DBusProxy::builder(self) 528 .cache_properties(CacheProperties::No) 529 .build() 530 .await? 531 .request_name( 532 well_known_name.clone(), 533 fdo::RequestNameFlags::ReplaceExisting | fdo::RequestNameFlags::DoNotQueue, 534 ) 535 .await?; 536 if let fdo::RequestNameReply::Exists = reply { 537 Err(Error::NameTaken) 538 } else { 539 names.insert(well_known_name.to_owned()); 540 Ok(()) 541 } 542 } else { 543 Ok(()) 544 } 545 } 546 547 /// Deregister a previously registered well-known name for this service on the bus. 548 /// 549 /// Use this method to deregister a well-known name, registered through 550 /// [`Connection::request_name`]. 551 /// 552 /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with 553 /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name 554 /// was not previously registered or already deregistered. release_name<'w, W>(&self, well_known_name: W) -> Result<bool> where W: TryInto<WellKnownName<'w>>, W::Error: Into<Error>,555 pub async fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool> 556 where 557 W: TryInto<WellKnownName<'w>>, 558 W::Error: Into<Error>, 559 { 560 let well_known_name: WellKnownName<'w> = well_known_name.try_into().map_err(Into::into)?; 561 let mut names = self.inner.registered_names.lock().await; 562 // FIXME: Should be possible to avoid cloning/allocation here 563 if !names.remove(&well_known_name.to_owned()) { 564 return Ok(false); 565 } 566 567 fdo::DBusProxy::builder(self) 568 .cache_properties(CacheProperties::No) 569 .build() 570 .await? 571 .release_name(well_known_name) 572 .await 573 .map(|_| true) 574 .map_err(Into::into) 575 } 576 577 /// Checks if `self` is a connection to a message bus. 578 /// 579 /// This will return `false` for p2p connections. is_bus(&self) -> bool580 pub fn is_bus(&self) -> bool { 581 self.inner.bus_conn 582 } 583 584 /// Assigns a serial number to `msg` that is unique to this connection. 585 /// 586 /// This method can fail if `msg` is corrupted. assign_serial_num(&self, msg: &mut Message) -> Result<u32>587 pub fn assign_serial_num(&self, msg: &mut Message) -> Result<u32> { 588 let mut serial = 0; 589 msg.modify_primary_header(|primary| { 590 serial = *primary.serial_num_or_init(|| self.next_serial()); 591 Ok(()) 592 })?; 593 594 Ok(serial) 595 } 596 597 /// The unique name as assigned by the message bus or `None` if not a message bus connection. unique_name(&self) -> Option<&OwnedUniqueName>598 pub fn unique_name(&self) -> Option<&OwnedUniqueName> { 599 self.inner.unique_name.get() 600 } 601 602 /// Max number of messages to queue. max_queued(&self) -> usize603 pub fn max_queued(&self) -> usize { 604 self.msg_receiver.capacity() 605 } 606 607 /// Set the max number of messages to queue. set_max_queued(&mut self, max: usize)608 pub fn set_max_queued(&mut self, max: usize) { 609 self.msg_receiver.set_capacity(max); 610 } 611 612 /// The server's GUID. server_guid(&self) -> &str613 pub fn server_guid(&self) -> &str { 614 self.inner.server_guid.as_str() 615 } 616 617 /// The underlying executor. 618 /// 619 /// When a connection is built with internal_executor set to false, zbus will not spawn a 620 /// thread to run the executor. You're responsible to continuously [tick the executor][tte]. 621 /// Failure to do so will result in hangs. 622 /// 623 /// # Examples 624 /// 625 /// Here is how one would typically run the zbus executor through tokio's single-threaded 626 /// scheduler: 627 /// 628 /// ``` 629 /// use zbus::ConnectionBuilder; 630 /// use tokio::runtime; 631 /// 632 /// runtime::Builder::new_current_thread() 633 /// .build() 634 /// .unwrap() 635 /// .block_on(async { 636 /// let conn = ConnectionBuilder::session() 637 /// .unwrap() 638 /// .internal_executor(false) 639 /// .build() 640 /// .await 641 /// .unwrap(); 642 /// { 643 /// let conn = conn.clone(); 644 /// tokio::task::spawn(async move { 645 /// loop { 646 /// conn.executor().tick().await; 647 /// } 648 /// }); 649 /// } 650 /// 651 /// // All your other async code goes here. 652 /// }); 653 /// ``` 654 /// 655 /// **Note**: zbus 2.1 added support for tight integration with tokio. This means, if you use 656 /// zbus with tokio, you do not need to worry about this at all. All you need to do is enable 657 /// `tokio` feature and disable the (default) `async-io` feature in your `Cargo.toml`. 658 /// 659 /// [tte]: https://docs.rs/async-executor/1.4.1/async_executor/struct.Executor.html#method.tick executor(&self) -> &Executor<'static>660 pub fn executor(&self) -> &Executor<'static> { 661 &self.inner.executor 662 } 663 664 /// Get a reference to the associated [`ObjectServer`]. 665 /// 666 /// The `ObjectServer` is created on-demand. object_server(&self) -> impl Deref<Target = ObjectServer> + '_667 pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ { 668 // FIXME: Maybe it makes sense after all to implement Deref<Target= ObjectServer> for 669 // crate::ObjectServer instead of this wrapper? 670 struct Wrapper<'a>(&'a blocking::ObjectServer); 671 impl<'a> Deref for Wrapper<'a> { 672 type Target = ObjectServer; 673 674 fn deref(&self) -> &Self::Target { 675 self.0.inner() 676 } 677 } 678 679 Wrapper(self.sync_object_server(true)) 680 } 681 sync_object_server(&self, start: bool) -> &blocking::ObjectServer682 pub(crate) fn sync_object_server(&self, start: bool) -> &blocking::ObjectServer { 683 self.inner 684 .object_server 685 .get_or_init(|| self.setup_object_server(start)) 686 } 687 setup_object_server(&self, start: bool) -> blocking::ObjectServer688 fn setup_object_server(&self, start: bool) -> blocking::ObjectServer { 689 if start { 690 self.start_object_server(); 691 } 692 693 blocking::ObjectServer::new(self) 694 } 695 start_object_server(&self)696 pub(crate) fn start_object_server(&self) { 697 self.inner.object_server_dispatch_task.get_or_init(|| { 698 let weak_conn = WeakConnection::from(self); 699 let mut stream = MessageStream::from(self.clone()); 700 701 self.inner.executor.spawn(async move { 702 // TODO: Log errors when we've logging. 703 while let Some(msg) = stream.next().await.and_then(|m| m.ok()) { 704 if let Some(conn) = weak_conn.upgrade() { 705 let executor = conn.inner.executor.clone(); 706 executor 707 .spawn(async move { 708 let server = conn.object_server(); 709 let _ = server.dispatch_message(&msg).await; 710 }) 711 .detach(); 712 } else { 713 // If connection is completely gone, no reason to keep running the task anymore. 714 break; 715 } 716 } 717 }) 718 }); 719 } 720 add_match(&self, expr: String) -> Result<()>721 pub(crate) async fn add_match(&self, expr: String) -> Result<()> { 722 use std::collections::hash_map::Entry; 723 if !self.is_bus() { 724 return Ok(()); 725 } 726 let mut subscriptions = self.inner.signal_matches.lock().await; 727 match subscriptions.entry(expr) { 728 Entry::Vacant(e) => { 729 fdo::DBusProxy::builder(self) 730 .cache_properties(CacheProperties::No) 731 .build() 732 .await? 733 .add_match(e.key()) 734 .await?; 735 e.insert(1); 736 } 737 Entry::Occupied(mut e) => { 738 *e.get_mut() += 1; 739 } 740 } 741 Ok(()) 742 } 743 remove_match(&self, expr: String) -> Result<bool>744 pub(crate) async fn remove_match(&self, expr: String) -> Result<bool> { 745 use std::collections::hash_map::Entry; 746 if !self.is_bus() { 747 return Ok(true); 748 } 749 let mut subscriptions = self.inner.signal_matches.lock().await; 750 // TODO when it becomes stable, use HashMap::raw_entry and only require expr: &str 751 // (both here and in add_match) 752 match subscriptions.entry(expr) { 753 Entry::Vacant(_) => Ok(false), 754 Entry::Occupied(mut e) => { 755 *e.get_mut() -= 1; 756 if *e.get() == 0 { 757 fdo::DBusProxy::builder(self) 758 .cache_properties(CacheProperties::No) 759 .build() 760 .await? 761 .remove_match(e.key()) 762 .await?; 763 e.remove(); 764 } 765 Ok(true) 766 } 767 } 768 } 769 queue_remove_match(&self, expr: String)770 pub(crate) fn queue_remove_match(&self, expr: String) { 771 let conn = self.clone(); 772 self.inner 773 .executor 774 .spawn(async move { conn.remove_match(expr).await }) 775 .detach() 776 } 777 hello_bus(&self) -> Result<()>778 async fn hello_bus(&self) -> Result<()> { 779 let dbus_proxy = fdo::DBusProxy::builder(self) 780 .cache_properties(CacheProperties::No) 781 .build() 782 .await?; 783 let future = dbus_proxy.hello().map_err(Into::into); 784 let name = self.run_future_at_init(future).await?; 785 786 self.inner 787 .unique_name 788 .set(name) 789 // programmer (probably our) error if this fails. 790 .expect("Attempted to set unique_name twice"); 791 792 Ok(()) 793 } 794 795 // With external executor, our executor is only run after the connection construction is 796 // completed and some futures need to run to completion before that is done so we need to tick 797 // the executor ourselves in parallel to making the method call. With the internal executor, 798 /// this is not needed but harmless. run_future_at_init<F, O>(&self, future: F) -> Result<O> where F: Future<Output = Result<O>>,799 pub(crate) async fn run_future_at_init<F, O>(&self, future: F) -> Result<O> 800 where 801 F: Future<Output = Result<O>>, 802 { 803 let executor = self.inner.executor.clone(); 804 let ticking_future = async move { 805 // Keep running as long as this task/future is not cancelled. 806 loop { 807 executor.tick().await; 808 } 809 }; 810 811 futures_util::pin_mut!(future); 812 futures_util::pin_mut!(ticking_future); 813 814 match select(future, ticking_future).await { 815 Either::Left((res, _)) => res, 816 Either::Right((_, _)) => unreachable!("ticking task future shouldn't finish"), 817 } 818 } 819 new( auth: Authenticated<Box<dyn Socket>>, bus_connection: bool, internal_executor: bool, ) -> Result<Self>820 pub(crate) async fn new( 821 auth: Authenticated<Box<dyn Socket>>, 822 bus_connection: bool, 823 internal_executor: bool, 824 ) -> Result<Self> { 825 let auth = auth.into_inner(); 826 #[cfg(unix)] 827 let cap_unix_fd = auth.cap_unix_fd; 828 829 let (msg_sender, msg_receiver) = broadcast(DEFAULT_MAX_QUEUED); 830 let msg_receiver = msg_receiver.deactivate(); 831 let (error_sender, error_receiver) = bounded(1); 832 let executor = Arc::new(Executor::new()); 833 let raw_conn = Arc::new(sync::Mutex::new(auth.conn)); 834 835 // Start the message receiver task. 836 let msg_receiver_task = 837 MessageReceiverTask::new(raw_conn.clone(), msg_sender, error_sender).spawn(&executor); 838 839 let connection = Self { 840 error_receiver, 841 msg_receiver, 842 inner: Arc::new(ConnectionInner { 843 raw_conn, 844 server_guid: auth.server_guid, 845 #[cfg(unix)] 846 cap_unix_fd, 847 bus_conn: bus_connection, 848 serial: AtomicU32::new(1), 849 unique_name: OnceCell::new(), 850 signal_matches: Mutex::new(HashMap::new()), 851 object_server: OnceCell::new(), 852 object_server_dispatch_task: OnceCell::new(), 853 executor: executor.clone(), 854 msg_receiver_task, 855 registered_names: Mutex::new(HashSet::new()), 856 }), 857 }; 858 859 if internal_executor { 860 let ticker_future = async move { 861 // Run as long as there is a task to run. 862 while !executor.is_empty() { 863 executor.tick().await; 864 } 865 }; 866 #[cfg(feature = "async-io")] 867 std::thread::Builder::new() 868 .name("zbus::Connection executor".into()) 869 .spawn(move || crate::utils::block_on(ticker_future))?; 870 871 #[cfg(all(not(feature = "async-io"), feature = "tokio"))] 872 tokio::task::spawn(ticker_future); 873 } 874 875 if !bus_connection { 876 return Ok(connection); 877 } 878 879 // Now that the server has approved us, we must send the bus Hello, as per specs 880 connection.hello_bus().await?; 881 882 Ok(connection) 883 } 884 next_serial(&self) -> u32885 fn next_serial(&self) -> u32 { 886 self.inner.serial.fetch_add(1, SeqCst) 887 } 888 889 /// Create a `Connection` to the session/user message bus. session() -> Result<Self>890 pub async fn session() -> Result<Self> { 891 ConnectionBuilder::session()?.build().await 892 } 893 894 /// Create a `Connection` to the system-wide message bus. system() -> Result<Self>895 pub async fn system() -> Result<Self> { 896 ConnectionBuilder::system()?.build().await 897 } 898 899 /// Returns a listener, notified on various connection activity. 900 /// 901 /// This function is meant for the caller to implement idle or timeout on inactivity. monitor_activity(&self) -> EventListener902 pub fn monitor_activity(&self) -> EventListener { 903 self.inner 904 .raw_conn 905 .lock() 906 .expect("poisoned lock") 907 .monitor_activity() 908 } 909 } 910 911 impl Sink<Message> for Connection { 912 type Error = Error; 913 poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>914 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 915 Pin::new(&mut &*self).poll_ready(cx) 916 } 917 start_send(self: Pin<&mut Self>, msg: Message) -> Result<()>918 fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<()> { 919 Pin::new(&mut &*self).start_send(msg) 920 } 921 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>922 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 923 Pin::new(&mut &*self).poll_flush(cx) 924 } 925 poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>926 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 927 Pin::new(&mut &*self).poll_close(cx) 928 } 929 } 930 931 impl<'a> Sink<Message> for &'a Connection { 932 type Error = Error; 933 poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>>934 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> { 935 // TODO: We should have a max queue length in raw::Socket for outgoing messages. 936 Poll::Ready(Ok(())) 937 } 938 start_send(self: Pin<&mut Self>, msg: Message) -> Result<()>939 fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<()> { 940 #[cfg(unix)] 941 if !msg.fds().is_empty() && !self.inner.cap_unix_fd { 942 return Err(Error::Unsupported); 943 } 944 945 self.inner 946 .raw_conn 947 .lock() 948 .expect("poisoned lock") 949 .enqueue_message(msg); 950 951 Ok(()) 952 } 953 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>954 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 955 self.inner.raw_conn.lock().expect("poisoned lock").flush(cx) 956 } 957 poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>958 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { 959 let mut raw_conn = self.inner.raw_conn.lock().expect("poisoned lock"); 960 match ready!(raw_conn.flush(cx)) { 961 Ok(_) => (), 962 Err(e) => return Poll::Ready(Err(e)), 963 } 964 965 Poll::Ready(raw_conn.close()) 966 } 967 } 968 969 struct ReceiveMessage<'r> { 970 raw_conn: &'r sync::Mutex<RawConnection<Box<dyn Socket>>>, 971 } 972 973 impl<'r> Future for ReceiveMessage<'r> { 974 type Output = Result<Message>; 975 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>976 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 977 let mut raw_conn = self.raw_conn.lock().expect("poisoned lock"); 978 raw_conn.try_receive_message(cx) 979 } 980 } 981 982 impl From<crate::blocking::Connection> for Connection { from(conn: crate::blocking::Connection) -> Self983 fn from(conn: crate::blocking::Connection) -> Self { 984 conn.into_inner() 985 } 986 } 987 988 // Internal API that allows keeping a weak connection ref around. 989 #[derive(Debug)] 990 pub(crate) struct WeakConnection { 991 inner: Weak<ConnectionInner>, 992 msg_receiver: InactiveReceiver<Arc<Message>>, 993 error_receiver: Receiver<Error>, 994 } 995 996 impl WeakConnection { 997 /// Upgrade to a Connection. upgrade(&self) -> Option<Connection>998 pub fn upgrade(&self) -> Option<Connection> { 999 self.inner.upgrade().map(|inner| Connection { 1000 inner, 1001 msg_receiver: self.msg_receiver.clone(), 1002 error_receiver: self.error_receiver.clone(), 1003 }) 1004 } 1005 } 1006 1007 impl From<&Connection> for WeakConnection { from(conn: &Connection) -> Self1008 fn from(conn: &Connection) -> Self { 1009 Self { 1010 inner: Arc::downgrade(&conn.inner), 1011 msg_receiver: conn.msg_receiver.clone(), 1012 error_receiver: conn.error_receiver.clone(), 1013 } 1014 } 1015 } 1016 1017 #[cfg(test)] 1018 mod tests { 1019 use futures_util::stream::TryStreamExt; 1020 use ntest::timeout; 1021 use test_log::test; 1022 1023 #[cfg(all(unix, feature = "async-io"))] 1024 use crate::AuthMechanism; 1025 1026 use super::*; 1027 test_p2p(server: Connection, client: Connection) -> Result<()>1028 async fn test_p2p(server: Connection, client: Connection) -> Result<()> { 1029 let server_future = async { 1030 let mut method: Option<Arc<Message>> = None; 1031 let mut stream = MessageStream::from(&server); 1032 while let Some(m) = stream.try_next().await? { 1033 if m.to_string() == "Method call Test" { 1034 method.replace(m); 1035 1036 break; 1037 } 1038 } 1039 let method = method.unwrap(); 1040 1041 // Send another message first to check the queueing function on client side. 1042 server 1043 .emit_signal(None::<()>, "/", "org.zbus.p2p", "ASignalForYou", &()) 1044 .await?; 1045 server.reply(&method, &("yay")).await 1046 }; 1047 1048 let client_future = async { 1049 let mut stream = MessageStream::from(&client); 1050 let reply = client 1051 .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &()) 1052 .await?; 1053 assert_eq!(reply.to_string(), "Method return"); 1054 // Check we didn't miss the signal that was sent during the call. 1055 let m = stream.try_next().await?.unwrap(); 1056 assert_eq!(m.to_string(), "Signal ASignalForYou"); 1057 reply.body::<String>() 1058 }; 1059 1060 let (val, _) = futures_util::try_join!(client_future, server_future)?; 1061 assert_eq!(val, "yay"); 1062 1063 Ok(()) 1064 } 1065 1066 // FIXME: Make it work with tokio as well. 1067 #[cfg(feature = "async-io")] 1068 #[test] 1069 #[timeout(15000)] tcp_p2p()1070 fn tcp_p2p() { 1071 crate::utils::block_on(test_tcp_p2p()).unwrap(); 1072 } 1073 1074 #[cfg(feature = "async-io")] test_tcp_p2p() -> Result<()>1075 async fn test_tcp_p2p() -> Result<()> { 1076 let guid = Guid::generate(); 1077 1078 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); 1079 let addr = listener.local_addr().unwrap(); 1080 let client = std::net::TcpStream::connect(addr).unwrap(); 1081 let server = listener.incoming().next().unwrap().unwrap(); 1082 1083 let server = { 1084 let c = ConnectionBuilder::tcp_stream(server).server(&guid).p2p(); 1085 1086 // EXTERNAL is only implemented on win32 with TCP sockets 1087 #[cfg(unix)] 1088 let c = c.auth_mechanisms(&[AuthMechanism::Anonymous]); 1089 1090 c.build() 1091 }; 1092 let client = ConnectionBuilder::tcp_stream(client).p2p().build(); 1093 let (client, server) = futures_util::try_join!(client, server)?; 1094 1095 test_p2p(server, client).await 1096 } 1097 1098 #[cfg(unix)] 1099 #[test] 1100 #[timeout(15000)] unix_p2p()1101 fn unix_p2p() { 1102 crate::utils::block_on(test_unix_p2p()).unwrap(); 1103 } 1104 1105 #[cfg(unix)] test_unix_p2p() -> Result<()>1106 async fn test_unix_p2p() -> Result<()> { 1107 #[cfg(feature = "async-io")] 1108 use std::os::unix::net::UnixStream; 1109 #[cfg(all(not(feature = "async-io"), feature = "tokio"))] 1110 use tokio::net::UnixStream; 1111 1112 let guid = Guid::generate(); 1113 1114 let (p0, p1) = UnixStream::pair().unwrap(); 1115 1116 let server = ConnectionBuilder::unix_stream(p0) 1117 .server(&guid) 1118 .p2p() 1119 .build(); 1120 let client = ConnectionBuilder::unix_stream(p1).p2p().build(); 1121 let (client, server) = futures_util::try_join!(client, server)?; 1122 1123 test_p2p(server, client).await 1124 } 1125 1126 #[test] 1127 #[timeout(15000)] serial_monotonically_increases()1128 fn serial_monotonically_increases() { 1129 crate::utils::block_on(test_serial_monotonically_increases()); 1130 } 1131 test_serial_monotonically_increases()1132 async fn test_serial_monotonically_increases() { 1133 let c = Connection::session().await.unwrap(); 1134 let serial = c.next_serial() + 1; 1135 1136 for next in serial..serial + 10 { 1137 assert_eq!(next, c.next_serial()); 1138 } 1139 } 1140 } 1141