1 // Copyright 2019 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_SHARED_REMOTE_H_ 6 #define MOJO_PUBLIC_CPP_BINDINGS_SHARED_REMOTE_H_ 7 8 #include <memory> 9 10 #include "base/bind.h" 11 #include "base/macros.h" 12 #include "base/memory/ptr_util.h" 13 #include "base/memory/ref_counted.h" 14 #include "base/stl_util.h" 15 #include "base/synchronization/waitable_event.h" 16 #include "base/task_runner.h" 17 #include "base/threading/sequenced_task_runner_handle.h" 18 #include "mojo/public/cpp/bindings/associated_group.h" 19 #include "mojo/public/cpp/bindings/message.h" 20 #include "mojo/public/cpp/bindings/remote.h" 21 #include "mojo/public/cpp/bindings/sync_call_restrictions.h" 22 #include "mojo/public/cpp/bindings/sync_event_watcher.h" 23 #include "mojo/public/cpp/bindings/thread_safe_interface_ptr.h" 24 25 namespace mojo { 26 27 template <typename Interface> 28 class SharedRemote; 29 30 template <typename RemoteType> 31 class SharedRemoteBase 32 : public base::RefCountedThreadSafe<SharedRemoteBase<RemoteType>> { 33 public: 34 using InterfaceType = typename RemoteType::InterfaceType; 35 using PendingType = typename RemoteType::PendingType; 36 get()37 InterfaceType* get() { return &forwarder_->proxy(); } 38 InterfaceType* operator->() { return get(); } 39 InterfaceType& operator*() { return *get(); } 40 set_disconnect_handler(base::OnceClosure handler,scoped_refptr<base::SequencedTaskRunner> handler_task_runner)41 void set_disconnect_handler( 42 base::OnceClosure handler, 43 scoped_refptr<base::SequencedTaskRunner> handler_task_runner) { 44 wrapper_->set_disconnect_handler(std::move(handler), 45 std::move(handler_task_runner)); 46 } 47 48 private: 49 friend class base::RefCountedThreadSafe<SharedRemoteBase<RemoteType>>; 50 template <typename Interface> 51 friend class SharedRemote; 52 template <typename Interface> 53 friend class SharedAssociatedRemote; 54 55 struct RemoteWrapperDeleter; 56 57 // Helper class which owns a |RemoteType| instance on an appropriate sequence. 58 // This is kept alive as long as it's bound within some ThreadSafeForwarder's 59 // callbacks. 60 class RemoteWrapper 61 : public base::RefCountedThreadSafe<RemoteWrapper, RemoteWrapperDeleter> { 62 public: RemoteWrapper(RemoteType remote)63 explicit RemoteWrapper(RemoteType remote) 64 : RemoteWrapper(base::SequencedTaskRunnerHandle::Get()) { 65 remote_ = std::move(remote); 66 associated_group_ = *remote_.internal_state()->associated_group(); 67 68 // By default we force all messages to behave as if async within the 69 // Remote, as SharedRemote implements its own waiting mechanism to block 70 // only the calling thread when making sync calls. 71 remote_.internal_state()->force_outgoing_messages_async(true); 72 } 73 RemoteWrapper(scoped_refptr<base::SequencedTaskRunner> task_runner)74 explicit RemoteWrapper(scoped_refptr<base::SequencedTaskRunner> task_runner) 75 : task_runner_(std::move(task_runner)) {} 76 BindOnTaskRunner(PendingType remote)77 void BindOnTaskRunner(PendingType remote) { 78 // TODO(https://crbug.com/682334): At the moment we don't have a group 79 // controller available. That means the user won't be able to pass 80 // associated endpoints on this interface (at least not immediately). In 81 // order to fix this, we need to create a MultiplexRouter immediately and 82 // bind it to the interface pointer on the |task_runner_|. Therefore, 83 // MultiplexRouter should be able to be created on a sequence different 84 // than the one that it is supposed to listen on. 85 task_runner_->PostTask( 86 FROM_HERE, 87 base::BindOnce(&RemoteWrapper::Bind, this, std::move(remote))); 88 } 89 CreateForwarder()90 std::unique_ptr<ThreadSafeForwarder<InterfaceType>> CreateForwarder() { 91 return std::make_unique<ThreadSafeForwarder<InterfaceType>>( 92 task_runner_, base::BindRepeating(&RemoteWrapper::Accept, this), 93 base::BindRepeating(&RemoteWrapper::AcceptWithResponder, this), 94 base::BindRepeating(&RemoteWrapper::ForceAsyncSend, this), 95 associated_group_); 96 } 97 set_disconnect_handler(base::OnceClosure handler,scoped_refptr<base::SequencedTaskRunner> handler_task_runner)98 void set_disconnect_handler( 99 base::OnceClosure handler, 100 scoped_refptr<base::SequencedTaskRunner> handler_task_runner) { 101 if (!task_runner_->RunsTasksInCurrentSequence()) { 102 // Make sure we modify the remote's disconnect handler on the 103 // correct sequence. 104 task_runner_->PostTask( 105 FROM_HERE, 106 base::BindOnce(&RemoteWrapper::set_disconnect_handler, this, 107 std::move(handler), std::move(handler_task_runner))); 108 return; 109 } 110 // The actual handler will post a task to run |handler| on 111 // |handler_task_runner|. 112 auto wrapped_handler = 113 base::BindOnce(base::IgnoreResult(&base::TaskRunner::PostTask), 114 handler_task_runner, FROM_HERE, std::move(handler)); 115 // Because we may have had to post a task to set this handler, 116 // this call may land after the remote has just been disconnected. 117 // Manually invoke the handler in that case. 118 if (!remote_.is_connected()) { 119 std::move(wrapped_handler).Run(); 120 return; 121 } 122 remote_.set_disconnect_handler(std::move(wrapped_handler)); 123 } 124 125 private: 126 friend struct RemoteWrapperDeleter; 127 ~RemoteWrapper()128 ~RemoteWrapper() {} 129 Bind(PendingType remote)130 void Bind(PendingType remote) { 131 DCHECK(task_runner_->RunsTasksInCurrentSequence()); 132 remote_.Bind(std::move(remote)); 133 134 // By default we force all messages to behave as if async within the 135 // Remote, as SharedRemote implements its own waiting mechanism to block 136 // only the calling thread when making sync calls. 137 remote_.internal_state()->force_outgoing_messages_async(true); 138 } 139 Accept(Message message)140 void Accept(Message message) { 141 remote_.internal_state()->ForwardMessage(std::move(message)); 142 } 143 AcceptWithResponder(Message message,std::unique_ptr<MessageReceiver> responder)144 void AcceptWithResponder(Message message, 145 std::unique_ptr<MessageReceiver> responder) { 146 remote_.internal_state()->ForwardMessageWithResponder( 147 std::move(message), std::move(responder)); 148 } 149 ForceAsyncSend(bool force)150 void ForceAsyncSend(bool force) { 151 remote_.internal_state()->force_outgoing_messages_async(force); 152 } 153 DeleteOnCorrectThread()154 void DeleteOnCorrectThread() const { 155 if (!task_runner_->RunsTasksInCurrentSequence()) { 156 // NOTE: This is only called when there are no more references to 157 // |this|, so binding it unretained is both safe and necessary. 158 task_runner_->PostTask( 159 FROM_HERE, base::BindOnce(&RemoteWrapper::DeleteOnCorrectThread, 160 base::Unretained(this))); 161 } else { 162 delete this; 163 } 164 } 165 166 RemoteType remote_; 167 const scoped_refptr<base::SequencedTaskRunner> task_runner_; 168 AssociatedGroup associated_group_; 169 170 DISALLOW_COPY_AND_ASSIGN(RemoteWrapper); 171 }; 172 173 struct RemoteWrapperDeleter { DestructRemoteWrapperDeleter174 static void Destruct(const RemoteWrapper* wrapper) { 175 wrapper->DeleteOnCorrectThread(); 176 } 177 }; 178 SharedRemoteBase(scoped_refptr<RemoteWrapper> wrapper)179 explicit SharedRemoteBase(scoped_refptr<RemoteWrapper> wrapper) 180 : wrapper_(std::move(wrapper)), forwarder_(wrapper_->CreateForwarder()) {} 181 182 // Creates a SharedRemoteBase wrapping an underlying non-thread-safe 183 // PendingType which is bound to the calling sequence. All messages sent 184 // via this thread-safe proxy will internally be sent by first posting to this 185 // (the calling) sequence's TaskRunner. Create(PendingType pending_remote)186 static scoped_refptr<SharedRemoteBase> Create(PendingType pending_remote) { 187 scoped_refptr<RemoteWrapper> wrapper = 188 new RemoteWrapper(RemoteType(std::move(pending_remote))); 189 return new SharedRemoteBase(wrapper); 190 } 191 192 // Creates a SharedRemoteBase which binds the underlying 193 // non-thread-safe InterfacePtrType on the specified TaskRunner. All messages 194 // sent via this thread-safe proxy will internally be sent by first posting to 195 // that TaskRunner. Create(PendingType pending_remote,scoped_refptr<base::SequencedTaskRunner> bind_task_runner)196 static scoped_refptr<SharedRemoteBase> Create( 197 PendingType pending_remote, 198 scoped_refptr<base::SequencedTaskRunner> bind_task_runner) { 199 scoped_refptr<RemoteWrapper> wrapper = 200 new RemoteWrapper(std::move(bind_task_runner)); 201 wrapper->BindOnTaskRunner(std::move(pending_remote)); 202 return new SharedRemoteBase(wrapper); 203 } 204 ~SharedRemoteBase()205 ~SharedRemoteBase() {} 206 207 const scoped_refptr<RemoteWrapper> wrapper_; 208 const std::unique_ptr<ThreadSafeForwarder<InterfaceType>> forwarder_; 209 210 DISALLOW_COPY_AND_ASSIGN(SharedRemoteBase); 211 }; 212 213 // SharedRemote wraps a non-thread-safe Remote and proxies messages to it. 214 // Unlike normal Remote objects, SharedRemote is copyable and usable from any 215 // thread, but has some additional overhead and latency in message transmission 216 // as a trade-off. 217 // 218 // Async calls are posted to the bound sequence (the sequence that the 219 // underlying Remote is bound to, i.e. |bind_task_runner| below), and responses 220 // are posted back to the calling sequence. Sync calls are dispatched directly 221 // if the call is made on the bound sequence, or posted otherwise. 222 // 223 // This means that in general, when making calls from sequences other than the 224 // bound sequence, a hop is first made *to* the bound sequence; and when 225 // receiving replies, a hop is made *from* the bound the sequence. 226 // 227 // Note that sync calls only block the calling sequence. 228 template <typename Interface> 229 class SharedRemote { 230 public: 231 // Constructs an unbound SharedRemote. This object cannot issue Interface 232 // method calls and does not schedule any tasks. A default-constructed 233 // SharedRemote may be replaced with a bound one via copy- or move-assignment. 234 SharedRemote() = default; 235 236 // Constructs a SharedRemote bound to `pending_remote` on the calling 237 // sequence. See `Bind()` below for more details. SharedRemote(PendingRemote<Interface> pending_remote)238 explicit SharedRemote(PendingRemote<Interface> pending_remote) { 239 Bind(std::move(pending_remote), nullptr); 240 } 241 242 // Constructs a SharedRemote bound to `pending_remote` on the sequence given 243 // by `bind_task_runner`. See `Bind()` below for more details. SharedRemote(PendingRemote<Interface> pending_remote,scoped_refptr<base::SequencedTaskRunner> bind_task_runner)244 SharedRemote(PendingRemote<Interface> pending_remote, 245 scoped_refptr<base::SequencedTaskRunner> bind_task_runner) { 246 Bind(std::move(pending_remote), std::move(bind_task_runner)); 247 } 248 249 // SharedRemote supports both copy and move construction and assignment. These 250 // are explicitly defaulted here for clarity. 251 SharedRemote(const SharedRemote&) = default; 252 SharedRemote(SharedRemote&&) = default; 253 SharedRemote& operator=(const SharedRemote&) = default; 254 SharedRemote& operator=(SharedRemote&&) = default; 255 is_bound()256 bool is_bound() const { return remote_ != nullptr; } 257 explicit operator bool() const { return is_bound(); } 258 get()259 Interface* get() const { return remote_->get(); } 260 Interface* operator->() const { return get(); } 261 Interface& operator*() const { return *get(); } 262 set_disconnect_handler(base::OnceClosure handler,scoped_refptr<base::SequencedTaskRunner> handler_task_runner)263 void set_disconnect_handler( 264 base::OnceClosure handler, 265 scoped_refptr<base::SequencedTaskRunner> handler_task_runner) { 266 remote_->set_disconnect_handler(std::move(handler), 267 std::move(handler_task_runner)); 268 } 269 270 // Clears this SharedRemote. Note that this does *not* necessarily close the 271 // remote's endpoint as other SharedRemote instances may reference the same 272 // underlying endpoint. reset()273 void reset() { remote_.reset(); } 274 275 // Binds this SharedRemote to `pending_remote` on the sequence given by 276 // `bind_task_runner`, or the calling sequence if `bind_task_runner` is null. 277 // Once bound, the SharedRemote may be used to send messages on the underlying 278 // Remote. Messages always bounce through `bind_task_runner` before sending, 279 // unless the caller is issuing a [Sync] call from `bind_task_runner` already; 280 // in which case this behaves exactly like a regular Remote for that call. 281 // 282 // Any reply received by the SharedRemote is dispatched to whatever 283 // SequencedTaskRunner was current when the corresponding request was made. 284 // 285 // A bound SharedRemote may be copied any number of times, to any number of 286 // threads. Each copy sends messages through the same underlying Remote, after 287 // bouncing through the same `bind_task_runner`. 288 // 289 // If this SharedRemote was already bound, it will be effectively unbound by 290 // this call and re-bound to `pending_remote`. Any prior copies made are NOT 291 // affected and will retain their reference to the original Remote. Bind(PendingRemote<Interface> pending_remote,scoped_refptr<base::SequencedTaskRunner> bind_task_runner)292 void Bind(PendingRemote<Interface> pending_remote, 293 scoped_refptr<base::SequencedTaskRunner> bind_task_runner) { 294 if (bind_task_runner && pending_remote) { 295 remote_ = SharedRemoteBase<Remote<Interface>>::Create( 296 std::move(pending_remote), std::move(bind_task_runner)); 297 } else if (pending_remote) { 298 remote_ = SharedRemoteBase<Remote<Interface>>::Create( 299 std::move(pending_remote)); 300 } 301 } 302 303 private: 304 scoped_refptr<SharedRemoteBase<Remote<Interface>>> remote_; 305 }; 306 307 } // namespace mojo 308 309 #endif // MOJO_PUBLIC_CPP_BINDINGS_SHARED_REMOTE_H_ 310