1 use async_broadcast::{broadcast, InactiveReceiver, Sender as Broadcaster};
2 use async_channel::{bounded, Receiver, Sender};
3 use async_executor::Executor;
4 use async_lock::Mutex;
5 use async_task::Task;
6 use event_listener::EventListener;
7 use once_cell::sync::OnceCell;
8 use ordered_stream::{OrderedFuture, OrderedStream, PollResult};
9 use static_assertions::assert_impl_all;
10 use std::{
11     collections::{HashMap, HashSet},
12     convert::TryInto,
13     io::{self, ErrorKind},
14     ops::Deref,
15     pin::Pin,
16     sync::{
17         self,
18         atomic::{AtomicU32, Ordering::SeqCst},
19         Arc, Weak,
20     },
21     task::{Context, Poll},
22 };
23 use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
24 use zvariant::ObjectPath;
25 
26 use futures_core::{ready, Future};
27 use futures_sink::Sink;
28 use futures_util::{
29     future::{select, Either},
30     sink::SinkExt,
31     StreamExt, TryFutureExt,
32 };
33 
34 use crate::{
35     blocking, fdo,
36     raw::{Connection as RawConnection, Socket},
37     Authenticated, CacheProperties, ConnectionBuilder, DBusError, Error, Guid, Message,
38     MessageStream, MessageType, ObjectServer, Result,
39 };
40 
// Default limit on the number of queued messages.
// NOTE(review): presumably the initial capacity of the incoming-message queue (the `Connection`
// docs mention a default size of 64); the use site is outside this view, so confirm.
const DEFAULT_MAX_QUEUED: usize = 64;
42 
/// Inner state shared by `Connection` and `WeakConnection`.
#[derive(Debug)]
pub(crate) struct ConnectionInner {
    // GUID of the server; exposed as a string through `Connection::server_guid`.
    server_guid: Guid,
    // NOTE(review): presumably records whether unix FD passing was negotiated with the peer;
    // not read anywhere in this view, so confirm.
    #[cfg(unix)]
    cap_unix_fd: bool,
    // Value returned by `Connection::is_bus`: whether this is a connection to a message bus.
    bus_conn: bool,
    // Unique name of this connection; set exactly once by `Connection::hello_bus`.
    unique_name: OnceCell<OwnedUniqueName>,
    // Well-known names added by `Connection::request_name` and removed by
    // `Connection::release_name`.
    registered_names: Mutex<HashSet<WellKnownName<'static>>>,

    // NOTE(review): presumably the same raw connection the message receiver task reads from
    // (the types match); construction is outside this view, so confirm.
    raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>,

    // Serial number for next outgoing message
    serial: AtomicU32,

    // Our executor
    executor: Arc<Executor<'static>>,

    // Message receiver task
    // NOTE(review): never read, hence the `allow`; presumably held only so the task lives as
    // long as this struct does (see the FIXME after this struct). Confirm.
    #[allow(unused)]
    msg_receiver_task: Task<()>,

    // Match rule expression -> number of active subscriptions, maintained by
    // `Connection::add_match` and `Connection::remove_match`.
    signal_matches: Mutex<HashMap<String, u64>>,

    // Created on first use by `Connection::sync_object_server`.
    object_server: OnceCell<blocking::ObjectServer>,
    // Task feeding incoming messages to the object server; spawned at most once by
    // `Connection::start_object_server`.
    object_server_dispatch_task: OnceCell<Task<()>>,
}
70 
71 // FIXME: Should really use [`AsyncDrop`] for `ConnectionInner` when we've something like that to
72 //        cancel `msg_receiver_task` manually to ensure task is gone before the connection is. Same
73 //        goes for the registered well-known names.
74 //
75 // [`AsyncDrop`]: https://github.com/rust-lang/wg-async-foundations/issues/65
76 
// State of the background task that keeps reading messages off the raw connection and
// broadcasts them; read errors are reported through a separate channel.
#[derive(Debug)]
struct MessageReceiverTask {
    // Raw connection the messages are received from.
    raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>,

    // Message broadcaster.
    msg_sender: Broadcaster<Arc<Message>>,

