1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/core/node_channel.h"
6 
7 #include <cstring>
8 #include <limits>
9 #include <sstream>
10 
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/memory/ptr_util.h"
15 #include "mojo/core/broker_host.h"
16 #include "mojo/core/channel.h"
17 #include "mojo/core/configuration.h"
18 #include "mojo/core/core.h"
19 #include "mojo/core/request_context.h"
20 
21 namespace mojo {
22 namespace core {
23 
24 namespace {
25 
26 // NOTE: Please ONLY append messages to the end of this enum.
27 enum class MessageType : uint32_t {
28   ACCEPT_INVITEE,
29   ACCEPT_INVITATION,
30   ADD_BROKER_CLIENT,
31   BROKER_CLIENT_ADDED,
32   ACCEPT_BROKER_CLIENT,
33   EVENT_MESSAGE,
34   REQUEST_PORT_MERGE,
35   REQUEST_INTRODUCTION,
36   INTRODUCE,
37 #if defined(OS_WIN)
38   RELAY_EVENT_MESSAGE,
39 #endif
40   BROADCAST_EVENT,
41 #if defined(OS_WIN)
42   EVENT_MESSAGE_FROM_RELAY,
43 #endif
44   ACCEPT_PEER,
45   BIND_BROKER_HOST,
46 };
47 
48 struct Header {
49   MessageType type;
50   uint32_t padding;
51 };
52 
53 static_assert(IsAlignedForChannelMessage(sizeof(Header)),
54               "Invalid header size.");
55 
56 struct AcceptInviteeData {
57   ports::NodeName inviter_name;
58   ports::NodeName token;
59 };
60 
61 struct AcceptInvitationData {
62   ports::NodeName token;
63   ports::NodeName invitee_name;
64 };
65 
66 struct AcceptPeerData {
67   ports::NodeName token;
68   ports::NodeName peer_name;
69   ports::PortName port_name;
70 };
71 
72 // This message may include a process handle on plaforms that require it.
73 struct AddBrokerClientData {
74   ports::NodeName client_name;
75 #if !defined(OS_WIN)
76   uint32_t process_handle;
77   uint32_t padding;
78 #endif
79 };
80 
81 #if !defined(OS_WIN)
82 static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t),
83               "Unexpected pid size");
84 static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0,
85               "Invalid AddBrokerClientData size.");
86 #endif
87 
88 // This data is followed by a platform channel handle to the broker.
89 struct BrokerClientAddedData {
90   ports::NodeName client_name;
91 };
92 
93 // This data may be followed by a platform channel handle to the broker. If not,
94 // then the inviter is the broker and its channel should be used as such.
95 struct AcceptBrokerClientData {
96   ports::NodeName broker_name;
97 };
98 
99 // This is followed by arbitrary payload data which is interpreted as a token
100 // string for port location.
101 struct RequestPortMergeData {
102   ports::PortName connector_port_name;
103 };
104 
105 // Used for both REQUEST_INTRODUCTION and INTRODUCE.
106 //
107 // For INTRODUCE the message also includes a valid platform handle for a channel
108 // the receiver may use to communicate with the named node directly, or an
109 // invalid platform handle if the node is unknown to the sender or otherwise
110 // cannot be introduced.
111 struct IntroductionData {
112   ports::NodeName name;
113 };
114 
115 // This message is just a PlatformHandle. The data struct here has only a
116 // padding field to ensure an aligned, non-zero-length payload.
117 struct BindBrokerHostData {
118   uint64_t padding;
119 };
120 
121 #if defined(OS_WIN)
122 // This struct is followed by the full payload of a message to be relayed.
123 struct RelayEventMessageData {
124   ports::NodeName destination;
125 };
126 
127 // This struct is followed by the full payload of a relayed message.
128 struct EventMessageFromRelayData {
129   ports::NodeName source;
130 };
131 #endif
132 
133 template <typename DataType>
CreateMessage(MessageType type,size_t payload_size,size_t num_handles,DataType ** out_data,size_t capacity=0)134 Channel::MessagePtr CreateMessage(MessageType type,
135                                   size_t payload_size,
136                                   size_t num_handles,
137                                   DataType** out_data,
138                                   size_t capacity = 0) {
139   const size_t total_size = payload_size + sizeof(Header);
140   if (capacity == 0)
141     capacity = total_size;
142   else
143     capacity = std::max(total_size, capacity);
144   auto message =
145       std::make_unique<Channel::Message>(capacity, total_size, num_handles);
146   Header* header = reinterpret_cast<Header*>(message->mutable_payload());
147   header->type = type;
148   header->padding = 0;
149   *out_data = reinterpret_cast<DataType*>(&header[1]);
150   return message;
151 }
152 
153 template <typename DataType>
GetMessagePayload(const void * bytes,size_t num_bytes,DataType ** out_data)154 bool GetMessagePayload(const void* bytes,
155                        size_t num_bytes,
156                        DataType** out_data) {
157   static_assert(sizeof(DataType) > 0, "DataType must have non-zero size.");
158   if (num_bytes < sizeof(Header) + sizeof(DataType))
159     return false;
160   *out_data = reinterpret_cast<const DataType*>(
161       static_cast<const char*>(bytes) + sizeof(Header));
162   return true;
163 }
164 
165 }  // namespace
166 
167 // static
Create(Delegate * delegate,ConnectionParams connection_params,Channel::HandlePolicy channel_handle_policy,scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)168 scoped_refptr<NodeChannel> NodeChannel::Create(
169     Delegate* delegate,
170     ConnectionParams connection_params,
171     Channel::HandlePolicy channel_handle_policy,
172     scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,
173     const ProcessErrorCallback& process_error_callback) {
174 #if defined(OS_NACL_SFI)
175   LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI";
176   return nullptr;
177 #else
178   return new NodeChannel(delegate, std::move(connection_params),
179                          channel_handle_policy, io_task_runner,
180                          process_error_callback);
181 #endif
182 }
183 
184 // static
CreateEventMessage(size_t capacity,size_t payload_size,void ** payload,size_t num_handles)185 Channel::MessagePtr NodeChannel::CreateEventMessage(size_t capacity,
186                                                     size_t payload_size,
187                                                     void** payload,
188                                                     size_t num_handles) {
189   return CreateMessage(MessageType::EVENT_MESSAGE, payload_size, num_handles,
190                        payload, capacity);
191 }
192 
193 // static
GetEventMessageData(Channel::Message * message,void ** data,size_t * num_data_bytes)194 void NodeChannel::GetEventMessageData(Channel::Message* message,
195                                       void** data,
196                                       size_t* num_data_bytes) {
197   // NOTE: OnChannelMessage guarantees that we never accept a Channel::Message
198   // with a payload of fewer than |sizeof(Header)| bytes.
199   *data = reinterpret_cast<Header*>(message->mutable_payload()) + 1;
200   *num_data_bytes = message->payload_size() - sizeof(Header);
201 }
202 
Start()203 void NodeChannel::Start() {
204   base::AutoLock lock(channel_lock_);
205   // ShutDown() may have already been called, in which case |channel_| is null.
206   if (channel_)
207     channel_->Start();
208 }
209 
ShutDown()210 void NodeChannel::ShutDown() {
211   base::AutoLock lock(channel_lock_);
212   if (channel_) {
213     channel_->ShutDown();
214     channel_ = nullptr;
215   }
216 }
217 
LeakHandleOnShutdown()218 void NodeChannel::LeakHandleOnShutdown() {
219   base::AutoLock lock(channel_lock_);
220   if (channel_) {
221     channel_->LeakHandle();
222   }
223 }
224 
NotifyBadMessage(const std::string & error)225 void NodeChannel::NotifyBadMessage(const std::string& error) {
226   DCHECK(HasBadMessageHandler());
227   process_error_callback_.Run("Received bad user message: " + error);
228 }
229 
SetRemoteProcessHandle(ScopedProcessHandle process_handle)230 void NodeChannel::SetRemoteProcessHandle(ScopedProcessHandle process_handle) {
231   DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
232   {
233     base::AutoLock lock(channel_lock_);
234     if (channel_)
235       channel_->set_remote_process(process_handle.Clone());
236   }
237   base::AutoLock lock(remote_process_handle_lock_);
238   DCHECK(!remote_process_handle_.is_valid());
239   CHECK_NE(remote_process_handle_.get(), base::GetCurrentProcessHandle());
240   remote_process_handle_ = std::move(process_handle);
241 }
242 
HasRemoteProcessHandle()243 bool NodeChannel::HasRemoteProcessHandle() {
244   base::AutoLock lock(remote_process_handle_lock_);
245   return remote_process_handle_.is_valid();
246 }
247 
CloneRemoteProcessHandle()248 ScopedProcessHandle NodeChannel::CloneRemoteProcessHandle() {
249   base::AutoLock lock(remote_process_handle_lock_);
250   if (!remote_process_handle_.is_valid())
251     return ScopedProcessHandle();
252   return remote_process_handle_.Clone();
253 }
254 
SetRemoteNodeName(const ports::NodeName & name)255 void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) {
256   DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
257   remote_node_name_ = name;
258 }
259 
AcceptInvitee(const ports::NodeName & inviter_name,const ports::NodeName & token)260 void NodeChannel::AcceptInvitee(const ports::NodeName& inviter_name,
261                                 const ports::NodeName& token) {
262   AcceptInviteeData* data;
263   Channel::MessagePtr message = CreateMessage(
264       MessageType::ACCEPT_INVITEE, sizeof(AcceptInviteeData), 0, &data);
265   data->inviter_name = inviter_name;
266   data->token = token;
267   WriteChannelMessage(std::move(message));
268 }
269 
AcceptInvitation(const ports::NodeName & token,const ports::NodeName & invitee_name)270 void NodeChannel::AcceptInvitation(const ports::NodeName& token,
271                                    const ports::NodeName& invitee_name) {
272   AcceptInvitationData* data;
273   Channel::MessagePtr message = CreateMessage(
274       MessageType::ACCEPT_INVITATION, sizeof(AcceptInvitationData), 0, &data);
275   data->token = token;
276   data->invitee_name = invitee_name;
277   WriteChannelMessage(std::move(message));
278 }
279 
AcceptPeer(const ports::NodeName & sender_name,const ports::NodeName & token,const ports::PortName & port_name)280 void NodeChannel::AcceptPeer(const ports::NodeName& sender_name,
281                              const ports::NodeName& token,
282                              const ports::PortName& port_name) {
283   AcceptPeerData* data;
284   Channel::MessagePtr message =
285       CreateMessage(MessageType::ACCEPT_PEER, sizeof(AcceptPeerData), 0, &data);
286   data->token = token;
287   data->peer_name = sender_name;
288   data->port_name = port_name;
289   WriteChannelMessage(std::move(message));
290 }
291 
AddBrokerClient(const ports::NodeName & client_name,ScopedProcessHandle process_handle)292 void NodeChannel::AddBrokerClient(const ports::NodeName& client_name,
293                                   ScopedProcessHandle process_handle) {
294   AddBrokerClientData* data;
295   std::vector<PlatformHandle> handles;
296 #if defined(OS_WIN)
297   handles.emplace_back(base::win::ScopedHandle(process_handle.release()));
298 #endif
299   Channel::MessagePtr message =
300       CreateMessage(MessageType::ADD_BROKER_CLIENT, sizeof(AddBrokerClientData),
301                     handles.size(), &data);
302   message->SetHandles(std::move(handles));
303   data->client_name = client_name;
304 #if !defined(OS_WIN)
305   data->process_handle = process_handle.get();
306   data->padding = 0;
307 #endif
308   WriteChannelMessage(std::move(message));
309 }
310 
BrokerClientAdded(const ports::NodeName & client_name,PlatformHandle broker_channel)311 void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name,
312                                     PlatformHandle broker_channel) {
313   BrokerClientAddedData* data;
314   std::vector<PlatformHandle> handles;
315   if (broker_channel.is_valid())
316     handles.emplace_back(std::move(broker_channel));
317   Channel::MessagePtr message =
318       CreateMessage(MessageType::BROKER_CLIENT_ADDED,
319                     sizeof(BrokerClientAddedData), handles.size(), &data);
320   message->SetHandles(std::move(handles));
321   data->client_name = client_name;
322   WriteChannelMessage(std::move(message));
323 }
324 
AcceptBrokerClient(const ports::NodeName & broker_name,PlatformHandle broker_channel)325 void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name,
326                                      PlatformHandle broker_channel) {
327   AcceptBrokerClientData* data;
328   std::vector<PlatformHandle> handles;
329   if (broker_channel.is_valid())
330     handles.emplace_back(std::move(broker_channel));
331   Channel::MessagePtr message =
332       CreateMessage(MessageType::ACCEPT_BROKER_CLIENT,
333                     sizeof(AcceptBrokerClientData), handles.size(), &data);
334   message->SetHandles(std::move(handles));
335   data->broker_name = broker_name;
336   WriteChannelMessage(std::move(message));
337 }
338 
RequestPortMerge(const ports::PortName & connector_port_name,const std::string & token)339 void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name,
340                                    const std::string& token) {
341   RequestPortMergeData* data;
342   Channel::MessagePtr message =
343       CreateMessage(MessageType::REQUEST_PORT_MERGE,
344                     sizeof(RequestPortMergeData) + token.size(), 0, &data);
345   data->connector_port_name = connector_port_name;
346   memcpy(data + 1, token.data(), token.size());
347   WriteChannelMessage(std::move(message));
348 }
349 
RequestIntroduction(const ports::NodeName & name)350 void NodeChannel::RequestIntroduction(const ports::NodeName& name) {
351   IntroductionData* data;
352   Channel::MessagePtr message = CreateMessage(
353       MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data);
354   data->name = name;
355   WriteChannelMessage(std::move(message));
356 }
357 
Introduce(const ports::NodeName & name,PlatformHandle channel_handle)358 void NodeChannel::Introduce(const ports::NodeName& name,
359                             PlatformHandle channel_handle) {
360   IntroductionData* data;
361   std::vector<PlatformHandle> handles;
362   if (channel_handle.is_valid())
363     handles.emplace_back(std::move(channel_handle));
364   Channel::MessagePtr message = CreateMessage(
365       MessageType::INTRODUCE, sizeof(IntroductionData), handles.size(), &data);
366   message->SetHandles(std::move(handles));
367   data->name = name;
368   WriteChannelMessage(std::move(message));
369 }
370 
SendChannelMessage(Channel::MessagePtr message)371 void NodeChannel::SendChannelMessage(Channel::MessagePtr message) {
372   WriteChannelMessage(std::move(message));
373 }
374 
Broadcast(Channel::MessagePtr message)375 void NodeChannel::Broadcast(Channel::MessagePtr message) {
376   DCHECK(!message->has_handles());
377   void* data;
378   Channel::MessagePtr broadcast_message = CreateMessage(
379       MessageType::BROADCAST_EVENT, message->data_num_bytes(), 0, &data);
380   memcpy(data, message->data(), message->data_num_bytes());
381   WriteChannelMessage(std::move(broadcast_message));
382 }
383 
BindBrokerHost(PlatformHandle broker_host_handle)384 void NodeChannel::BindBrokerHost(PlatformHandle broker_host_handle) {
385 #if !defined(OS_MACOSX) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
386   DCHECK(broker_host_handle.is_valid());
387   BindBrokerHostData* data;
388   std::vector<PlatformHandle> handles;
389   handles.push_back(std::move(broker_host_handle));
390   Channel::MessagePtr message =
391       CreateMessage(MessageType::BIND_BROKER_HOST, sizeof(BindBrokerHostData),
392                     handles.size(), &data);
393   data->padding = 0;
394   message->SetHandles(std::move(handles));
395   WriteChannelMessage(std::move(message));
396 #endif
397 }
398 
399 #if defined(OS_WIN)
RelayEventMessage(const ports::NodeName & destination,Channel::MessagePtr message)400 void NodeChannel::RelayEventMessage(const ports::NodeName& destination,
401                                     Channel::MessagePtr message) {
402 #if defined(OS_WIN)
403   DCHECK(message->has_handles());
404 
405   // Note that this is only used on Windows, and on Windows all platform
406   // handles are included in the message data. We blindly copy all the data
407   // here and the relay node (the broker) will duplicate handles as needed.
408   size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
409   RelayEventMessageData* data;
410   Channel::MessagePtr relay_message =
411       CreateMessage(MessageType::RELAY_EVENT_MESSAGE, num_bytes, 0, &data);
412   data->destination = destination;
413   memcpy(data + 1, message->data(), message->data_num_bytes());
414 
415   // When the handles are duplicated in the broker, the source handles will
416   // be closed. If the broker never receives this message then these handles
417   // will leak, but that means something else has probably broken and the
418   // sending process won't likely be around much longer.
419   //
420   // TODO(https://crbug.com/813112): We would like to be able to violate the
421   // above stated assumption. We should not leak handles in cases where we
422   // outlive the broker, as we may continue existing and eventually accept a new
423   // broker invitation.
424   std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
425   for (auto& handle : handles)
426     handle.TakeHandle().release();
427 
428 #else
429   DCHECK(message->has_mach_ports());
430 
431   // On OSX, the handles are extracted from the relayed message and attached to
432   // the wrapper. The broker then takes the handles attached to the wrapper and
433   // moves them back to the relayed message. This is necessary because the
434   // message may contain fds which need to be attached to the outer message so
435   // that they can be transferred to the broker.
436   std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
437   size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
438   RelayEventMessageData* data;
439   Channel::MessagePtr relay_message = CreateMessage(
440       MessageType::RELAY_EVENT_MESSAGE, num_bytes, handles.size(), &data);
441   data->destination = destination;
442   memcpy(data + 1, message->data(), message->data_num_bytes());
443   relay_message->SetHandles(std::move(handles));
444 #endif  // defined(OS_WIN)
445 
446   WriteChannelMessage(std::move(relay_message));
447 }
448 
EventMessageFromRelay(const ports::NodeName & source,Channel::MessagePtr message)449 void NodeChannel::EventMessageFromRelay(const ports::NodeName& source,
450                                         Channel::MessagePtr message) {
451   size_t num_bytes =
452       sizeof(EventMessageFromRelayData) + message->payload_size();
453   EventMessageFromRelayData* data;
454   Channel::MessagePtr relayed_message =
455       CreateMessage(MessageType::EVENT_MESSAGE_FROM_RELAY, num_bytes,
456                     message->num_handles(), &data);
457   data->source = source;
458   if (message->payload_size())
459     memcpy(data + 1, message->payload(), message->payload_size());
460   relayed_message->SetHandles(message->TakeHandles());
461   WriteChannelMessage(std::move(relayed_message));
462 }
463 #endif  // defined(OS_WIN)
464 
NodeChannel(Delegate * delegate,ConnectionParams connection_params,Channel::HandlePolicy channel_handle_policy,scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)465 NodeChannel::NodeChannel(
466     Delegate* delegate,
467     ConnectionParams connection_params,
468     Channel::HandlePolicy channel_handle_policy,
469     scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,
470     const ProcessErrorCallback& process_error_callback)
471     : base::RefCountedDeleteOnSequence<NodeChannel>(io_task_runner),
472       delegate_(delegate),
473       process_error_callback_(process_error_callback)
474 #if !defined(OS_NACL_SFI)
475       ,
476       channel_(Channel::Create(this,
477                                std::move(connection_params),
478                                channel_handle_policy,
479                                std::move(io_task_runner)))
480 #endif
481 {
482 }
483 
~NodeChannel()484 NodeChannel::~NodeChannel() {
485   ShutDown();
486 }
487 
CreateAndBindLocalBrokerHost(PlatformHandle broker_host_handle)488 void NodeChannel::CreateAndBindLocalBrokerHost(
489     PlatformHandle broker_host_handle) {
490 #if !defined(OS_MACOSX) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
491   // Self-owned.
492   ConnectionParams connection_params(
493       PlatformChannelEndpoint(std::move(broker_host_handle)));
494   new BrokerHost(remote_process_handle_.get(), std::move(connection_params),
495                  process_error_callback_);
496 #endif
497 }
498 
OnChannelMessage(const void * payload,size_t payload_size,std::vector<PlatformHandle> handles)499 void NodeChannel::OnChannelMessage(const void* payload,
500                                    size_t payload_size,
501                                    std::vector<PlatformHandle> handles) {
502   DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
503 
504   RequestContext request_context(RequestContext::Source::SYSTEM);
505 
506   if (payload_size <= sizeof(Header)) {
507     delegate_->OnChannelError(remote_node_name_, this);
508     return;
509   }
510 
511   const Header* header = static_cast<const Header*>(payload);
512   switch (header->type) {
513     case MessageType::ACCEPT_INVITEE: {
514       const AcceptInviteeData* data;
515       if (GetMessagePayload(payload, payload_size, &data)) {
516         delegate_->OnAcceptInvitee(remote_node_name_, data->inviter_name,
517                                    data->token);
518         return;
519       }
520       break;
521     }
522 
523     case MessageType::ACCEPT_INVITATION: {
524       const AcceptInvitationData* data;
525       if (GetMessagePayload(payload, payload_size, &data)) {
526         delegate_->OnAcceptInvitation(remote_node_name_, data->token,
527                                       data->invitee_name);
528         return;
529       }
530       break;
531     }
532 
533     case MessageType::ADD_BROKER_CLIENT: {
534       const AddBrokerClientData* data;
535       if (GetMessagePayload(payload, payload_size, &data)) {
536 #if defined(OS_WIN)
537         if (handles.size() != 1) {
538           DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
539           break;
540         }
541         delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
542                                      handles[0].ReleaseHandle());
543 #else
544         if (!handles.empty()) {
545           DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
546           break;
547         }
548         delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
549                                      data->process_handle);
550 #endif
551         return;
552       }
553       break;
554     }
555 
556     case MessageType::BROKER_CLIENT_ADDED: {
557       const BrokerClientAddedData* data;
558       if (GetMessagePayload(payload, payload_size, &data)) {
559         if (handles.size() != 1) {
560           DLOG(ERROR) << "Dropping invalid BrokerClientAdded message.";
561           break;
562         }
563         delegate_->OnBrokerClientAdded(remote_node_name_, data->client_name,
564                                        std::move(handles[0]));
565         return;
566       }
567       break;
568     }
569 
570     case MessageType::ACCEPT_BROKER_CLIENT: {
571       const AcceptBrokerClientData* data;
572       if (GetMessagePayload(payload, payload_size, &data)) {
573         PlatformHandle broker_channel;
574         if (handles.size() > 1) {
575           DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message.";
576           break;
577         }
578         if (handles.size() == 1)
579           broker_channel = std::move(handles[0]);
580 
581         delegate_->OnAcceptBrokerClient(remote_node_name_, data->broker_name,
582                                         std::move(broker_channel));
583         return;
584       }
585       break;
586     }
587 
588     case MessageType::EVENT_MESSAGE: {
589       Channel::MessagePtr message(
590           new Channel::Message(payload_size, handles.size()));
591       message->SetHandles(std::move(handles));
592       memcpy(message->mutable_payload(), payload, payload_size);
593       delegate_->OnEventMessage(remote_node_name_, std::move(message));
594       return;
595     }
596 
597     case MessageType::REQUEST_PORT_MERGE: {
598       const RequestPortMergeData* data;
599       if (GetMessagePayload(payload, payload_size, &data)) {
600         // Don't accept an empty token.
601         size_t token_size = payload_size - sizeof(*data) - sizeof(Header);
602         if (token_size == 0)
603           break;
604         std::string token(reinterpret_cast<const char*>(data + 1), token_size);
605         delegate_->OnRequestPortMerge(remote_node_name_,
606                                       data->connector_port_name, token);
607         return;
608       }
609       break;
610     }
611 
612     case MessageType::REQUEST_INTRODUCTION: {
613       const IntroductionData* data;
614       if (GetMessagePayload(payload, payload_size, &data)) {
615         delegate_->OnRequestIntroduction(remote_node_name_, data->name);
616         return;
617       }
618       break;
619     }
620 
621     case MessageType::INTRODUCE: {
622       const IntroductionData* data;
623       if (GetMessagePayload(payload, payload_size, &data)) {
624         if (handles.size() > 1) {
625           DLOG(ERROR) << "Dropping invalid introduction message.";
626           break;
627         }
628         PlatformHandle channel_handle;
629         if (handles.size() == 1)
630           channel_handle = std::move(handles[0]);
631 
632         delegate_->OnIntroduce(remote_node_name_, data->name,
633                                std::move(channel_handle));
634         return;
635       }
636       break;
637     }
638 
639 #if defined(OS_WIN)
640     case MessageType::RELAY_EVENT_MESSAGE: {
641       base::ProcessHandle from_process;
642       {
643         base::AutoLock lock(remote_process_handle_lock_);
644         // NOTE: It's safe to retain a weak reference to this process handle
645         // through the extent of this call because |this| is kept alive and
646         // |remote_process_handle_| is never reset once set.
647         from_process = remote_process_handle_.get();
648       }
649       const RelayEventMessageData* data;
650       if (GetMessagePayload(payload, payload_size, &data)) {
651         // Don't try to relay an empty message.
652         if (payload_size <= sizeof(Header) + sizeof(RelayEventMessageData))
653           break;
654 
655         const void* message_start = data + 1;
656         Channel::MessagePtr message = Channel::Message::Deserialize(
657             message_start, payload_size - sizeof(Header) - sizeof(*data),
658             from_process);
659         if (!message) {
660           DLOG(ERROR) << "Dropping invalid relay message.";
661           break;
662         }
663         delegate_->OnRelayEventMessage(remote_node_name_, from_process,
664                                        data->destination, std::move(message));
665         return;
666       }
667       break;
668     }
669 #endif
670 
671     case MessageType::BROADCAST_EVENT: {
672       if (payload_size <= sizeof(Header))
673         break;
674       const void* data = static_cast<const void*>(
675           reinterpret_cast<const Header*>(payload) + 1);
676       Channel::MessagePtr message =
677           Channel::Message::Deserialize(data, payload_size - sizeof(Header));
678       if (!message || message->has_handles()) {
679         DLOG(ERROR) << "Dropping invalid broadcast message.";
680         break;
681       }
682       delegate_->OnBroadcast(remote_node_name_, std::move(message));
683       return;
684     }
685 
686 #if defined(OS_WIN)
687     case MessageType::EVENT_MESSAGE_FROM_RELAY: {
688       const EventMessageFromRelayData* data;
689       if (GetMessagePayload(payload, payload_size, &data)) {
690         size_t num_bytes = payload_size - sizeof(*data);
691         if (num_bytes < sizeof(Header))
692           break;
693         num_bytes -= sizeof(Header);
694 
695         Channel::MessagePtr message(
696             new Channel::Message(num_bytes, handles.size()));
697         message->SetHandles(std::move(handles));
698         if (num_bytes)
699           memcpy(message->mutable_payload(), data + 1, num_bytes);
700         delegate_->OnEventMessageFromRelay(remote_node_name_, data->source,
701                                            std::move(message));
702         return;
703       }
704       break;
705     }
706 #endif  // defined(OS_WIN)
707 
708     case MessageType::ACCEPT_PEER: {
709       const AcceptPeerData* data;
710       if (GetMessagePayload(payload, payload_size, &data)) {
711         delegate_->OnAcceptPeer(remote_node_name_, data->token, data->peer_name,
712                                 data->port_name);
713         return;
714       }
715       break;
716     }
717 
718     case MessageType::BIND_BROKER_HOST:
719       if (handles.size() == 1) {
720         CreateAndBindLocalBrokerHost(std::move(handles[0]));
721         return;
722       }
723       break;
724 
725     default:
726       // Ignore unrecognized message types, allowing for future extensibility.
727       return;
728   }
729 
730   DLOG(ERROR) << "Received invalid message. Closing channel.";
731   if (process_error_callback_)
732     process_error_callback_.Run("NodeChannel received a malformed message");
733   delegate_->OnChannelError(remote_node_name_, this);
734 }
735 
OnChannelError(Channel::Error error)736 void NodeChannel::OnChannelError(Channel::Error error) {
737   DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
738 
739   RequestContext request_context(RequestContext::Source::SYSTEM);
740 
741   ShutDown();
742 
743   if (process_error_callback_ &&
744       error == Channel::Error::kReceivedMalformedData) {
745     process_error_callback_.Run("Channel received a malformed message");
746   }
747 
748   // |OnChannelError()| may cause |this| to be destroyed, but still need access
749   // to the name after that destruction. So make a copy of
750   // |remote_node_name_| so it can be used if |this| becomes destroyed.
751   ports::NodeName node_name = remote_node_name_;
752   delegate_->OnChannelError(node_name, this);
753 }
754 
WriteChannelMessage(Channel::MessagePtr message)755 void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) {
756   // Force a crash if this process attempts to send a message larger than the
757   // maximum allowed size. This is more useful than killing a Channel when we
758   // *receive* an oversized message, as we should consider oversized message
759   // transmission to be a bug and this helps easily identify offending code.
760   CHECK_LT(message->data_num_bytes(), GetConfiguration().max_message_num_bytes);
761 
762   base::AutoLock lock(channel_lock_);
763   if (!channel_)
764     DLOG(ERROR) << "Dropping message on closed channel.";
765   else
766     channel_->Write(std::move(message));
767 }
768 
769 }  // namespace core
770 }  // namespace mojo
771