1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef MOJO_CORE_PORTS_NODE_H_
6 #define MOJO_CORE_PORTS_NODE_H_
7 
#include <stddef.h>
#include <stdint.h>

#include <memory>
#include <queue>
#include <unordered_map>

#include "base/component_export.h"
#include "base/containers/flat_map.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/name.h"
#include "mojo/core/ports/port.h"
#include "mojo/core/ports/port_ref.h"
#include "mojo/core/ports/user_data.h"
24 
25 namespace mojo {
26 namespace core {
27 namespace ports {
28 
// Result codes used by the int-returning Node operations declared in this
// header. OK (zero) signals success; every error code is negative.
//
// The enum is unscoped with a fixed underlying type of int, so the constants
// are visible directly in the enclosing namespace and convert implicitly to
// the int return type of those operations.
enum : int {
  OK = 0,

  // Port-related failures.
  ERROR_PORT_UNKNOWN = -10,
  ERROR_PORT_EXISTS = -11,
  ERROR_PORT_STATE_UNEXPECTED = -12,
  ERROR_PORT_CANNOT_SEND_SELF = -13,
  // Reported (for example by Node::GetMessage()) when the port's peer has
  // closed.
  ERROR_PORT_PEER_CLOSED = -14,
  ERROR_PORT_CANNOT_SEND_PEER = -15,

  // Generic failures.
  ERROR_NOT_IMPLEMENTED = -100,
};
39 
// Snapshot of a port's state, filled in by Node::GetStatus().
//
// Every field has a default member initializer so that a default-constructed
// PortStatus never exposes indeterminate values, e.g. when a status query
// fails before the struct has been filled in. The struct remains an aggregate
// (C++14 and later), so brace initialization of all fields keeps working.
//
// NOTE(review): the individual field meanings below are inferred from their
// names unless stated otherwise; confirm against the GetStatus()
// implementation.
struct PortStatus {
  bool has_messages = false;
  bool receiving_messages = false;
  bool peer_closed = false;
  bool peer_remote = false;
  size_t queued_message_count = 0;
  size_t queued_num_bytes = 0;
  // Number of messages sent from this port that its conjugate has not yet
  // acknowledged; see Node::SetAcknowledgeRequestInterval().
  size_t unacknowledged_message_count = 0;
};
49 
// Forward declarations. This header only refers to these types through
// pointers (Node::GetMessage()'s |filter| and the delegate held by Node), so
// their full definitions are not required here.
class MessageFilter;
class NodeDelegate;
52 
53 // A Node maintains a collection of Ports (see port.h) indexed by unique 128-bit
54 // addresses (names), performing routing and processing of events among the
55 // Ports within the Node and to or from other Nodes in the system. Typically
56 // (and practically, in all uses today) there is a single Node per system
57 // process. Thus a Node boundary effectively models a process boundary.
58 //
59 // New Ports can be created uninitialized using CreateUninitializedPort (and
60 // later initialized using InitializePort), or created in a fully initialized
61 // state using CreatePortPair(). Initialized ports have exactly one conjugate
62 // port which is the ultimate receiver of any user messages sent by that port.
63 // See SendUserMessage().
64 //
65 // In addition to routing user message events, various control events are used
66 // by Nodes to coordinate Port behavior and lifetime within and across Nodes.
67 // See Event documentation for description of different types of events used by
68 // a Node to coordinate behavior.
COMPONENT_EXPORT(MOJO_CORE_PORTS)69 class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node {
70  public:
71   enum class ShutdownPolicy {
72     DONT_ALLOW_LOCAL_PORTS,
73     ALLOW_LOCAL_PORTS,
74   };
75 
76   // Does not take ownership of the delegate.
77   Node(const NodeName& name, NodeDelegate* delegate);
78   ~Node();
79 
80   // Returns true iff there are no open ports referring to another node or ports
81   // in the process of being transferred from this node to another. If this
82   // returns false, then to ensure clean shutdown, it is necessary to keep the
83   // node alive and continue routing messages to it via AcceptMessage. This
84   // method may be called again after AcceptMessage to check if the Node is now
85   // ready to be destroyed.
86   //
87   // If |policy| is set to |ShutdownPolicy::ALLOW_LOCAL_PORTS|, this will return
88   // |true| even if some ports remain alive, as long as none of them are proxies
89   // to another node.
90   bool CanShutdownCleanly(
91       ShutdownPolicy policy = ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS);
92 
93   // Lookup the named port.
94   int GetPort(const PortName& port_name, PortRef* port_ref);
95 
96   // Creates a port on this node. Before the port can be used, it must be
97   // initialized using InitializePort. This method is useful for bootstrapping
98   // a connection between two nodes. Generally, ports are created using
99   // CreatePortPair instead.
100   int CreateUninitializedPort(PortRef* port_ref);
101 
102   // Initializes a newly created port.
103   int InitializePort(const PortRef& port_ref,
104                      const NodeName& peer_node_name,
105                      const PortName& peer_port_name);
106 
107   // Generates a new connected pair of ports bound to this node. These ports
108   // are initialized and ready to go.
109   int CreatePortPair(PortRef* port0_ref, PortRef* port1_ref);
110 
111   // User data associated with the port.
112   int SetUserData(const PortRef& port_ref, scoped_refptr<UserData> user_data);
113   int GetUserData(const PortRef& port_ref, scoped_refptr<UserData>* user_data);
114 
115   // Prevents further messages from being sent from this port or delivered to
116   // this port. The port is removed, and the port's peer is notified of the
117   // closure after it has consumed all pending messages.
118   int ClosePort(const PortRef& port_ref);
119 
120   // Returns the current status of the port.
121   int GetStatus(const PortRef& port_ref, PortStatus* port_status);
122 
123   // Returns the next available message on the specified port or returns a null
124   // message if there are none available. Returns ERROR_PORT_PEER_CLOSED to
125   // indicate that this port's peer has closed. In such cases GetMessage may
126   // be called until it yields a null message, indicating that no more messages
127   // may be read from the port.
128   //
129   // If |filter| is non-null, the next available message is returned only if it
130   // is matched by the filter. If the provided filter does not match the next
131   // available message, GetMessage() behaves as if there is no message
132   // available. Ownership of |filter| is not taken, and it must outlive the
133   // extent of this call.
134   int GetMessage(const PortRef& port_ref,
135                  std::unique_ptr<UserMessageEvent>* message,
136                  MessageFilter* filter);
137 
138   // Sends a message from the specified port to its peer. Note that the message
139   // notification may arrive synchronously (via PortStatusChanged() on the
140   // delegate) if the peer is local to this Node.
141   int SendUserMessage(const PortRef& port_ref,
142                       std::unique_ptr<UserMessageEvent> message);
143 
144   // Makes the port send acknowledge requests to its conjugate to acknowledge
145   // at least every |sequence_number_acknowledge_interval| messages as they're
146   // read from the conjugate. The number of unacknowledged messages is exposed
147   // in the |unacknowledged_message_count| field of PortStatus. This allows
148   // bounding the number of unread and/or in-transit messages from this port
149   // to its conjugate between zero and |unacknowledged_message_count|.
150   int SetAcknowledgeRequestInterval(
151       const PortRef& port_ref,
152       uint64_t sequence_number_acknowledge_interval);
153 
154   // Corresponding to NodeDelegate::ForwardEvent.
155   int AcceptEvent(ScopedEvent event);
156 
157   // Called to merge two ports with each other. If you have two independent
158   // port pairs A <=> B and C <=> D, the net result of merging B and C is a
159   // single connected port pair A <=> D.
160   //
161   // Note that the behavior of this operation is undefined if either port to be
162   // merged (B or C above) has ever been read from or written to directly, and
163   // this must ONLY be called on one side of the merge, though it doesn't matter
164   // which side.
165   //
166   // It is safe for the non-merged peers (A and D above) to be transferred,
167   // closed, and/or written to before, during, or after the merge.
168   int MergePorts(const PortRef& port_ref,
169                  const NodeName& destination_node_name,
170                  const PortName& destination_port_name);
171 
172   // Like above but merges two ports local to this node. Because both ports are
173   // local this can also verify that neither port has been written to before the
174   // merge. If this fails for any reason, both ports are closed. Otherwise OK
175   // is returned and the ports' receiving peers are connected to each other.
176   int MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref);
177 
178   // Called to inform this node that communication with another node is lost
179   // indefinitely. This triggers cleanup of ports bound to this node.
180   int LostConnectionToNode(const NodeName& node_name);
181 
182  private:
183   // Helper to ensure that a Node always calls into its delegate safely, i.e.
184   // without holding any internal locks.
185   class DelegateHolder {
186    public:
187     DelegateHolder(Node* node, NodeDelegate* delegate);
188     ~DelegateHolder();
189 
190     NodeDelegate* operator->() const {
191       EnsureSafeDelegateAccess();
192       return delegate_;
193     }
194 
195    private:
196 #if DCHECK_IS_ON()
197     void EnsureSafeDelegateAccess() const;
198 #else
199     void EnsureSafeDelegateAccess() const {}
200 #endif
201 
202     Node* const node_;
203     NodeDelegate* const delegate_;
204 
205     DISALLOW_COPY_AND_ASSIGN(DelegateHolder);
206   };
207 
208   int OnUserMessage(std::unique_ptr<UserMessageEvent> message);
209   int OnPortAccepted(std::unique_ptr<PortAcceptedEvent> event);
210   int OnObserveProxy(std::unique_ptr<ObserveProxyEvent> event);
211   int OnObserveProxyAck(std::unique_ptr<ObserveProxyAckEvent> event);
212   int OnObserveClosure(std::unique_ptr<ObserveClosureEvent> event);
213   int OnMergePort(std::unique_ptr<MergePortEvent> event);
214   int OnUserMessageReadAckRequest(
215       std::unique_ptr<UserMessageReadAckRequestEvent> event);
216   int OnUserMessageReadAck(std::unique_ptr<UserMessageReadAckEvent> event);
217 
218   int AddPortWithName(const PortName& port_name, scoped_refptr<Port> port);
219   void ErasePort(const PortName& port_name);
220 
221   int SendUserMessageInternal(const PortRef& port_ref,
222                               std::unique_ptr<UserMessageEvent>* message);
223   int MergePortsInternal(const PortRef& port0_ref,
224                          const PortRef& port1_ref,
225                          bool allow_close_on_bad_state);
226   void ConvertToProxy(Port* port,
227                       const NodeName& to_node_name,
228                       PortName* port_name,
229                       Event::PortDescriptor* port_descriptor);
230   int AcceptPort(const PortName& port_name,
231                  const Event::PortDescriptor& port_descriptor);
232 
233   int PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
234                                   Port::State expected_port_state,
235                                   bool ignore_closed_peer,
236                                   UserMessageEvent* message,
237                                   NodeName* forward_to_node);
238   int BeginProxying(const PortRef& port_ref);
239   int ForwardUserMessagesFromProxy(const PortRef& port_ref);
240   void InitiateProxyRemoval(const PortRef& port_ref);
241   void TryRemoveProxy(const PortRef& port_ref);
242   void DestroyAllPortsWithPeer(const NodeName& node_name,
243                                const PortName& port_name);
244 
245   // Changes the peer node and port name referenced by |port|. Note that both
246   // |ports_lock_| MUST be held through the extent of this method.
247   // |local_port|'s lock must be held if and only if a reference to |local_port|
248   // exist in |ports_|.
249   void UpdatePortPeerAddress(const PortName& local_port_name,
250                              Port* local_port,
251                              const NodeName& new_peer_node,
252                              const PortName& new_peer_port);
253 
254   // Removes an entry from |peer_port_map_| corresponding to |local_port|'s peer
255   // address, if valid.
256   void RemoveFromPeerPortMap(const PortName& local_port_name, Port* local_port);
257 
258   // Swaps the peer information for two local ports. Used during port merges.
259   // Note that |ports_lock_| must be held along with each of the two port's own
260   // locks, through the extent of this method.
261   void SwapPortPeers(const PortName& port0_name,
262                      Port* port0,
263                      const PortName& port1_name,
264                      Port* port1);
265 
266   // Sends an acknowledge request to the peer if the port has a non-zero
267   // |sequence_num_acknowledge_interval|. This needs to be done when the port's
268   // peer changes, as the previous peer proxy may not have forwarded any prior
269   // acknowledge request before deleting itself.
270   void MaybeResendAckRequest(const PortRef& port_ref);
271 
272   // Forwards a stored acknowledge request to the peer if the proxy has a
273   // non-zero |sequence_num_acknowledge_interval|.
274   void MaybeForwardAckRequest(const PortRef& port_ref);
275 
276   // Sends an acknowledge of the most recently read sequence number to the peer
277   // if any messages have been read, and the port has a non-zero
278   // |sequence_num_to_acknowledge|.
279   void MaybeResendAck(const PortRef& port_ref);
280 
281   const NodeName name_;
282   const DelegateHolder delegate_;
283 
284   // Just to clarify readability of the types below.
285   using LocalPortName = PortName;
286   using PeerPortName = PortName;
287 
288   // Guards access to |ports_| and |peer_port_maps_| below.
289   //
290   // This must never be acquired while an individual port's lock is held on the
291   // same thread. Conversely, individual port locks may be acquired while this
292   // one is held.
293   //
294   // Because UserMessage events may execute arbitrary user code during
295   // destruction, it is also important to ensure that such events are never
296   // destroyed while this (or any individual Port) lock is held.
297   base::Lock ports_lock_;
298   std::unordered_map<LocalPortName, scoped_refptr<Port>> ports_;
299 
300   // Maps a peer port name to a list of PortRefs for all local ports which have
301   // the port name key designated as their peer port. The set of local ports
302   // which have the same peer port is expected to always be relatively small and
303   // usually 1. Hence we just use a flat_map of local PortRefs keyed on each
304   // local port's name.
305   using PeerPortMap =
306       std::unordered_map<PeerPortName, base::flat_map<LocalPortName, PortRef>>;
307 
308   // A reverse mapping which can be used to find all local ports that reference
309   // a given peer node or a local port that references a specific given peer
310   // port on a peer node. The key to this map is the corresponding peer node
311   // name.
312   std::unordered_map<NodeName, PeerPortMap> peer_port_maps_;
313 
314   DISALLOW_COPY_AND_ASSIGN(Node);
315 };
316 
317 }  // namespace ports
318 }  // namespace core
319 }  // namespace mojo
320 
321 #endif  // MOJO_CORE_PORTS_NODE_H_
322