    // Sender side of the error channel
    error_sender: Sender<Error>,
}
87 
88 impl MessageReceiverTask {
new( raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>, msg_sender: Broadcaster<Arc<Message>>, error_sender: Sender<Error>, ) -> Arc<Self>89     fn new(
90         raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>,
91         msg_sender: Broadcaster<Arc<Message>>,
92         error_sender: Sender<Error>,
93     ) -> Arc<Self> {
94         Arc::new(Self {
95             raw_conn,
96             msg_sender,
97             error_sender,
98         })
99     }
100 
spawn(self: Arc<Self>, executor: &Executor<'_>) -> Task<()>101     fn spawn(self: Arc<Self>, executor: &Executor<'_>) -> Task<()> {
102         executor.spawn(async move {
103             self.receive_msg().await;
104         })
105     }
106 
107     // Keep receiving messages and put them on the queue.
receive_msg(self: Arc<Self>)108     async fn receive_msg(self: Arc<Self>) {
109         loop {
110             let receive_msg = ReceiveMessage {
111                 raw_conn: &self.raw_conn,
112             };
113             let msg = match receive_msg.await {
114                 Ok(msg) => msg,
115                 Err(e) => {
116                     // Ignore errors when sending the error; this happens if the channel is
117                     // being dropped.
118                     //
119                     // This can be logged when we have logging, though that's mostly useless: it
120                     // only happens when the receive_msg task is running at the time the last
121                     // Connection is dropped.
122                     let _ = self.error_sender.send(e).await;
123                     self.msg_sender.close();
124                     self.error_sender.close();
125                     return;
126                 }
127             };
128 
129             let msg = Arc::new(msg);
130             if self.msg_sender.broadcast(msg.clone()).await.is_err() {
131                 // An error would be due to the channel being closed, which only happens when the
132                 // connection is dropped, so just stop the task.  See comment above about logging.
133                 return;
134             }
135         }
136     }
137 }
138 
139 /// A D-Bus connection.
140 ///
141 /// A connection to a D-Bus bus, or a direct peer.
142 ///
143 /// Once created, the connection is authenticated and negotiated and messages can be sent or
144 /// received, such as [method calls] or [signals].
145 ///
146 /// For higher-level message handling (typed functions, introspection, documentation reasons etc),
147 /// it is recommended to wrap the low-level D-Bus messages into Rust functions with the
148 /// [`dbus_proxy`] and [`dbus_interface`] macros instead of doing it directly on a `Connection`.
149 ///
150 /// Typically, a connection is made to the session bus with [`Connection::session`], or to the
151 /// system bus with [`Connection::system`]. Then the connection is used with [`crate::Proxy`]
152 /// instances or the on-demand [`ObjectServer`] instance that can be accessed through
153 /// [`Connection::object_server`].
154 ///
155 /// `Connection` implements [`Clone`] and cloning it is a very cheap operation, as the underlying
156 /// data is not cloned. This makes it very convenient to share the connection between different
/// parts of your code. `Connection` also implements [`std::marker::Sync`] and
/// [`std::marker::Send`] so you can send and share a connection instance across threads as well.
159 ///
/// `Connection` keeps an internal queue of incoming messages. The maximum capacity of this queue
161 /// is configurable through the [`set_max_queued`] method. The default size is 64. When the queue is
162 /// full, no more messages can be received until room is created for more. This is why it's
163 /// important to ensure that all [`crate::MessageStream`] and [`crate::blocking::MessageIterator`]
164 /// instances are continuously polled and iterated on, respectively.
165 ///
166 /// For sending messages you can either use [`Connection::send_message`] method or make use of the
167 /// [`Sink`] implementation. For latter, you might find [`SinkExt`] API very useful. Keep in mind
168 /// that [`Connection`] will not manage the serial numbers (cookies) on the messages for you when
169 /// they are sent through the [`Sink`] implementation. You can manually assign unique serial numbers
170 /// to them using the [`Connection::assign_serial_num`] method before sending them off, if needed.
171 /// Having said that, the [`Sink`] is mainly useful for sending out signals, as they do not expect
172 /// a reply, and serial numbers are not very useful for signals either for the same reason.
173 ///
174 /// Since you do not need exclusive access to a `zbus::Connection` to send messages on the bus,
175 /// [`Sink`] is also implemented on `&Connection`.
176 ///
177 /// # Caveats
178 ///
179 /// At the moment, a simultaneous [flush request] from multiple tasks/threads could
180 /// potentially create a busy loop, thus wasting CPU time. This limitation may be removed in the
181 /// future.
182 ///
183 /// [flush request]: https://docs.rs/futures/0.3.15/futures/sink/trait.SinkExt.html#method.flush
184 ///
185 /// [method calls]: struct.Connection.html#method.call_method
186 /// [signals]: struct.Connection.html#method.emit_signal
187 /// [`dbus_proxy`]: attr.dbus_proxy.html
188 /// [`dbus_interface`]: attr.dbus_interface.html
189 /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
190 /// [`set_max_queued`]: struct.Connection.html#method.set_max_queued
191 ///
192 /// ### Examples
193 ///
194 /// #### Get the session bus ID
195 ///
196 /// ```
197 ///# use zvariant::Type;
198 ///#
199 ///# async_io::block_on(async {
200 /// use zbus::Connection;
201 ///
202 /// let mut connection = Connection::session().await?;
203 ///
204 /// let reply = connection
205 ///     .call_method(
206 ///         Some("org.freedesktop.DBus"),
207 ///         "/org/freedesktop/DBus",
208 ///         Some("org.freedesktop.DBus"),
209 ///         "GetId",
210 ///         &(),
211 ///     )
212 ///     .await?;
213 ///
214 /// let id: &str = reply.body()?;
215 /// println!("Unique ID of the bus: {}", id);
216 ///# Ok::<(), zbus::Error>(())
217 ///# });
218 /// ```
219 ///
220 /// #### Monitoring all messages
221 ///
/// Let's eavesdrop on the session bus 😈 using the [Monitor] interface:
223 ///
224 /// ```rust,no_run
225 ///# async_io::block_on(async {
226 /// use futures_util::stream::TryStreamExt;
227 /// use zbus::{Connection, MessageStream};
228 ///
229 /// let connection = Connection::session().await?;
230 ///
231 /// connection
232 ///     .call_method(
233 ///         Some("org.freedesktop.DBus"),
234 ///         "/org/freedesktop/DBus",
235 ///         Some("org.freedesktop.DBus.Monitoring"),
236 ///         "BecomeMonitor",
237 ///         &(&[] as &[&str], 0u32),
238 ///     )
239 ///     .await?;
240 ///
241 /// let mut stream = MessageStream::from(connection);
242 /// while let Some(msg) = stream.try_next().await? {
243 ///     println!("Got message: {}", msg);
244 /// }
245 ///
246 ///# Ok::<(), zbus::Error>(())
247 ///# });
248 /// ```
249 ///
250 /// This should print something like:
251 ///
252 /// ```console
253 /// Got message: Signal NameAcquired from org.freedesktop.DBus
254 /// Got message: Signal NameLost from org.freedesktop.DBus
255 /// Got message: Method call GetConnectionUnixProcessID from :1.1324
256 /// Got message: Error org.freedesktop.DBus.Error.NameHasNoOwner:
257 ///              Could not get PID of name ':1.1332': no such name from org.freedesktop.DBus
258 /// Got message: Method call AddMatch from :1.918
259 /// Got message: Method return from org.freedesktop.DBus
260 /// ```
261 ///
262 /// [Monitor]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-become-monitor
#[derive(Clone, Debug)]
pub struct Connection {
    // State shared (through the `Arc`) by every clone of this connection.
    pub(crate) inner: Arc<ConnectionInner>,

