1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef MOJO_CORE_NODE_CONTROLLER_H_ 6 #define MOJO_CORE_NODE_CONTROLLER_H_ 7 8 #include <map> 9 #include <memory> 10 #include <string> 11 #include <unordered_map> 12 #include <unordered_set> 13 #include <utility> 14 #include <vector> 15 16 #include "base/callback.h" 17 #include "base/containers/queue.h" 18 #include "base/containers/span.h" 19 #include "base/macros.h" 20 #include "base/memory/ref_counted.h" 21 #include "base/memory/writable_shared_memory_region.h" 22 #include "base/optional.h" 23 #include "base/single_thread_task_runner.h" 24 #include "build/build_config.h" 25 #include "mojo/core/atomic_flag.h" 26 #include "mojo/core/node_channel.h" 27 #include "mojo/core/ports/event.h" 28 #include "mojo/core/ports/name.h" 29 #include "mojo/core/ports/node.h" 30 #include "mojo/core/ports/node_delegate.h" 31 #include "mojo/core/scoped_process_handle.h" 32 #include "mojo/core/system_impl_export.h" 33 #include "mojo/public/cpp/platform/platform_handle.h" 34 35 namespace mojo { 36 namespace core { 37 38 class Broker; 39 class Core; 40 41 // The owner of ports::Node which facilitates core EDK implementation. All 42 // public interface methods are safe to call from any thread. 43 class MOJO_SYSTEM_IMPL_EXPORT NodeController : public ports::NodeDelegate, 44 public NodeChannel::Delegate { 45 public: 46 class PortObserver : public ports::UserData { 47 public: 48 virtual void OnPortStatusChanged() = 0; 49 50 protected: ~PortObserver()51 ~PortObserver() override {} 52 }; 53 54 // |core| owns and out-lives us. 55 explicit NodeController(Core* core); 56 ~NodeController() override; 57 name()58 const ports::NodeName& name() const { return name_; } core()59 Core* core() const { return core_; } node()60 ports::Node* node() const { return node_.get(); } io_task_runner()61 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner() const { 62 return io_task_runner_; 63 } 64 65 // Called exactly once, shortly after construction, and before any other 66 // methods are called on this object. 67 void SetIOTaskRunner( 68 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner); 69 70 // Sends an invitation to a remote process (via |connection_params|) to join 71 // this process's graph of connected processes as a broker client. 72 void SendBrokerClientInvitation( 73 base::ProcessHandle target_process, 74 ConnectionParams connection_params, 75 const std::vector<std::pair<std::string, ports::PortRef>>& attached_ports, 76 const ProcessErrorCallback& process_error_callback); 77 78 // Connects this node to the process which invited it to be a broker client. 79 void AcceptBrokerClientInvitation(ConnectionParams connection_params); 80 81 // Connects this node to a peer node. On success, |port| will be merged with 82 // the corresponding port in the peer node. 83 void ConnectIsolated(ConnectionParams connection_params, 84 const ports::PortRef& port, 85 base::StringPiece connection_name); 86 87 // Sets a port's observer. If |observer| is null the port's current observer 88 // is removed. 89 void SetPortObserver(const ports::PortRef& port, 90 scoped_refptr<PortObserver> observer); 91 92 // Closes a port. Use this in lieu of calling Node::ClosePort() directly, as 93 // it ensures the port's observer has also been removed. 94 void ClosePort(const ports::PortRef& port); 95 96 // Sends a message on a port to its peer. 97 int SendUserMessage(const ports::PortRef& port_ref, 98 std::unique_ptr<ports::UserMessageEvent> message); 99 100 // Merges a local port |port| into a port reserved by |name| in the node which 101 // invited this node. 102 void MergePortIntoInviter(const std::string& name, 103 const ports::PortRef& port); 104 105 // Merges two local ports together. 106 int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1); 107 108 // Creates a new shared buffer for use in the current process. 109 base::WritableSharedMemoryRegion CreateSharedBuffer(size_t num_bytes); 110 111 // Request that the Node be shut down cleanly. This may take an arbitrarily 112 // long time to complete, at which point |callback| will be called. 113 // 114 // Note that while it is safe to continue using the NodeController's public 115 // interface after requesting shutdown, you do so at your own risk and there 116 // is NO guarantee that new messages will be sent or ports will complete 117 // transfer. 118 void RequestShutdown(base::OnceClosure callback); 119 120 // Notifies the NodeController that we received a bad message from the given 121 // node. To avoid losing error reports the caller should ensure that the 122 // source node |HasBadMessageHandler| before calling |NotifyBadMessageFrom|. 123 void NotifyBadMessageFrom(const ports::NodeName& source_node, 124 const std::string& error); 125 126 // Returns whether |source_node| exists and has a bad message handler. 127 bool HasBadMessageHandler(const ports::NodeName& source_node); 128 129 // Force-closes the connection to another process to simulate connection 130 // failures for testing. |process_id| must correspond to a process to which 131 // this node has an active NodeChannel. 132 void ForceDisconnectProcessForTesting(base::ProcessId process_id); 133 134 static void DeserializeRawBytesAsEventForFuzzer( 135 base::span<const unsigned char> data); 136 static void DeserializeMessageAsEventForFuzzer(Channel::MessagePtr message); 137 138 private: 139 friend Core; 140 141 using NodeMap = 142 std::unordered_map<ports::NodeName, scoped_refptr<NodeChannel>>; 143 using OutgoingMessageQueue = base::queue<Channel::MessagePtr>; 144 using PortMap = std::map<std::string, ports::PortRef>; 145 146 struct IsolatedConnection { 147 IsolatedConnection(); 148 IsolatedConnection(const IsolatedConnection& other); 149 IsolatedConnection(IsolatedConnection&& other); 150 IsolatedConnection(scoped_refptr<NodeChannel> channel, 151 const ports::PortRef& local_port, 152 base::StringPiece name); 153 ~IsolatedConnection(); 154 155 IsolatedConnection& operator=(const IsolatedConnection& other); 156 IsolatedConnection& operator=(IsolatedConnection&& other); 157 158 // NOTE: |channel| is null once the connection is fully established. 159 scoped_refptr<NodeChannel> channel; 160 ports::PortRef local_port; 161 std::string name; 162 }; 163 164 void SendBrokerClientInvitationOnIOThread( 165 ScopedProcessHandle target_process, 166 ConnectionParams connection_params, 167 ports::NodeName token, 168 const ProcessErrorCallback& process_error_callback); 169 void AcceptBrokerClientInvitationOnIOThread( 170 ConnectionParams connection_params, 171 base::Optional<PlatformHandle> broker_host_handle); 172 173 void ConnectIsolatedOnIOThread(ConnectionParams connection_params, 174 ports::PortRef port, 175 const std::string& connection_name); 176 177 scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name); 178 scoped_refptr<NodeChannel> GetInviterChannel(); 179 scoped_refptr<NodeChannel> GetBrokerChannel(); 180 181 void AddPeer(const ports::NodeName& name, 182 scoped_refptr<NodeChannel> channel, 183 bool start_channel); 184 void DropPeer(const ports::NodeName& name, NodeChannel* channel); 185 void SendPeerEvent(const ports::NodeName& name, ports::ScopedEvent event); 186 void DropAllPeers(); 187 188 // ports::NodeDelegate: 189 void ForwardEvent(const ports::NodeName& node, 190 ports::ScopedEvent event) override; 191 void BroadcastEvent(ports::ScopedEvent event) override; 192 void PortStatusChanged(const ports::PortRef& port) override; 193 194 // NodeChannel::Delegate: 195 void OnAcceptInvitee(const ports::NodeName& from_node, 196 const ports::NodeName& inviter_name, 197 const ports::NodeName& token) override; 198 void OnAcceptInvitation(const ports::NodeName& from_node, 199 const ports::NodeName& token, 200 const ports::NodeName& invitee_name) override; 201 void OnAddBrokerClient(const ports::NodeName& from_node, 202 const ports::NodeName& client_name, 203 base::ProcessHandle process_handle) override; 204 void OnBrokerClientAdded(const ports::NodeName& from_node, 205 const ports::NodeName& client_name, 206 PlatformHandle broker_channel) override; 207 void OnAcceptBrokerClient(const ports::NodeName& from_node, 208 const ports::NodeName& broker_name, 209 PlatformHandle broker_channel) override; 210 void OnEventMessage(const ports::NodeName& from_node, 211 Channel::MessagePtr message) override; 212 void OnRequestPortMerge(const ports::NodeName& from_node, 213 const ports::PortName& connector_port_name, 214 const std::string& token) override; 215 void OnRequestIntroduction(const ports::NodeName& from_node, 216 const ports::NodeName& name) override; 217 void OnIntroduce(const ports::NodeName& from_node, 218 const ports::NodeName& name, 219 PlatformHandle channel_handle) override; 220 void OnBroadcast(const ports::NodeName& from_node, 221 Channel::MessagePtr message) override; 222 #if defined(OS_WIN) 223 void OnRelayEventMessage(const ports::NodeName& from_node, 224 base::ProcessHandle from_process, 225 const ports::NodeName& destination, 226 Channel::MessagePtr message) override; 227 void OnEventMessageFromRelay(const ports::NodeName& from_node, 228 const ports::NodeName& source_node, 229 Channel::MessagePtr message) override; 230 #endif 231 void OnAcceptPeer(const ports::NodeName& from_node, 232 const ports::NodeName& token, 233 const ports::NodeName& peer_name, 234 const ports::PortName& port_name) override; 235 void OnChannelError(const ports::NodeName& from_node, 236 NodeChannel* channel) override; 237 238 // Cancels all pending port merges. These are merges which are supposed to 239 // be requested from the inviter ASAP, and they may be cancelled if the 240 // connection to the inviter is broken or never established. 241 void CancelPendingPortMerges(); 242 243 // Marks this NodeController for destruction when the IO thread shuts down. 244 // This is used in case Core is torn down before the IO thread. Must only be 245 // called on the IO thread. 246 void DestroyOnIOThreadShutdown(); 247 248 // If there is a registered shutdown callback (meaning shutdown has been 249 // requested, this checks the Node's status to see if clean shutdown is 250 // possible. If so, shutdown is performed and the shutdown callback is run. 251 void AttemptShutdownIfRequested(); 252 253 // See |ForceDisconnectProcessForTesting()|. 254 void ForceDisconnectProcessForTestingOnIOThread(base::ProcessId process_id); 255 256 // These are safe to access from any thread as long as the Node is alive. 257 Core* const core_; 258 const ports::NodeName name_; 259 const std::unique_ptr<ports::Node> node_; 260 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_; 261 262 // Guards |peers_| and |pending_peer_messages_|. 263 base::Lock peers_lock_; 264 265 // Channels to known peers, including inviter and invitees, if any. 266 NodeMap peers_; 267 268 // Outgoing message queues for peers we've heard of but can't yet talk to. 269 std::unordered_map<ports::NodeName, OutgoingMessageQueue> 270 pending_peer_messages_; 271 272 // Guards |reserved_ports_|. 273 base::Lock reserved_ports_lock_; 274 275 // Ports reserved by name, per peer. 276 std::map<ports::NodeName, PortMap> reserved_ports_; 277 278 // Guards |pending_port_merges_| and |reject_pending_merges_|. 279 base::Lock pending_port_merges_lock_; 280 281 // A set of port merge requests awaiting inviter connection. 282 std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_; 283 284 // Indicates that new merge requests should be rejected because the inviter 285 // has disconnected. 286 bool reject_pending_merges_ = false; 287 288 // Guards |inviter_name_| and |bootstrap_inviter_channel_|. 289 base::Lock inviter_lock_; 290 291 // The name of the node which invited us to join its network, if any. 292 ports::NodeName inviter_name_; 293 294 // A temporary reference to the inviter channel before we know their name. 295 scoped_refptr<NodeChannel> bootstrap_inviter_channel_; 296 297 // Guards |broker_name_|, |pending_broker_clients_|, and 298 // |pending_relay_messages_|. 299 base::Lock broker_lock_; 300 301 // The name of our broker node, if any. 302 ports::NodeName broker_name_; 303 304 // A queue of remote broker clients waiting to be connected to the broker. 305 base::queue<ports::NodeName> pending_broker_clients_; 306 307 // Messages waiting to be relayed by the broker once it's known. 308 std::unordered_map<ports::NodeName, OutgoingMessageQueue> 309 pending_relay_messages_; 310 311 // Guards |shutdown_callback_|. 312 base::Lock shutdown_lock_; 313 314 // Set by RequestShutdown(). If this is non-null, the controller will 315 // begin polling the Node to see if clean shutdown is possible any time the 316 // Node's state is modified by the controller. 317 base::OnceClosure shutdown_callback_; 318 // Flag to fast-path checking |shutdown_callback_|. 319 AtomicFlag shutdown_callback_flag_; 320 321 // All other fields below must only be accessed on the I/O thread, i.e., the 322 // thread on which core_->io_task_runner() runs tasks. 323 324 // Channels to invitees during handshake. 325 NodeMap pending_invitations_; 326 327 std::map<ports::NodeName, IsolatedConnection> pending_isolated_connections_; 328 std::map<std::string, ports::NodeName> named_isolated_connections_; 329 330 // Indicates whether this object should delete itself on IO thread shutdown. 331 // Must only be accessed from the IO thread. 332 bool destroy_on_io_thread_shutdown_ = false; 333 334 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA) 335 // Broker for sync shared buffer creation on behalf of broker clients. 336 std::unique_ptr<Broker> broker_; 337 #endif 338 339 DISALLOW_COPY_AND_ASSIGN(NodeController); 340 }; 341 342 } // namespace core 343 } // namespace mojo 344 345 #endif // MOJO_CORE_NODE_CONTROLLER_H_ 346