1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_SHARED_REMOTE_H_
6 #define MOJO_PUBLIC_CPP_BINDINGS_SHARED_REMOTE_H_
7 
8 #include <memory>
9 
10 #include "base/bind.h"
11 #include "base/macros.h"
12 #include "base/memory/ptr_util.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/stl_util.h"
15 #include "base/synchronization/waitable_event.h"
16 #include "base/task_runner.h"
17 #include "base/threading/sequenced_task_runner_handle.h"
18 #include "mojo/public/cpp/bindings/associated_group.h"
19 #include "mojo/public/cpp/bindings/message.h"
20 #include "mojo/public/cpp/bindings/remote.h"
21 #include "mojo/public/cpp/bindings/sync_call_restrictions.h"
22 #include "mojo/public/cpp/bindings/sync_event_watcher.h"
23 #include "mojo/public/cpp/bindings/thread_safe_interface_ptr.h"
24 
25 namespace mojo {
26 
27 template <typename Interface>
28 class SharedRemote;
29 
30 template <typename RemoteType>
31 class SharedRemoteBase
32     : public base::RefCountedThreadSafe<SharedRemoteBase<RemoteType>> {
33  public:
34   using InterfaceType = typename RemoteType::InterfaceType;
35   using PendingType = typename RemoteType::PendingType;
36 
get()37   InterfaceType* get() { return &forwarder_->proxy(); }
38   InterfaceType* operator->() { return get(); }
39   InterfaceType& operator*() { return *get(); }
40 
set_disconnect_handler(base::OnceClosure handler,scoped_refptr<base::SequencedTaskRunner> handler_task_runner)41   void set_disconnect_handler(
42       base::OnceClosure handler,
43       scoped_refptr<base::SequencedTaskRunner> handler_task_runner) {
44     wrapper_->set_disconnect_handler(std::move(handler),
45                                      std::move(handler_task_runner));
46   }
47 
48  private:
49   friend class base::RefCountedThreadSafe<SharedRemoteBase<RemoteType>>;
50   template <typename Interface>
51   friend class SharedRemote;
52   template <typename Interface>
53   friend class SharedAssociatedRemote;
54 
55   struct RemoteWrapperDeleter;
56 
57   // Helper class which owns a |RemoteType| instance on an appropriate sequence.
58   // This is kept alive as long as it's bound within some ThreadSafeForwarder's
59   // callbacks.
60   class RemoteWrapper
61       : public base::RefCountedThreadSafe<RemoteWrapper, RemoteWrapperDeleter> {
62    public:
RemoteWrapper(RemoteType remote)63     explicit RemoteWrapper(RemoteType remote)
64         : RemoteWrapper(base::SequencedTaskRunnerHandle::Get()) {
65       remote_ = std::move(remote);
66       associated_group_ = *remote_.internal_state()->associated_group();
67 
68       // By default we force all messages to behave as if async within the
69       // Remote, as SharedRemote implements its own waiting mechanism to block
70       // only the calling thread when making sync calls.
71       remote_.internal_state()->force_outgoing_messages_async(true);
72     }
73 
RemoteWrapper(scoped_refptr<base::SequencedTaskRunner> task_runner)74     explicit RemoteWrapper(scoped_refptr<base::SequencedTaskRunner> task_runner)
75         : task_runner_(std::move(task_runner)) {}
76 
BindOnTaskRunner(PendingType remote)77     void BindOnTaskRunner(PendingType remote) {
78       // TODO(https://crbug.com/682334): At the moment we don't have a group
79       // controller available. That means the user won't be able to pass
80       // associated endpoints on this interface (at least not immediately). In
81       // order to fix this, we need to create a MultiplexRouter immediately and
82       // bind it to the interface pointer on the |task_runner_|. Therefore,
83       // MultiplexRouter should be able to be created on a sequence different
84       // than the one that it is supposed to listen on.
85       task_runner_->PostTask(
86           FROM_HERE,
87           base::BindOnce(&RemoteWrapper::Bind, this, std::move(remote)));
88     }
89 
CreateForwarder()90     std::unique_ptr<ThreadSafeForwarder<InterfaceType>> CreateForwarder() {
91       return std::make_unique<ThreadSafeForwarder<InterfaceType>>(
92           task_runner_, base::BindRepeating(&RemoteWrapper::Accept, this),
93           base::BindRepeating(&RemoteWrapper::AcceptWithResponder, this),
94           base::BindRepeating(&RemoteWrapper::ForceAsyncSend, this),
95           associated_group_);
96     }
97 
set_disconnect_handler(base::OnceClosure handler,scoped_refptr<base::SequencedTaskRunner> handler_task_runner)98     void set_disconnect_handler(
99         base::OnceClosure handler,
100         scoped_refptr<base::SequencedTaskRunner> handler_task_runner) {
101       if (!task_runner_->RunsTasksInCurrentSequence()) {
102         // Make sure we modify the remote's disconnect handler on the
103         // correct sequence.
104         task_runner_->PostTask(
105             FROM_HERE,
106             base::BindOnce(&RemoteWrapper::set_disconnect_handler, this,
107                            std::move(handler), std::move(handler_task_runner)));
108         return;
109       }
110       // The actual handler will post a task to run |handler| on
111       // |handler_task_runner|.
112       auto wrapped_handler =
113           base::BindOnce(base::IgnoreResult(&base::TaskRunner::PostTask),
114                          handler_task_runner, FROM_HERE, std::move(handler));
115       // Because we may have had to post a task to set this handler,
116       // this call may land after the remote has just been disconnected.
117       // Manually invoke the handler in that case.
118       if (!remote_.is_connected()) {
119         std::move(wrapped_handler).Run();
120         return;
121       }
122       remote_.set_disconnect_handler(std::move(wrapped_handler));
123     }
124 
125    private:
126     friend struct RemoteWrapperDeleter;
127 
~RemoteWrapper()128     ~RemoteWrapper() {}
129 
Bind(PendingType remote)130     void Bind(PendingType remote) {
131       DCHECK(task_runner_->RunsTasksInCurrentSequence());
132       remote_.Bind(std::move(remote));
133 
134       // By default we force all messages to behave as if async within the
135       // Remote, as SharedRemote implements its own waiting mechanism to block
136       // only the calling thread when making sync calls.
137       remote_.internal_state()->force_outgoing_messages_async(true);
138     }
139 
Accept(Message message)140     void Accept(Message message) {
141       remote_.internal_state()->ForwardMessage(std::move(message));
142     }
143 
AcceptWithResponder(Message message,std::unique_ptr<MessageReceiver> responder)144     void AcceptWithResponder(Message message,
145                              std::unique_ptr<MessageReceiver> responder) {
146       remote_.internal_state()->ForwardMessageWithResponder(
147           std::move(message), std::move(responder));
148     }
149 
ForceAsyncSend(bool force)150     void ForceAsyncSend(bool force) {
151       remote_.internal_state()->force_outgoing_messages_async(force);
152     }
153 
DeleteOnCorrectThread()154     void DeleteOnCorrectThread() const {
155       if (!task_runner_->RunsTasksInCurrentSequence()) {
156         // NOTE: This is only called when there are no more references to
157         // |this|, so binding it unretained is both safe and necessary.
158         task_runner_->PostTask(
159             FROM_HERE, base::BindOnce(&RemoteWrapper::DeleteOnCorrectThread,
160                                       base::Unretained(this)));
161       } else {
162         delete this;
163       }
164     }
165 
166     RemoteType remote_;
167     const scoped_refptr<base::SequencedTaskRunner> task_runner_;
168     AssociatedGroup associated_group_;
169 
170     DISALLOW_COPY_AND_ASSIGN(RemoteWrapper);
171   };
172 
173   struct RemoteWrapperDeleter {
DestructRemoteWrapperDeleter174     static void Destruct(const RemoteWrapper* wrapper) {
175       wrapper->DeleteOnCorrectThread();
176     }
177   };
178 
SharedRemoteBase(scoped_refptr<RemoteWrapper> wrapper)179   explicit SharedRemoteBase(scoped_refptr<RemoteWrapper> wrapper)
180       : wrapper_(std::move(wrapper)), forwarder_(wrapper_->CreateForwarder()) {}
181 
182   // Creates a SharedRemoteBase wrapping an underlying non-thread-safe
183   // PendingType which is bound to the calling sequence. All messages sent
184   // via this thread-safe proxy will internally be sent by first posting to this
185   // (the calling) sequence's TaskRunner.
Create(PendingType pending_remote)186   static scoped_refptr<SharedRemoteBase> Create(PendingType pending_remote) {
187     scoped_refptr<RemoteWrapper> wrapper =
188         new RemoteWrapper(RemoteType(std::move(pending_remote)));
189     return new SharedRemoteBase(wrapper);
190   }
191 
192   // Creates a SharedRemoteBase which binds the underlying
193   // non-thread-safe InterfacePtrType on the specified TaskRunner. All messages
194   // sent via this thread-safe proxy will internally be sent by first posting to
195   // that TaskRunner.
Create(PendingType pending_remote,scoped_refptr<base::SequencedTaskRunner> bind_task_runner)196   static scoped_refptr<SharedRemoteBase> Create(
197       PendingType pending_remote,
198       scoped_refptr<base::SequencedTaskRunner> bind_task_runner) {
199     scoped_refptr<RemoteWrapper> wrapper =
200         new RemoteWrapper(std::move(bind_task_runner));
201     wrapper->BindOnTaskRunner(std::move(pending_remote));
202     return new SharedRemoteBase(wrapper);
203   }
204 
~SharedRemoteBase()205   ~SharedRemoteBase() {}
206 
207   const scoped_refptr<RemoteWrapper> wrapper_;
208   const std::unique_ptr<ThreadSafeForwarder<InterfaceType>> forwarder_;
209 
210   DISALLOW_COPY_AND_ASSIGN(SharedRemoteBase);
211 };
212 
213 // SharedRemote wraps a non-thread-safe Remote and proxies messages to it.
214 // Unlike normal Remote objects, SharedRemote is copyable and usable from any
215 // thread, but has some additional overhead and latency in message transmission
216 // as a trade-off.
217 //
218 // Async calls are posted to the bound sequence (the sequence that the
219 // underlying Remote is bound to, i.e. |bind_task_runner| below), and responses
220 // are posted back to the calling sequence. Sync calls are dispatched directly
221 // if the call is made on the bound sequence, or posted otherwise.
222 //
223 // This means that in general, when making calls from sequences other than the
224 // bound sequence, a hop is first made *to* the bound sequence; and when
225 // receiving replies, a hop is made *from* the bound the sequence.
226 //
227 // Note that sync calls only block the calling sequence.
228 template <typename Interface>
229 class SharedRemote {
230  public:
231   // Constructs an unbound SharedRemote. This object cannot issue Interface
232   // method calls and does not schedule any tasks. A default-constructed
233   // SharedRemote may be replaced with a bound one via copy- or move-assignment.
234   SharedRemote() = default;
235 
236   // Constructs a SharedRemote bound to `pending_remote` on the calling
237   // sequence. See `Bind()` below for more details.
SharedRemote(PendingRemote<Interface> pending_remote)238   explicit SharedRemote(PendingRemote<Interface> pending_remote) {
239     Bind(std::move(pending_remote), nullptr);
240   }
241 
242   // Constructs a SharedRemote bound to `pending_remote` on the sequence given
243   // by `bind_task_runner`. See `Bind()` below for more details.
SharedRemote(PendingRemote<Interface> pending_remote,scoped_refptr<base::SequencedTaskRunner> bind_task_runner)244   SharedRemote(PendingRemote<Interface> pending_remote,
245                scoped_refptr<base::SequencedTaskRunner> bind_task_runner) {
246     Bind(std::move(pending_remote), std::move(bind_task_runner));
247   }
248 
249   // SharedRemote supports both copy and move construction and assignment. These
250   // are explicitly defaulted here for clarity.
251   SharedRemote(const SharedRemote&) = default;
252   SharedRemote(SharedRemote&&) = default;
253   SharedRemote& operator=(const SharedRemote&) = default;
254   SharedRemote& operator=(SharedRemote&&) = default;
255 
is_bound()256   bool is_bound() const { return remote_ != nullptr; }
257   explicit operator bool() const { return is_bound(); }
258 
get()259   Interface* get() const { return remote_->get(); }
260   Interface* operator->() const { return get(); }
261   Interface& operator*() const { return *get(); }
262 
set_disconnect_handler(base::OnceClosure handler,scoped_refptr<base::SequencedTaskRunner> handler_task_runner)263   void set_disconnect_handler(
264       base::OnceClosure handler,
265       scoped_refptr<base::SequencedTaskRunner> handler_task_runner) {
266     remote_->set_disconnect_handler(std::move(handler),
267                                     std::move(handler_task_runner));
268   }
269 
270   // Clears this SharedRemote. Note that this does *not* necessarily close the
271   // remote's endpoint as other SharedRemote instances may reference the same
272   // underlying endpoint.
reset()273   void reset() { remote_.reset(); }
274 
275   // Binds this SharedRemote to `pending_remote` on the sequence given by
276   // `bind_task_runner`, or the calling sequence if `bind_task_runner` is null.
277   // Once bound, the SharedRemote may be used to send messages on the underlying
278   // Remote. Messages always bounce through `bind_task_runner` before sending,
279   // unless the caller is issuing a [Sync] call from `bind_task_runner` already;
280   // in which case this behaves exactly like a regular Remote for that call.
281   //
282   // Any reply received by the SharedRemote is dispatched to whatever
283   // SequencedTaskRunner was current when the corresponding request was made.
284   //
285   // A bound SharedRemote may be copied any number of times, to any number of
286   // threads. Each copy sends messages through the same underlying Remote, after
287   // bouncing through the same `bind_task_runner`.
288   //
289   // If this SharedRemote was already bound, it will be effectively unbound by
290   // this call and re-bound to `pending_remote`. Any prior copies made are NOT
291   // affected and will retain their reference to the original Remote.
Bind(PendingRemote<Interface> pending_remote,scoped_refptr<base::SequencedTaskRunner> bind_task_runner)292   void Bind(PendingRemote<Interface> pending_remote,
293             scoped_refptr<base::SequencedTaskRunner> bind_task_runner) {
294     if (bind_task_runner && pending_remote) {
295       remote_ = SharedRemoteBase<Remote<Interface>>::Create(
296           std::move(pending_remote), std::move(bind_task_runner));
297     } else if (pending_remote) {
298       remote_ = SharedRemoteBase<Remote<Interface>>::Create(
299           std::move(pending_remote));
300     }
301   }
302 
303  private:
304   scoped_refptr<SharedRemoteBase<Remote<Interface>>> remote_;
305 };
306 
307 }  // namespace mojo
308 
309 #endif  // MOJO_PUBLIC_CPP_BINDINGS_SHARED_REMOTE_H_
310