// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

5 #ifndef MOJO_CORE_NODE_CONTROLLER_H_
6 #define MOJO_CORE_NODE_CONTROLLER_H_
7 
8 #include <map>
9 #include <memory>
10 #include <string>
11 #include <unordered_map>
12 #include <unordered_set>
13 #include <utility>
14 #include <vector>
15 
16 #include "base/callback.h"
17 #include "base/containers/queue.h"
18 #include "base/containers/span.h"
19 #include "base/macros.h"
20 #include "base/memory/ref_counted.h"
21 #include "base/memory/writable_shared_memory_region.h"
22 #include "base/optional.h"
23 #include "base/single_thread_task_runner.h"
24 #include "build/build_config.h"
25 #include "mojo/core/atomic_flag.h"
26 #include "mojo/core/node_channel.h"
27 #include "mojo/core/ports/event.h"
28 #include "mojo/core/ports/name.h"
29 #include "mojo/core/ports/node.h"
30 #include "mojo/core/ports/node_delegate.h"
31 #include "mojo/core/scoped_process_handle.h"
32 #include "mojo/core/system_impl_export.h"
33 #include "mojo/public/cpp/platform/platform_handle.h"
34 
35 namespace mojo {
36 namespace core {
37 
38 class Broker;
39 class Core;
40 
41 // The owner of ports::Node which facilitates core EDK implementation. All
42 // public interface methods are safe to call from any thread.
43 class MOJO_SYSTEM_IMPL_EXPORT NodeController : public ports::NodeDelegate,
44                                                public NodeChannel::Delegate {
45  public:
46   class PortObserver : public ports::UserData {
47    public:
48     virtual void OnPortStatusChanged() = 0;
49 
50    protected:
~PortObserver()51     ~PortObserver() override {}
52   };
53 
54   // |core| owns and out-lives us.
55   explicit NodeController(Core* core);
56   ~NodeController() override;
57 
name()58   const ports::NodeName& name() const { return name_; }
core()59   Core* core() const { return core_; }
node()60   ports::Node* node() const { return node_.get(); }
io_task_runner()61   scoped_refptr<base::SingleThreadTaskRunner> io_task_runner() const {
62     return io_task_runner_;
63   }
64 
65   // Called exactly once, shortly after construction, and before any other
66   // methods are called on this object.
67   void SetIOTaskRunner(
68       scoped_refptr<base::SingleThreadTaskRunner> io_task_runner);
69 
70   // Sends an invitation to a remote process (via |connection_params|) to join
71   // this process's graph of connected processes as a broker client.
72   void SendBrokerClientInvitation(
73       base::ProcessHandle target_process,
74       ConnectionParams connection_params,
75       const std::vector<std::pair<std::string, ports::PortRef>>& attached_ports,
76       const ProcessErrorCallback& process_error_callback);
77 
78   // Connects this node to the process which invited it to be a broker client.
79   void AcceptBrokerClientInvitation(ConnectionParams connection_params);
80 
81   // Connects this node to a peer node. On success, |port| will be merged with
82   // the corresponding port in the peer node.
83   void ConnectIsolated(ConnectionParams connection_params,
84                        const ports::PortRef& port,
85                        base::StringPiece connection_name);
86 
87   // Sets a port's observer. If |observer| is null the port's current observer
88   // is removed.
89   void SetPortObserver(const ports::PortRef& port,
90                        scoped_refptr<PortObserver> observer);
91 
92   // Closes a port. Use this in lieu of calling Node::ClosePort() directly, as
93   // it ensures the port's observer has also been removed.
94   void ClosePort(const ports::PortRef& port);
95 
96   // Sends a message on a port to its peer.
97   int SendUserMessage(const ports::PortRef& port_ref,
98                       std::unique_ptr<ports::UserMessageEvent> message);
99 
100   // Merges a local port |port| into a port reserved by |name| in the node which
101   // invited this node.
102   void MergePortIntoInviter(const std::string& name,
103                             const ports::PortRef& port);
104 
105   // Merges two local ports together.
106   int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1);
107 
108   // Creates a new shared buffer for use in the current process.
109   base::WritableSharedMemoryRegion CreateSharedBuffer(size_t num_bytes);
110 
111   // Request that the Node be shut down cleanly. This may take an arbitrarily
112   // long time to complete, at which point |callback| will be called.
113   //
114   // Note that while it is safe to continue using the NodeController's public
115   // interface after requesting shutdown, you do so at your own risk and there
116   // is NO guarantee that new messages will be sent or ports will complete
117   // transfer.
118   void RequestShutdown(base::OnceClosure callback);
119 
120   // Notifies the NodeController that we received a bad message from the given
121   // node.  To avoid losing error reports the caller should ensure that the
122   // source node |HasBadMessageHandler| before calling |NotifyBadMessageFrom|.
123   void NotifyBadMessageFrom(const ports::NodeName& source_node,
124                             const std::string& error);
125 
126   // Returns whether |source_node| exists and has a bad message handler.
127   bool HasBadMessageHandler(const ports::NodeName& source_node);
128 
129   // Force-closes the connection to another process to simulate connection
130   // failures for testing. |process_id| must correspond to a process to which
131   // this node has an active NodeChannel.
132   void ForceDisconnectProcessForTesting(base::ProcessId process_id);
133 
134   static void DeserializeRawBytesAsEventForFuzzer(
135       base::span<const unsigned char> data);
136   static void DeserializeMessageAsEventForFuzzer(Channel::MessagePtr message);
137 
138  private:
139   friend Core;
140 
141   using NodeMap =
142       std::unordered_map<ports::NodeName, scoped_refptr<NodeChannel>>;
143   using OutgoingMessageQueue = base::queue<Channel::MessagePtr>;
144   using PortMap = std::map<std::string, ports::PortRef>;
145 
146   struct IsolatedConnection {
147     IsolatedConnection();
148     IsolatedConnection(const IsolatedConnection& other);
149     IsolatedConnection(IsolatedConnection&& other);
150     IsolatedConnection(scoped_refptr<NodeChannel> channel,
151                        const ports::PortRef& local_port,
152                        base::StringPiece name);
153     ~IsolatedConnection();
154 
155     IsolatedConnection& operator=(const IsolatedConnection& other);
156     IsolatedConnection& operator=(IsolatedConnection&& other);
157 
158     // NOTE: |channel| is null once the connection is fully established.
159     scoped_refptr<NodeChannel> channel;
160     ports::PortRef local_port;
161     std::string name;
162   };
163 
164   void SendBrokerClientInvitationOnIOThread(
165       ScopedProcessHandle target_process,
166       ConnectionParams connection_params,
167       ports::NodeName token,
168       const ProcessErrorCallback& process_error_callback);
169   void AcceptBrokerClientInvitationOnIOThread(
170       ConnectionParams connection_params,
171       base::Optional<PlatformHandle> broker_host_handle);
172 
173   void ConnectIsolatedOnIOThread(ConnectionParams connection_params,
174                                  ports::PortRef port,
175                                  const std::string& connection_name);
176 
177   scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name);
178   scoped_refptr<NodeChannel> GetInviterChannel();
179   scoped_refptr<NodeChannel> GetBrokerChannel();
180 
181   void AddPeer(const ports::NodeName& name,
182                scoped_refptr<NodeChannel> channel,
183                bool start_channel);
184   void DropPeer(const ports::NodeName& name, NodeChannel* channel);
185   void SendPeerEvent(const ports::NodeName& name, ports::ScopedEvent event);
186   void DropAllPeers();
187 
188   // ports::NodeDelegate:
189   void ForwardEvent(const ports::NodeName& node,
190                     ports::ScopedEvent event) override;
191   void BroadcastEvent(ports::ScopedEvent event) override;
192   void PortStatusChanged(const ports::PortRef& port) override;
193 
194   // NodeChannel::Delegate:
195   void OnAcceptInvitee(const ports::NodeName& from_node,
196                        const ports::NodeName& inviter_name,
197                        const ports::NodeName& token) override;
198   void OnAcceptInvitation(const ports::NodeName& from_node,
199                           const ports::NodeName& token,
200                           const ports::NodeName& invitee_name) override;
201   void OnAddBrokerClient(const ports::NodeName& from_node,
202                          const ports::NodeName& client_name,
203                          base::ProcessHandle process_handle) override;
204   void OnBrokerClientAdded(const ports::NodeName& from_node,
205                            const ports::NodeName& client_name,
206                            PlatformHandle broker_channel) override;
207   void OnAcceptBrokerClient(const ports::NodeName& from_node,
208                             const ports::NodeName& broker_name,
209                             PlatformHandle broker_channel) override;
210   void OnEventMessage(const ports::NodeName& from_node,
211                       Channel::MessagePtr message) override;
212   void OnRequestPortMerge(const ports::NodeName& from_node,
213                           const ports::PortName& connector_port_name,
214                           const std::string& token) override;
215   void OnRequestIntroduction(const ports::NodeName& from_node,
216                              const ports::NodeName& name) override;
217   void OnIntroduce(const ports::NodeName& from_node,
218                    const ports::NodeName& name,
219                    PlatformHandle channel_handle) override;
220   void OnBroadcast(const ports::NodeName& from_node,
221                    Channel::MessagePtr message) override;
222 #if defined(OS_WIN)
223   void OnRelayEventMessage(const ports::NodeName& from_node,
224                            base::ProcessHandle from_process,
225                            const ports::NodeName& destination,
226                            Channel::MessagePtr message) override;
227   void OnEventMessageFromRelay(const ports::NodeName& from_node,
228                                const ports::NodeName& source_node,
229                                Channel::MessagePtr message) override;
230 #endif
231   void OnAcceptPeer(const ports::NodeName& from_node,
232                     const ports::NodeName& token,
233                     const ports::NodeName& peer_name,
234                     const ports::PortName& port_name) override;
235   void OnChannelError(const ports::NodeName& from_node,
236                       NodeChannel* channel) override;
237 
238   // Cancels all pending port merges. These are merges which are supposed to
239   // be requested from the inviter ASAP, and they may be cancelled if the
240   // connection to the inviter is broken or never established.
241   void CancelPendingPortMerges();
242 
243   // Marks this NodeController for destruction when the IO thread shuts down.
244   // This is used in case Core is torn down before the IO thread. Must only be
245   // called on the IO thread.
246   void DestroyOnIOThreadShutdown();
247 
248   // If there is a registered shutdown callback (meaning shutdown has been
249   // requested, this checks the Node's status to see if clean shutdown is
250   // possible. If so, shutdown is performed and the shutdown callback is run.
251   void AttemptShutdownIfRequested();
252 
253   // See |ForceDisconnectProcessForTesting()|.
254   void ForceDisconnectProcessForTestingOnIOThread(base::ProcessId process_id);
255 
256   // These are safe to access from any thread as long as the Node is alive.
257   Core* const core_;
258   const ports::NodeName name_;
259   const std::unique_ptr<ports::Node> node_;
260   scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_;
261 
262   // Guards |peers_| and |pending_peer_messages_|.
263   base::Lock peers_lock_;
264 
265   // Channels to known peers, including inviter and invitees, if any.
266   NodeMap peers_;
267 
268   // Outgoing message queues for peers we've heard of but can't yet talk to.
269   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
270       pending_peer_messages_;
271 
272   // Guards |reserved_ports_|.
273   base::Lock reserved_ports_lock_;
274 
275   // Ports reserved by name, per peer.
276   std::map<ports::NodeName, PortMap> reserved_ports_;
277 
278   // Guards |pending_port_merges_| and |reject_pending_merges_|.
279   base::Lock pending_port_merges_lock_;
280 
281   // A set of port merge requests awaiting inviter connection.
282   std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_;
283 
284   // Indicates that new merge requests should be rejected because the inviter
285   // has disconnected.
286   bool reject_pending_merges_ = false;
287 
288   // Guards |inviter_name_| and |bootstrap_inviter_channel_|.
289   base::Lock inviter_lock_;
290 
291   // The name of the node which invited us to join its network, if any.
292   ports::NodeName inviter_name_;
293 
294   // A temporary reference to the inviter channel before we know their name.
295   scoped_refptr<NodeChannel> bootstrap_inviter_channel_;
296 
297   // Guards |broker_name_|, |pending_broker_clients_|, and
298   // |pending_relay_messages_|.
299   base::Lock broker_lock_;
300 
301   // The name of our broker node, if any.
302   ports::NodeName broker_name_;
303 
304   // A queue of remote broker clients waiting to be connected to the broker.
305   base::queue<ports::NodeName> pending_broker_clients_;
306 
307   // Messages waiting to be relayed by the broker once it's known.
308   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
309       pending_relay_messages_;
310 
311   // Guards |shutdown_callback_|.
312   base::Lock shutdown_lock_;
313 
314   // Set by RequestShutdown(). If this is non-null, the controller will
315   // begin polling the Node to see if clean shutdown is possible any time the
316   // Node's state is modified by the controller.
317   base::OnceClosure shutdown_callback_;
318   // Flag to fast-path checking |shutdown_callback_|.
319   AtomicFlag shutdown_callback_flag_;
320 
321   // All other fields below must only be accessed on the I/O thread, i.e., the
322   // thread on which core_->io_task_runner() runs tasks.
323 
324   // Channels to invitees during handshake.
325   NodeMap pending_invitations_;
326 
327   std::map<ports::NodeName, IsolatedConnection> pending_isolated_connections_;
328   std::map<std::string, ports::NodeName> named_isolated_connections_;
329 
330   // Indicates whether this object should delete itself on IO thread shutdown.
331   // Must only be accessed from the IO thread.
332   bool destroy_on_io_thread_shutdown_ = false;
333 
334 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA)
335   // Broker for sync shared buffer creation on behalf of broker clients.
336   std::unique_ptr<Broker> broker_;
337 #endif
338 
339   DISALLOW_COPY_AND_ASSIGN(NodeController);
340 };
341 
342 }  // namespace core
343 }  // namespace mojo
344 
345 #endif  // MOJO_CORE_NODE_CONTROLLER_H_
346