    // Inactive receiver of the incoming-message broadcast channel; its capacity is what
    // `max_queued` reports and `set_max_queued` changes.
    pub(crate) msg_receiver: InactiveReceiver<Arc<Message>>,

    // Receiver side of the error channel
    pub(crate) error_receiver: Receiver<Error>,
}
272 
// Compile-time guarantee that `Connection` stays `Send`, `Sync` and `Unpin`.
assert_impl_all!(Connection: Send, Sync, Unpin);
274 
/// A method call whose completion can be awaited or joined with other streams.
///
/// This is useful for cache population method calls, where joining the [`JoinableStream`] with
/// an update signal stream can be used to ensure that cache updates are not overwritten by a cache
/// population whose task is scheduled later.
#[derive(Debug)]
pub(crate) struct PendingMethodCall {
    // Stream the reply is looked for on; reset to `None` once the reply has been handed out.
    stream: Option<MessageStream>,
    // Serial number of the sent call; replies are matched against it through their reply serial.
    serial: u32,
}
285 
286 impl Future for PendingMethodCall {
287     type Output = Result<Arc<Message>>;
288 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>289     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
290         self.poll_before(cx, None).map(|ret| {
291             ret.map(|(_, r)| r).unwrap_or_else(|| {
292                 Err(crate::Error::Io(io::Error::new(
293                     ErrorKind::BrokenPipe,
294                     "socket closed",
295                 )))
296             })
297         })
298     }
299 }
300 
301 impl OrderedFuture for PendingMethodCall {
302     type Output = Result<Arc<Message>>;
303     type Ordering = zbus::MessageSequence;
304 
poll_before( self: Pin<&mut Self>, cx: &mut Context<'_>, before: Option<&Self::Ordering>, ) -> Poll<Option<(Self::Ordering, Self::Output)>>305     fn poll_before(
306         self: Pin<&mut Self>,
307         cx: &mut Context<'_>,
308         before: Option<&Self::Ordering>,
309     ) -> Poll<Option<(Self::Ordering, Self::Output)>> {
310         let this = self.get_mut();
311         if let Some(stream) = &mut this.stream {
312             loop {
313                 match Pin::new(&mut *stream).poll_next_before(cx, before) {
314                     Poll::Ready(PollResult::Item {
315                         data: Ok(msg),
316                         ordering,
317                     }) => {
318                         if msg.reply_serial() != Some(this.serial) {
319                             continue;
320                         }
321                         let res = match msg.message_type() {
322                             MessageType::Error => Err(msg.into()),
323                             MessageType::MethodReturn => Ok(msg),
324                             _ => continue,
325                         };
326                         this.stream = None;
327                         return Poll::Ready(Some((ordering, res)));
328                     }
329                     Poll::Ready(PollResult::Item {
330                         data: Err(e),
331                         ordering,
332                     }) => {
333                         return Poll::Ready(Some((ordering, Err(e))));
334                     }
335 
336                     Poll::Ready(PollResult::NoneBefore) => {
337                         return Poll::Ready(None);
338                     }
339                     Poll::Ready(PollResult::Terminated) => {
340                         return Poll::Ready(None);
341                     }
342                     Poll::Pending => return Poll::Pending,
343                 }
344             }
345         }
346         Poll::Ready(None)
347     }
348 }
349 
350 impl Connection {
351     /// Send `msg` to the peer.
352     ///
353     /// Unlike our [`Sink`] implementation, this method sets a unique (to this connection) serial
354     /// number on the message before sending it off, for you.
355     ///
356     /// On successfully sending off `msg`, the assigned serial number is returned.
send_message(&self, mut msg: Message) -> Result<u32>357     pub async fn send_message(&self, mut msg: Message) -> Result<u32> {
358         let serial = self.assign_serial_num(&mut msg)?;
359 
360         (&*self).send(msg).await?;
361 
362         Ok(serial)
363     }
364 
365     /// Send a method call.
366     ///
367     /// Create a method-call message, send it over the connection, then wait for the reply.
368     ///
369     /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus
370     /// error replies are returned as [`Error::MethodError`].
call_method<'d, 'p, 'i, 'm, D, P, I, M, B>( &self, destination: Option<D>, path: P, interface: Option<I>, method_name: M, body: &B, ) -> Result<Arc<Message>> where D: TryInto<BusName<'d>>, P: TryInto<ObjectPath<'p>>, I: TryInto<InterfaceName<'i>>, M: TryInto<MemberName<'m>>, D::Error: Into<Error>, P::Error: Into<Error>, I::Error: Into<Error>, M::Error: Into<Error>, B: serde::ser::Serialize + zvariant::DynamicType,371     pub async fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
372         &self,
373         destination: Option<D>,
374         path: P,
375         interface: Option<I>,
376         method_name: M,
377         body: &B,
378     ) -> Result<Arc<Message>>
379     where
380         D: TryInto<BusName<'d>>,
381         P: TryInto<ObjectPath<'p>>,
382         I: TryInto<InterfaceName<'i>>,
383         M: TryInto<MemberName<'m>>,
384         D::Error: Into<Error>,
385         P::Error: Into<Error>,
386         I::Error: Into<Error>,
387         M::Error: Into<Error>,
388         B: serde::ser::Serialize + zvariant::DynamicType,
389     {
390         let m = Message::method(
391             self.unique_name(),
392             destination,
393             path,
394             interface,
395             method_name,
396             body,
397         )?;
398         self.call_method_raw(m).await?.await
399     }
400 
401     /// Send a method call.
402     ///
403     /// Send the given message, which must be a method call, over the connection and return an
404     /// object that allows the reply to be retrieved.  Typically you'd want to use
405     /// [`Connection::call_method`] instead.
call_method_raw(&self, msg: Message) -> Result<PendingMethodCall>406     pub(crate) async fn call_method_raw(&self, msg: Message) -> Result<PendingMethodCall> {
407         debug_assert_eq!(msg.message_type(), MessageType::MethodCall);
408 
409         let stream = Some(MessageStream::from(self.clone()));
410         let serial = self.send_message(msg).await?;
411 
412         Ok(PendingMethodCall { stream, serial })
413     }
414 
415     /// Emit a signal.
416     ///
417     /// Create a signal message, and send it over the connection.
emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>( &self, destination: Option<D>, path: P, interface: I, signal_name: M, body: &B, ) -> Result<()> where D: TryInto<BusName<'d>>, P: TryInto<ObjectPath<'p>>, I: TryInto<InterfaceName<'i>>, M: TryInto<MemberName<'m>>, D::Error: Into<Error>, P::Error: Into<Error>, I::Error: Into<Error>, M::Error: Into<Error>, B: serde::ser::Serialize + zvariant::DynamicType,418     pub async fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
419         &self,
420         destination: Option<D>,
421         path: P,
422         interface: I,
423         signal_name: M,
424         body: &B,
425     ) -> Result<()>
426     where
427         D: TryInto<BusName<'d>>,
428         P: TryInto<ObjectPath<'p>>,
429         I: TryInto<InterfaceName<'i>>,
430         M: TryInto<MemberName<'m>>,
431         D::Error: Into<Error>,
432         P::Error: Into<Error>,
433         I::Error: Into<Error>,
434         M::Error: Into<Error>,
435         B: serde::ser::Serialize + zvariant::DynamicType,
436     {
437         let m = Message::signal(
438             self.unique_name(),
439             destination,
440             path,
441             interface,
442             signal_name,
443             body,
444         )?;
445 
446         self.send_message(m).await.map(|_| ())
447     }
448 
449     /// Reply to a message.
450     ///
451     /// Given an existing message (likely a method call), send a reply back to the caller with the
452     /// given `body`.
453     ///
454     /// Returns the message serial number.
reply<B>(&self, call: &Message, body: &B) -> Result<u32> where B: serde::ser::Serialize + zvariant::DynamicType,455     pub async fn reply<B>(&self, call: &Message, body: &B) -> Result<u32>
456     where
457         B: serde::ser::Serialize + zvariant::DynamicType,
458     {
459         let m = Message::method_reply(self.unique_name(), call, body)?;
460         self.send_message(m).await
461     }
462 
463     /// Reply an error to a message.
464     ///
465     /// Given an existing message (likely a method call), send an error reply back to the caller
466     /// with the given `error_name` and `body`.
467     ///
468     /// Returns the message serial number.
reply_error<'e, E, B>( &self, call: &Message, error_name: E, body: &B, ) -> Result<u32> where B: serde::ser::Serialize + zvariant::DynamicType, E: TryInto<ErrorName<'e>>, E::Error: Into<Error>,469     pub async fn reply_error<'e, E, B>(
470         &self,
471         call: &Message,
472         error_name: E,
473         body: &B,
474     ) -> Result<u32>
475     where
476         B: serde::ser::Serialize + zvariant::DynamicType,
477         E: TryInto<ErrorName<'e>>,
478         E::Error: Into<Error>,
479     {
480         let m = Message::method_error(self.unique_name(), call, error_name, body)?;
481         self.send_message(m).await
482     }
483 
484     /// Reply an error to a message.
485     ///
486     /// Given an existing message (likely a method call), send an error reply back to the caller
487     /// using one of the standard interface reply types.
488     ///
489     /// Returns the message serial number.
reply_dbus_error( &self, call: &zbus::MessageHeader<'_>, err: impl DBusError, ) -> Result<u32>490     pub async fn reply_dbus_error(
491         &self,
492         call: &zbus::MessageHeader<'_>,
493         err: impl DBusError,
494     ) -> Result<u32> {
495         let m = err.create_reply(call);
496         self.send_message(m?).await
497     }
498 
499     /// Register a well-known name for this service on the bus.
500     ///
501     /// You can request multiple names for the same `ObjectServer`. Use [`Connection::release_name`]
502     /// for deregistering names registered through this method.
503     ///
504     /// Note that exclusive ownership without queueing is requested (using
505     /// [`fdo::RequestNameFlags::ReplaceExisting`] and [`fdo::RequestNameFlags::DoNotQueue`] flags)
506     /// since that is the most typical case. If that is not what you want, you should use
507     /// [`fdo::DBusProxy::request_name`] instead (but make sure then that name is requested
508     /// **after** you've setup your service implementation with the `ObjectServer`).
509     ///
510     /// # Errors
511     ///
512     /// Fails with `zbus::Error::NameTaken` if the name is already owned by another peer.
request_name<'w, W>(&self, well_known_name: W) -> Result<()> where W: TryInto<WellKnownName<'w>>, W::Error: Into<Error>,513     pub async fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
514     where
515         W: TryInto<WellKnownName<'w>>,
516         W::Error: Into<Error>,
517     {
518         let well_known_name = well_known_name.try_into().map_err(Into::into)?;
519         let mut names = self.inner.registered_names.lock().await;
520 
521         if !names.contains(&well_known_name) {
522             // Ensure ObjectServer and its msg stream exists and reading before registering any
523             // names. Otherwise we get issue#68 (that we warn the user about in the docs of this
524             // method).
525             self.object_server();
526 
527             let reply = fdo::DBusProxy::builder(self)
528                 .cache_properties(CacheProperties::No)
529                 .build()
530                 .await?
531                 .request_name(
532                     well_known_name.clone(),
533                     fdo::RequestNameFlags::ReplaceExisting | fdo::RequestNameFlags::DoNotQueue,
534                 )
535                 .await?;
536             if let fdo::RequestNameReply::Exists = reply {
537                 Err(Error::NameTaken)
538             } else {
539                 names.insert(well_known_name.to_owned());
540                 Ok(())
541             }
542         } else {
543             Ok(())
544         }
545     }
546 
547     /// Deregister a previously registered well-known name for this service on the bus.
548     ///
549     /// Use this method to deregister a well-known name, registered through
550     /// [`Connection::request_name`].
551     ///
552     /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with
553     /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name
554     /// was not previously registered or already deregistered.
release_name<'w, W>(&self, well_known_name: W) -> Result<bool> where W: TryInto<WellKnownName<'w>>, W::Error: Into<Error>,555     pub async fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
556     where
557         W: TryInto<WellKnownName<'w>>,
558         W::Error: Into<Error>,
559     {
560         let well_known_name: WellKnownName<'w> = well_known_name.try_into().map_err(Into::into)?;
561         let mut names = self.inner.registered_names.lock().await;
562         // FIXME: Should be possible to avoid cloning/allocation here
563         if !names.remove(&well_known_name.to_owned()) {
564             return Ok(false);
565         }
566 
567         fdo::DBusProxy::builder(self)
568             .cache_properties(CacheProperties::No)
569             .build()
570             .await?
571             .release_name(well_known_name)
572             .await
573             .map(|_| true)
574             .map_err(Into::into)
575     }
576 
577     /// Checks if `self` is a connection to a message bus.
578     ///
579     /// This will return `false` for p2p connections.
is_bus(&self) -> bool580     pub fn is_bus(&self) -> bool {
581         self.inner.bus_conn
582     }
583 
584     /// Assigns a serial number to `msg` that is unique to this connection.
585     ///
586     /// This method can fail if `msg` is corrupted.
assign_serial_num(&self, msg: &mut Message) -> Result<u32>587     pub fn assign_serial_num(&self, msg: &mut Message) -> Result<u32> {
588         let mut serial = 0;
589         msg.modify_primary_header(|primary| {
590             serial = *primary.serial_num_or_init(|| self.next_serial());
591             Ok(())
592         })?;
593 
594         Ok(serial)
595     }
596 
597     /// The unique name as assigned by the message bus or `None` if not a message bus connection.
unique_name(&self) -> Option<&OwnedUniqueName>598     pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
599         self.inner.unique_name.get()
600     }
601 
602     /// Max number of messages to queue.
max_queued(&self) -> usize603     pub fn max_queued(&self) -> usize {
604         self.msg_receiver.capacity()
605     }
606 
607     /// Set the max number of messages to queue.
set_max_queued(&mut self, max: usize)608     pub fn set_max_queued(&mut self, max: usize) {
609         self.msg_receiver.set_capacity(max);
610     }
611 
612     /// The server's GUID.
server_guid(&self) -> &str613     pub fn server_guid(&self) -> &str {
614         self.inner.server_guid.as_str()
615     }
616 
617     /// The underlying executor.
618     ///
619     /// When a connection is built with internal_executor set to false, zbus will not spawn a
620     /// thread to run the executor. You're responsible to continuously [tick the executor][tte].
621     /// Failure to do so will result in hangs.
622     ///
623     /// # Examples
624     ///
625     /// Here is how one would typically run the zbus executor through tokio's single-threaded
626     /// scheduler:
627     ///
628     /// ```
629     /// use zbus::ConnectionBuilder;
630     /// use tokio::runtime;
631     ///
632     /// runtime::Builder::new_current_thread()
633     ///        .build()
634     ///        .unwrap()
635     ///        .block_on(async {
636     ///     let conn = ConnectionBuilder::session()
637     ///         .unwrap()
638     ///         .internal_executor(false)
639     ///         .build()
640     ///         .await
641     ///         .unwrap();
642     ///     {
643     ///        let conn = conn.clone();
644     ///        tokio::task::spawn(async move {
645     ///            loop {
646     ///                conn.executor().tick().await;
647     ///            }
648     ///        });
649     ///     }
650     ///
651     ///     // All your other async code goes here.
652     /// });
653     /// ```
654     ///
655     /// **Note**: zbus 2.1 added support for tight integration with tokio. This means, if you use
656     /// zbus with tokio, you do not need to worry about this at all. All you need to do is enable
657     /// `tokio` feature and disable the (default) `async-io` feature in your `Cargo.toml`.
658     ///
659     /// [tte]: https://docs.rs/async-executor/1.4.1/async_executor/struct.Executor.html#method.tick
executor(&self) -> &Executor<'static>660     pub fn executor(&self) -> &Executor<'static> {
661         &self.inner.executor
662     }
663 
664     /// Get a reference to the associated [`ObjectServer`].
665     ///
666     /// The `ObjectServer` is created on-demand.
object_server(&self) -> impl Deref<Target = ObjectServer> + '_667     pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
668         // FIXME: Maybe it makes sense after all to implement Deref<Target= ObjectServer> for
669         // crate::ObjectServer instead of this wrapper?
670         struct Wrapper<'a>(&'a blocking::ObjectServer);
671         impl<'a> Deref for Wrapper<'a> {
672             type Target = ObjectServer;
673 
674             fn deref(&self) -> &Self::Target {
675                 self.0.inner()
676             }
677         }
678 
679         Wrapper(self.sync_object_server(true))
680     }
681 
sync_object_server(&self, start: bool) -> &blocking::ObjectServer682     pub(crate) fn sync_object_server(&self, start: bool) -> &blocking::ObjectServer {
683         self.inner
684             .object_server
685             .get_or_init(|| self.setup_object_server(start))
686     }
687 
setup_object_server(&self, start: bool) -> blocking::ObjectServer688     fn setup_object_server(&self, start: bool) -> blocking::ObjectServer {
689         if start {
690             self.start_object_server();
691         }
692 
693         blocking::ObjectServer::new(self)
694     }
695 
start_object_server(&self)696     pub(crate) fn start_object_server(&self) {
697         self.inner.object_server_dispatch_task.get_or_init(|| {
698             let weak_conn = WeakConnection::from(self);
699             let mut stream = MessageStream::from(self.clone());
700 
701             self.inner.executor.spawn(async move {
702                 // TODO: Log errors when we've logging.
703                 while let Some(msg) = stream.next().await.and_then(|m| m.ok()) {
704                     if let Some(conn) = weak_conn.upgrade() {
705                         let executor = conn.inner.executor.clone();
706                         executor
707                             .spawn(async move {
708                                 let server = conn.object_server();
709                                 let _ = server.dispatch_message(&msg).await;
710                             })
711                             .detach();
712                     } else {
713                         // If connection is completely gone, no reason to keep running the task anymore.
714                         break;
715                     }
716                 }
717             })
718         });
719     }
720 
add_match(&self, expr: String) -> Result<()>721     pub(crate) async fn add_match(&self, expr: String) -> Result<()> {
722         use std::collections::hash_map::Entry;
723         if !self.is_bus() {
724             return Ok(());
725         }
726         let mut subscriptions = self.inner.signal_matches.lock().await;
727         match subscriptions.entry(expr) {
728             Entry::Vacant(e) => {
729                 fdo::DBusProxy::builder(self)
730                     .cache_properties(CacheProperties::No)
731                     .build()
732                     .await?
733                     .add_match(e.key())
734                     .await?;
735                 e.insert(1);
736             }
737             Entry::Occupied(mut e) => {
738                 *e.get_mut() += 1;
739             }
740         }
741         Ok(())
742     }
743 
remove_match(&self, expr: String) -> Result<bool>744     pub(crate) async fn remove_match(&self, expr: String) -> Result<bool> {
745         use std::collections::hash_map::Entry;
746         if !self.is_bus() {
747             return Ok(true);
748         }
749         let mut subscriptions = self.inner.signal_matches.lock().await;
750         // TODO when it becomes stable, use HashMap::raw_entry and only require expr: &str
751         // (both here and in add_match)
752         match subscriptions.entry(expr) {
753             Entry::Vacant(_) => Ok(false),
754             Entry::Occupied(mut e) => {
755                 *e.get_mut() -= 1;
756                 if *e.get() == 0 {
757                     fdo::DBusProxy::builder(self)
758                         .cache_properties(CacheProperties::No)
759                         .build()
760                         .await?
761                         .remove_match(e.key())
762                         .await?;
763                     e.remove();
764                 }
765                 Ok(true)
766             }
767         }
768     }
769 
queue_remove_match(&self, expr: String)770     pub(crate) fn queue_remove_match(&self, expr: String) {
771         let conn = self.clone();
772         self.inner
773             .executor
774             .spawn(async move { conn.remove_match(expr).await })
775             .detach()
776     }
777 
hello_bus(&self) -> Result<()>778     async fn hello_bus(&self) -> Result<()> {
779         let dbus_proxy = fdo::DBusProxy::builder(self)
780             .cache_properties(CacheProperties::No)
781             .build()
782             .await?;
783         let future = dbus_proxy.hello().map_err(Into::into);
784         let name = self.run_future_at_init(future).await?;
785 
786         self.inner
787             .unique_name
788             .set(name)
789             // programmer (probably our) error if this fails.
790             .expect("Attempted to set unique_name twice");
791 
792         Ok(())
793     }
794 
795     // With external executor, our executor is only run after the connection construction is
796     // completed and some futures need to run to completion before that is done so we need to tick
797     // the executor ourselves in parallel to making the method call. With the internal executor,
798     /// this is not needed but harmless.
run_future_at_init<F, O>(&self, future: F) -> Result<O> where F: Future<Output = Result<O>>,799     pub(crate) async fn run_future_at_init<F, O>(&self, future: F) -> Result<O>
800     where
801         F: Future<Output = Result<O>>,
802     {
803         let executor = self.inner.executor.clone();
804         let ticking_future = async move {
805             // Keep running as long as this task/future is not cancelled.
806             loop {
807                 executor.tick().await;
808             }
809         };
810 
811         futures_util::pin_mut!(future);
812         futures_util::pin_mut!(ticking_future);
813 
814         match select(future, ticking_future).await {
815             Either::Left((res, _)) => res,
816             Either::Right((_, _)) => unreachable!("ticking task future shouldn't finish"),
817         }
818     }
819 
new( auth: Authenticated<Box<dyn Socket>>, bus_connection: bool, internal_executor: bool, ) -> Result<Self>820     pub(crate) async fn new(
821         auth: Authenticated<Box<dyn Socket>>,
822         bus_connection: bool,
823         internal_executor: bool,
824     ) -> Result<Self> {
825         let auth = auth.into_inner();
826         #[cfg(unix)]
827         let cap_unix_fd = auth.cap_unix_fd;
828 
829         let (msg_sender, msg_receiver) = broadcast(DEFAULT_MAX_QUEUED);
830         let msg_receiver = msg_receiver.deactivate();
831         let (error_sender, error_receiver) = bounded(1);
832         let executor = Arc::new(Executor::new());
833         let raw_conn = Arc::new(sync::Mutex::new(auth.conn));
834 
835         // Start the message receiver task.
836         let msg_receiver_task =
837             MessageReceiverTask::new(raw_conn.clone(), msg_sender, error_sender).spawn(&executor);
838 
839         let connection = Self {
840             error_receiver,
841             msg_receiver,
842             inner: Arc::new(ConnectionInner {
843                 raw_conn,
844                 server_guid: auth.server_guid,
845                 #[cfg(unix)]
846                 cap_unix_fd,
847                 bus_conn: bus_connection,
848                 serial: AtomicU32::new(1),
849                 unique_name: OnceCell::new(),
850                 signal_matches: Mutex::new(HashMap::new()),
851                 object_server: OnceCell::new(),
852                 object_server_dispatch_task: OnceCell::new(),
853                 executor: executor.clone(),
854                 msg_receiver_task,
855                 registered_names: Mutex::new(HashSet::new()),
856             }),
857         };
858 
859         if internal_executor {
860             let ticker_future = async move {
861                 // Run as long as there is a task to run.
862                 while !executor.is_empty() {
863                     executor.tick().await;
864                 }
865             };
866             #[cfg(feature = "async-io")]
867             std::thread::Builder::new()
868                 .name("zbus::Connection executor".into())
869                 .spawn(move || crate::utils::block_on(ticker_future))?;
870 
871             #[cfg(all(not(feature = "async-io"), feature = "tokio"))]
872             tokio::task::spawn(ticker_future);
873         }
874 
875         if !bus_connection {
876             return Ok(connection);
877         }
878 
879         // Now that the server has approved us, we must send the bus Hello, as per specs
880         connection.hello_bus().await?;
881 
882         Ok(connection)
883     }
884 
next_serial(&self) -> u32885     fn next_serial(&self) -> u32 {
886         self.inner.serial.fetch_add(1, SeqCst)
887     }
888 
889     /// Create a `Connection` to the session/user message bus.
session() -> Result<Self>890     pub async fn session() -> Result<Self> {
891         ConnectionBuilder::session()?.build().await
892     }
893 
894     /// Create a `Connection` to the system-wide message bus.
system() -> Result<Self>895     pub async fn system() -> Result<Self> {
896         ConnectionBuilder::system()?.build().await
897     }
898 
899     /// Returns a listener, notified on various connection activity.
900     ///
901     /// This function is meant for the caller to implement idle or timeout on inactivity.
monitor_activity(&self) -> EventListener902     pub fn monitor_activity(&self) -> EventListener {
903         self.inner
904             .raw_conn
905             .lock()
906             .expect("poisoned lock")
907             .monitor_activity()
908     }
909 }
910 
911 impl Sink<Message> for Connection {
912     type Error = Error;
913 
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>914     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
915         Pin::new(&mut &*self).poll_ready(cx)
916     }
917 
start_send(self: Pin<&mut Self>, msg: Message) -> Result<()>918     fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<()> {
919         Pin::new(&mut &*self).start_send(msg)
920     }
921 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>922     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
923         Pin::new(&mut &*self).poll_flush(cx)
924     }
925 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>926     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
927         Pin::new(&mut &*self).poll_close(cx)
928     }
929 }
930 
931 impl<'a> Sink<Message> for &'a Connection {
932     type Error = Error;
933 
poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>>934     fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
935         // TODO: We should have a max queue length in raw::Socket for outgoing messages.
936         Poll::Ready(Ok(()))
937     }
938 
start_send(self: Pin<&mut Self>, msg: Message) -> Result<()>939     fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<()> {
940         #[cfg(unix)]
941         if !msg.fds().is_empty() && !self.inner.cap_unix_fd {
942             return Err(Error::Unsupported);
943         }
944 
945         self.inner
946             .raw_conn
947             .lock()
948             .expect("poisoned lock")
949             .enqueue_message(msg);
950 
951         Ok(())
952     }
953 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>954     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
955         self.inner.raw_conn.lock().expect("poisoned lock").flush(cx)
956     }
957 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>958     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
959         let mut raw_conn = self.inner.raw_conn.lock().expect("poisoned lock");
960         match ready!(raw_conn.flush(cx)) {
961             Ok(_) => (),
962             Err(e) => return Poll::Ready(Err(e)),
963         }
964 
965         Poll::Ready(raw_conn.close())
966     }
967 }
968 
969 struct ReceiveMessage<'r> {
970     raw_conn: &'r sync::Mutex<RawConnection<Box<dyn Socket>>>,
971 }
972 
973 impl<'r> Future for ReceiveMessage<'r> {
974     type Output = Result<Message>;
975 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>976     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
977         let mut raw_conn = self.raw_conn.lock().expect("poisoned lock");
978         raw_conn.try_receive_message(cx)
979     }
980 }
981 
982 impl From<crate::blocking::Connection> for Connection {
from(conn: crate::blocking::Connection) -> Self983     fn from(conn: crate::blocking::Connection) -> Self {
984         conn.into_inner()
985     }
986 }
987 
988 // Internal API that allows keeping a weak connection ref around.
989 #[derive(Debug)]
990 pub(crate) struct WeakConnection {
991     inner: Weak<ConnectionInner>,
992     msg_receiver: InactiveReceiver<Arc<Message>>,
993     error_receiver: Receiver<Error>,
994 }
995 
996 impl WeakConnection {
997     /// Upgrade to a Connection.
upgrade(&self) -> Option<Connection>998     pub fn upgrade(&self) -> Option<Connection> {
999         self.inner.upgrade().map(|inner| Connection {
1000             inner,
1001             msg_receiver: self.msg_receiver.clone(),
1002             error_receiver: self.error_receiver.clone(),
1003         })
1004     }
1005 }
1006 
1007 impl From<&Connection> for WeakConnection {
from(conn: &Connection) -> Self1008     fn from(conn: &Connection) -> Self {
1009         Self {
1010             inner: Arc::downgrade(&conn.inner),
1011             msg_receiver: conn.msg_receiver.clone(),
1012             error_receiver: conn.error_receiver.clone(),
1013         }
1014     }
1015 }
1016 
#[cfg(test)]
mod tests {
    use futures_util::stream::TryStreamExt;
    use ntest::timeout;
    use test_log::test;

    #[cfg(all(unix, feature = "async-io"))]
    use crate::AuthMechanism;

    use super::*;

    /// Runs one method-call round trip over an established peer-to-peer pair: the client calls
    /// `Test`, the server emits a signal and then replies, and the client checks it sees both.
    async fn test_p2p(server: Connection, client: Connection) -> Result<()> {
        let server_future = async {
            let mut incoming = MessageStream::from(&server);
            // Skip everything until the client's method call shows up (or the stream ends).
            let call: Option<Arc<Message>> = loop {
                match incoming.try_next().await? {
                    Some(msg) if msg.to_string() == "Method call Test" => break Some(msg),
                    Some(_) => {}
                    None => break None,
                }
            };
            let call = call.unwrap();

            // Send another message first to check the queueing function on client side.
            server
                .emit_signal(None::<()>, "/", "org.zbus.p2p", "ASignalForYou", &())
                .await?;
            server.reply(&call, &("yay")).await
        };

        let client_future = async {
            let mut incoming = MessageStream::from(&client);
            let reply = client
                .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
                .await?;
            assert_eq!(reply.to_string(), "Method return");
            // Check we didn't miss the signal that was sent during the call.
            let signal = incoming.try_next().await?.unwrap();
            assert_eq!(signal.to_string(), "Signal ASignalForYou");
            reply.body::<String>()
        };

        let (val, _) = futures_util::try_join!(client_future, server_future)?;
        assert_eq!(val, "yay");

        Ok(())
    }

    // FIXME: Make it work with tokio as well.
    #[cfg(feature = "async-io")]
    #[test]
    #[timeout(15000)]
    fn tcp_p2p() {
        crate::utils::block_on(test_tcp_p2p()).unwrap();
    }

    /// Peer-to-peer round trip over a loopback TCP socket pair.
    #[cfg(feature = "async-io")]
    async fn test_tcp_p2p() -> Result<()> {
        let guid = Guid::generate();

        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let client_stream = std::net::TcpStream::connect(addr).unwrap();
        let (server_stream, _peer) = listener.accept().unwrap();

        let server = {
            let builder = ConnectionBuilder::tcp_stream(server_stream)
                .server(&guid)
                .p2p();

            // EXTERNAL is only implemented on win32 with TCP sockets
            #[cfg(unix)]
            let builder = builder.auth_mechanisms(&[AuthMechanism::Anonymous]);

            builder.build()
        };
        let client = ConnectionBuilder::tcp_stream(client_stream).p2p().build();
        let (client, server) = futures_util::try_join!(client, server)?;

        test_p2p(server, client).await
    }

    #[cfg(unix)]
    #[test]
    #[timeout(15000)]
    fn unix_p2p() {
        crate::utils::block_on(test_unix_p2p()).unwrap();
    }

    /// Peer-to-peer round trip over a connected unix socket pair.
    #[cfg(unix)]
    async fn test_unix_p2p() -> Result<()> {
        #[cfg(feature = "async-io")]
        use std::os::unix::net::UnixStream;
        #[cfg(all(not(feature = "async-io"), feature = "tokio"))]
        use tokio::net::UnixStream;

        let guid = Guid::generate();

        let (server_end, client_end) = UnixStream::pair().unwrap();

        let server = ConnectionBuilder::unix_stream(server_end)
            .server(&guid)
            .p2p()
            .build();
        let client = ConnectionBuilder::unix_stream(client_end).p2p().build();
        let (client, server) = futures_util::try_join!(client, server)?;

        test_p2p(server, client).await
    }

    #[test]
    #[timeout(15000)]
    fn serial_monotonically_increases() {
        crate::utils::block_on(test_serial_monotonically_increases());
    }

    /// Ten consecutive `next_serial` calls must each return the previous value plus one.
    async fn test_serial_monotonically_increases() {
        let c = Connection::session().await.unwrap();
        let first = c.next_serial();

        for offset in 1..=10 {
            assert_eq!(first + offset, c.next_serial());
        }
    }
}
1141