1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/core/node_channel.h"
6 
7 #include <cstring>
8 #include <limits>
9 #include <sstream>
10 
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/memory/ptr_util.h"
15 #include "mojo/core/broker_host.h"
16 #include "mojo/core/channel.h"
17 #include "mojo/core/configuration.h"
18 #include "mojo/core/core.h"
19 #include "mojo/core/request_context.h"
20 
21 namespace mojo {
22 namespace core {
23 
24 namespace {
25 
26 // NOTE: Please ONLY append messages to the end of this enum.
27 enum class MessageType : uint32_t {
28   ACCEPT_INVITEE,
29   ACCEPT_INVITATION,
30   ADD_BROKER_CLIENT,
31   BROKER_CLIENT_ADDED,
32   ACCEPT_BROKER_CLIENT,
33   EVENT_MESSAGE,
34   REQUEST_PORT_MERGE,
35   REQUEST_INTRODUCTION,
36   INTRODUCE,
37 #if defined(OS_WIN)
38   RELAY_EVENT_MESSAGE,
39 #endif
40   BROADCAST_EVENT,
41 #if defined(OS_WIN)
42   EVENT_MESSAGE_FROM_RELAY,
43 #endif
44   ACCEPT_PEER,
45   BIND_BROKER_HOST,
46 };
47 
48 #pragma pack(push, 1)
49 
50 struct Header {
51   MessageType type;
52   uint32_t padding;
53 };
54 
55 static_assert(IsAlignedForChannelMessage(sizeof(Header)),
56               "Invalid header size.");
57 
58 struct AcceptInviteeData {
59   ports::NodeName inviter_name;
60   ports::NodeName token;
61 };
62 
63 struct AcceptInvitationData {
64   ports::NodeName token;
65   ports::NodeName invitee_name;
66 };
67 
68 struct AcceptPeerData {
69   ports::NodeName token;
70   ports::NodeName peer_name;
71   ports::PortName port_name;
72 };
73 
74 // This message may include a process handle on plaforms that require it.
75 struct AddBrokerClientData {
76   ports::NodeName client_name;
77 #if !defined(OS_WIN)
78   uint32_t process_handle;
79   uint32_t padding;
80 #endif
81 };
82 
83 #if !defined(OS_WIN)
84 static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t),
85               "Unexpected pid size");
86 static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0,
87               "Invalid AddBrokerClientData size.");
88 #endif
89 
90 // This data is followed by a platform channel handle to the broker.
91 struct BrokerClientAddedData {
92   ports::NodeName client_name;
93 };
94 
95 // This data may be followed by a platform channel handle to the broker. If not,
96 // then the inviter is the broker and its channel should be used as such.
97 struct AcceptBrokerClientData {
98   ports::NodeName broker_name;
99 };
100 
101 // This is followed by arbitrary payload data which is interpreted as a token
102 // string for port location.
103 struct RequestPortMergeData {
104   ports::PortName connector_port_name;
105 };
106 
107 // Used for both REQUEST_INTRODUCTION and INTRODUCE.
108 //
109 // For INTRODUCE the message also includes a valid platform handle for a channel
110 // the receiver may use to communicate with the named node directly, or an
111 // invalid platform handle if the node is unknown to the sender or otherwise
112 // cannot be introduced.
113 struct IntroductionData {
114   ports::NodeName name;
115 };
116 
117 // This message is just a PlatformHandle. The data struct here has only a
118 // padding field to ensure an aligned, non-zero-length payload.
119 struct BindBrokerHostData {
120   uint64_t padding;
121 };
122 
123 #if defined(OS_WIN)
124 // This struct is followed by the full payload of a message to be relayed.
125 struct RelayEventMessageData {
126   ports::NodeName destination;
127 };
128 
129 // This struct is followed by the full payload of a relayed message.
130 struct EventMessageFromRelayData {
131   ports::NodeName source;
132 };
133 #endif
134 
135 #pragma pack(pop)
136 
137 template <typename DataType>
CreateMessage(MessageType type,size_t payload_size,size_t num_handles,DataType ** out_data,size_t capacity=0)138 Channel::MessagePtr CreateMessage(MessageType type,
139                                   size_t payload_size,
140                                   size_t num_handles,
141                                   DataType** out_data,
142                                   size_t capacity = 0) {
143   const size_t total_size = payload_size + sizeof(Header);
144   if (capacity == 0)
145     capacity = total_size;
146   else
147     capacity = std::max(total_size, capacity);
148   auto message =
149       std::make_unique<Channel::Message>(capacity, total_size, num_handles);
150   Header* header = reinterpret_cast<Header*>(message->mutable_payload());
151   header->type = type;
152   header->padding = 0;
153   *out_data = reinterpret_cast<DataType*>(&header[1]);
154   return message;
155 }
156 
157 template <typename DataType>
GetMessagePayload(const void * bytes,size_t num_bytes,DataType ** out_data)158 bool GetMessagePayload(const void* bytes,
159                        size_t num_bytes,
160                        DataType** out_data) {
161   static_assert(sizeof(DataType) > 0, "DataType must have non-zero size.");
162   if (num_bytes < sizeof(Header) + sizeof(DataType))
163     return false;
164   *out_data = reinterpret_cast<const DataType*>(
165       static_cast<const char*>(bytes) + sizeof(Header));
166   return true;
167 }
168 
169 }  // namespace
170 
171 // static
Create(Delegate * delegate,ConnectionParams connection_params,Channel::HandlePolicy channel_handle_policy,scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)172 scoped_refptr<NodeChannel> NodeChannel::Create(
173     Delegate* delegate,
174     ConnectionParams connection_params,
175     Channel::HandlePolicy channel_handle_policy,
176     scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,
177     const ProcessErrorCallback& process_error_callback) {
178 #if defined(OS_NACL_SFI)
179   LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI";
180   return nullptr;
181 #else
182   return new NodeChannel(delegate, std::move(connection_params),
183                          channel_handle_policy, io_task_runner,
184                          process_error_callback);
185 #endif
186 }
187 
188 // static
CreateEventMessage(size_t capacity,size_t payload_size,void ** payload,size_t num_handles)189 Channel::MessagePtr NodeChannel::CreateEventMessage(size_t capacity,
190                                                     size_t payload_size,
191                                                     void** payload,
192                                                     size_t num_handles) {
193   return CreateMessage(MessageType::EVENT_MESSAGE, payload_size, num_handles,
194                        payload, capacity);
195 }
196 
197 // static
GetEventMessageData(Channel::Message * message,void ** data,size_t * num_data_bytes)198 void NodeChannel::GetEventMessageData(Channel::Message* message,
199                                       void** data,
200                                       size_t* num_data_bytes) {
201   // NOTE: OnChannelMessage guarantees that we never accept a Channel::Message
202   // with a payload of fewer than |sizeof(Header)| bytes.
203   *data = reinterpret_cast<Header*>(message->mutable_payload()) + 1;
204   *num_data_bytes = message->payload_size() - sizeof(Header);
205 }
206 
Start()207 void NodeChannel::Start() {
208   base::AutoLock lock(channel_lock_);
209   // ShutDown() may have already been called, in which case |channel_| is null.
210   if (channel_)
211     channel_->Start();
212 }
213 
ShutDown()214 void NodeChannel::ShutDown() {
215   base::AutoLock lock(channel_lock_);
216   if (channel_) {
217     channel_->ShutDown();
218     channel_ = nullptr;
219   }
220 }
221 
LeakHandleOnShutdown()222 void NodeChannel::LeakHandleOnShutdown() {
223   base::AutoLock lock(channel_lock_);
224   if (channel_) {
225     channel_->LeakHandle();
226   }
227 }
228 
NotifyBadMessage(const std::string & error)229 void NodeChannel::NotifyBadMessage(const std::string& error) {
230   DCHECK(HasBadMessageHandler());
231   process_error_callback_.Run("Received bad user message: " + error);
232 }
233 
SetRemoteProcessHandle(ScopedProcessHandle process_handle)234 void NodeChannel::SetRemoteProcessHandle(ScopedProcessHandle process_handle) {
235   DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
236   {
237     base::AutoLock lock(channel_lock_);
238     if (channel_)
239       channel_->set_remote_process(process_handle.Clone());
240   }
241   base::AutoLock lock(remote_process_handle_lock_);
242   DCHECK(!remote_process_handle_.is_valid());
243   CHECK_NE(remote_process_handle_.get(), base::GetCurrentProcessHandle());
244   remote_process_handle_ = std::move(process_handle);
245 }
246 
HasRemoteProcessHandle()247 bool NodeChannel::HasRemoteProcessHandle() {
248   base::AutoLock lock(remote_process_handle_lock_);
249   return remote_process_handle_.is_valid();
250 }
251 
CloneRemoteProcessHandle()252 ScopedProcessHandle NodeChannel::CloneRemoteProcessHandle() {
253   base::AutoLock lock(remote_process_handle_lock_);
254   if (!remote_process_handle_.is_valid())
255     return ScopedProcessHandle();
256   return remote_process_handle_.Clone();
257 }
258 
SetRemoteNodeName(const ports::NodeName & name)259 void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) {
260   DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
261   remote_node_name_ = name;
262 }
263 
AcceptInvitee(const ports::NodeName & inviter_name,const ports::NodeName & token)264 void NodeChannel::AcceptInvitee(const ports::NodeName& inviter_name,
265                                 const ports::NodeName& token) {
266   AcceptInviteeData* data;
267   Channel::MessagePtr message = CreateMessage(
268       MessageType::ACCEPT_INVITEE, sizeof(AcceptInviteeData), 0, &data);
269   data->inviter_name = inviter_name;
270   data->token = token;
271   WriteChannelMessage(std::move(message));
272 }
273 
AcceptInvitation(const ports::NodeName & token,const ports::NodeName & invitee_name)274 void NodeChannel::AcceptInvitation(const ports::NodeName& token,
275                                    const ports::NodeName& invitee_name) {
276   AcceptInvitationData* data;
277   Channel::MessagePtr message = CreateMessage(
278       MessageType::ACCEPT_INVITATION, sizeof(AcceptInvitationData), 0, &data);
279   data->token = token;
280   data->invitee_name = invitee_name;
281   WriteChannelMessage(std::move(message));
282 }
283 
AcceptPeer(const ports::NodeName & sender_name,const ports::NodeName & token,const ports::PortName & port_name)284 void NodeChannel::AcceptPeer(const ports::NodeName& sender_name,
285                              const ports::NodeName& token,
286                              const ports::PortName& port_name) {
287   AcceptPeerData* data;
288   Channel::MessagePtr message =
289       CreateMessage(MessageType::ACCEPT_PEER, sizeof(AcceptPeerData), 0, &data);
290   data->token = token;
291   data->peer_name = sender_name;
292   data->port_name = port_name;
293   WriteChannelMessage(std::move(message));
294 }
295 
AddBrokerClient(const ports::NodeName & client_name,ScopedProcessHandle process_handle)296 void NodeChannel::AddBrokerClient(const ports::NodeName& client_name,
297                                   ScopedProcessHandle process_handle) {
298   AddBrokerClientData* data;
299   std::vector<PlatformHandle> handles;
300 #if defined(OS_WIN)
301   handles.emplace_back(base::win::ScopedHandle(process_handle.release()));
302 #endif
303   Channel::MessagePtr message =
304       CreateMessage(MessageType::ADD_BROKER_CLIENT, sizeof(AddBrokerClientData),
305                     handles.size(), &data);
306   message->SetHandles(std::move(handles));
307   data->client_name = client_name;
308 #if !defined(OS_WIN)
309   data->process_handle = process_handle.get();
310   data->padding = 0;
311 #endif
312   WriteChannelMessage(std::move(message));
313 }
314 
BrokerClientAdded(const ports::NodeName & client_name,PlatformHandle broker_channel)315 void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name,
316                                     PlatformHandle broker_channel) {
317   BrokerClientAddedData* data;
318   std::vector<PlatformHandle> handles;
319   if (broker_channel.is_valid())
320     handles.emplace_back(std::move(broker_channel));
321   Channel::MessagePtr message =
322       CreateMessage(MessageType::BROKER_CLIENT_ADDED,
323                     sizeof(BrokerClientAddedData), handles.size(), &data);
324   message->SetHandles(std::move(handles));
325   data->client_name = client_name;
326   WriteChannelMessage(std::move(message));
327 }
328 
AcceptBrokerClient(const ports::NodeName & broker_name,PlatformHandle broker_channel)329 void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name,
330                                      PlatformHandle broker_channel) {
331   AcceptBrokerClientData* data;
332   std::vector<PlatformHandle> handles;
333   if (broker_channel.is_valid())
334     handles.emplace_back(std::move(broker_channel));
335   Channel::MessagePtr message =
336       CreateMessage(MessageType::ACCEPT_BROKER_CLIENT,
337                     sizeof(AcceptBrokerClientData), handles.size(), &data);
338   message->SetHandles(std::move(handles));
339   data->broker_name = broker_name;
340   WriteChannelMessage(std::move(message));
341 }
342 
RequestPortMerge(const ports::PortName & connector_port_name,const std::string & token)343 void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name,
344                                    const std::string& token) {
345   RequestPortMergeData* data;
346   Channel::MessagePtr message =
347       CreateMessage(MessageType::REQUEST_PORT_MERGE,
348                     sizeof(RequestPortMergeData) + token.size(), 0, &data);
349   data->connector_port_name = connector_port_name;
350   memcpy(data + 1, token.data(), token.size());
351   WriteChannelMessage(std::move(message));
352 }
353 
RequestIntroduction(const ports::NodeName & name)354 void NodeChannel::RequestIntroduction(const ports::NodeName& name) {
355   IntroductionData* data;
356   Channel::MessagePtr message = CreateMessage(
357       MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data);
358   data->name = name;
359   WriteChannelMessage(std::move(message));
360 }
361 
Introduce(const ports::NodeName & name,PlatformHandle channel_handle)362 void NodeChannel::Introduce(const ports::NodeName& name,
363                             PlatformHandle channel_handle) {
364   IntroductionData* data;
365   std::vector<PlatformHandle> handles;
366   if (channel_handle.is_valid())
367     handles.emplace_back(std::move(channel_handle));
368   Channel::MessagePtr message = CreateMessage(
369       MessageType::INTRODUCE, sizeof(IntroductionData), handles.size(), &data);
370   message->SetHandles(std::move(handles));
371   data->name = name;
372   WriteChannelMessage(std::move(message));
373 }
374 
SendChannelMessage(Channel::MessagePtr message)375 void NodeChannel::SendChannelMessage(Channel::MessagePtr message) {
376   WriteChannelMessage(std::move(message));
377 }
378 
Broadcast(Channel::MessagePtr message)379 void NodeChannel::Broadcast(Channel::MessagePtr message) {
380   DCHECK(!message->has_handles());
381   void* data;
382   Channel::MessagePtr broadcast_message = CreateMessage(
383       MessageType::BROADCAST_EVENT, message->data_num_bytes(), 0, &data);
384   memcpy(data, message->data(), message->data_num_bytes());
385   WriteChannelMessage(std::move(broadcast_message));
386 }
387 
BindBrokerHost(PlatformHandle broker_host_handle)388 void NodeChannel::BindBrokerHost(PlatformHandle broker_host_handle) {
389 #if !defined(OS_APPLE) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
390   DCHECK(broker_host_handle.is_valid());
391   BindBrokerHostData* data;
392   std::vector<PlatformHandle> handles;
393   handles.push_back(std::move(broker_host_handle));
394   Channel::MessagePtr message =
395       CreateMessage(MessageType::BIND_BROKER_HOST, sizeof(BindBrokerHostData),
396                     handles.size(), &data);
397   data->padding = 0;
398   message->SetHandles(std::move(handles));
399   WriteChannelMessage(std::move(message));
400 #endif
401 }
402 
403 #if defined(OS_WIN)
RelayEventMessage(const ports::NodeName & destination,Channel::MessagePtr message)404 void NodeChannel::RelayEventMessage(const ports::NodeName& destination,
405                                     Channel::MessagePtr message) {
406 #if defined(OS_WIN)
407   DCHECK(message->has_handles());
408 
409   // Note that this is only used on Windows, and on Windows all platform
410   // handles are included in the message data. We blindly copy all the data
411   // here and the relay node (the broker) will duplicate handles as needed.
412   size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
413   RelayEventMessageData* data;
414   Channel::MessagePtr relay_message =
415       CreateMessage(MessageType::RELAY_EVENT_MESSAGE, num_bytes, 0, &data);
416   data->destination = destination;
417   memcpy(data + 1, message->data(), message->data_num_bytes());
418 
419   // When the handles are duplicated in the broker, the source handles will
420   // be closed. If the broker never receives this message then these handles
421   // will leak, but that means something else has probably broken and the
422   // sending process won't likely be around much longer.
423   //
424   // TODO(https://crbug.com/813112): We would like to be able to violate the
425   // above stated assumption. We should not leak handles in cases where we
426   // outlive the broker, as we may continue existing and eventually accept a new
427   // broker invitation.
428   std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
429   for (auto& handle : handles)
430     handle.TakeHandle().release();
431 
432 #else
433   DCHECK(message->has_mach_ports());
434 
435   // On OSX, the handles are extracted from the relayed message and attached to
436   // the wrapper. The broker then takes the handles attached to the wrapper and
437   // moves them back to the relayed message. This is necessary because the
438   // message may contain fds which need to be attached to the outer message so
439   // that they can be transferred to the broker.
440   std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
441   size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
442   RelayEventMessageData* data;
443   Channel::MessagePtr relay_message = CreateMessage(
444       MessageType::RELAY_EVENT_MESSAGE, num_bytes, handles.size(), &data);
445   data->destination = destination;
446   memcpy(data + 1, message->data(), message->data_num_bytes());
447   relay_message->SetHandles(std::move(handles));
448 #endif  // defined(OS_WIN)
449 
450   WriteChannelMessage(std::move(relay_message));
451 }
452 
EventMessageFromRelay(const ports::NodeName & source,Channel::MessagePtr message)453 void NodeChannel::EventMessageFromRelay(const ports::NodeName& source,
454                                         Channel::MessagePtr message) {
455   size_t num_bytes =
456       sizeof(EventMessageFromRelayData) + message->payload_size();
457   EventMessageFromRelayData* data;
458   Channel::MessagePtr relayed_message =
459       CreateMessage(MessageType::EVENT_MESSAGE_FROM_RELAY, num_bytes,
460                     message->num_handles(), &data);
461   data->source = source;
462   if (message->payload_size())
463     memcpy(data + 1, message->payload(), message->payload_size());
464   relayed_message->SetHandles(message->TakeHandles());
465   WriteChannelMessage(std::move(relayed_message));
466 }
467 #endif  // defined(OS_WIN)
468 
NodeChannel(Delegate * delegate,ConnectionParams connection_params,Channel::HandlePolicy channel_handle_policy,scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)469 NodeChannel::NodeChannel(
470     Delegate* delegate,
471     ConnectionParams connection_params,
472     Channel::HandlePolicy channel_handle_policy,
473     scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,
474     const ProcessErrorCallback& process_error_callback)
475     : base::RefCountedDeleteOnSequence<NodeChannel>(io_task_runner),
476       delegate_(delegate),
477       process_error_callback_(process_error_callback)
478 #if !defined(OS_NACL_SFI)
479       ,
480       channel_(Channel::Create(this,
481                                std::move(connection_params),
482                                channel_handle_policy,
483                                std::move(io_task_runner)))
484 #endif
485 {
486 }
487 
~NodeChannel()488 NodeChannel::~NodeChannel() {
489   ShutDown();
490 }
491 
CreateAndBindLocalBrokerHost(PlatformHandle broker_host_handle)492 void NodeChannel::CreateAndBindLocalBrokerHost(
493     PlatformHandle broker_host_handle) {
494 #if !defined(OS_APPLE) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
495   // Self-owned.
496   ConnectionParams connection_params(
497       PlatformChannelEndpoint(std::move(broker_host_handle)));
498   new BrokerHost(remote_process_handle_.get(), std::move(connection_params),
499                  process_error_callback_);
500 #endif
501 }
502 
OnChannelMessage(const void * payload,size_t payload_size,std::vector<PlatformHandle> handles)503 void NodeChannel::OnChannelMessage(const void* payload,
504                                    size_t payload_size,
505                                    std::vector<PlatformHandle> handles) {
506   DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
507 
508   RequestContext request_context(RequestContext::Source::SYSTEM);
509 
510   if (payload_size <= sizeof(Header)) {
511     delegate_->OnChannelError(remote_node_name_, this);
512     return;
513   }
514 
515   const Header* header = static_cast<const Header*>(payload);
516   switch (header->type) {
517     case MessageType::ACCEPT_INVITEE: {
518       const AcceptInviteeData* data;
519       if (GetMessagePayload(payload, payload_size, &data)) {
520         delegate_->OnAcceptInvitee(remote_node_name_, data->inviter_name,
521                                    data->token);
522         return;
523       }
524       break;
525     }
526 
527     case MessageType::ACCEPT_INVITATION: {
528       const AcceptInvitationData* data;
529       if (GetMessagePayload(payload, payload_size, &data)) {
530         delegate_->OnAcceptInvitation(remote_node_name_, data->token,
531                                       data->invitee_name);
532         return;
533       }
534       break;
535     }
536 
537     case MessageType::ADD_BROKER_CLIENT: {
538       const AddBrokerClientData* data;
539       if (GetMessagePayload(payload, payload_size, &data)) {
540 #if defined(OS_WIN)
541         if (handles.size() != 1) {
542           DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
543           break;
544         }
545         delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
546                                      handles[0].ReleaseHandle());
547 #else
548         if (!handles.empty()) {
549           DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
550           break;
551         }
552         delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
553                                      data->process_handle);
554 #endif
555         return;
556       }
557       break;
558     }
559 
560     case MessageType::BROKER_CLIENT_ADDED: {
561       const BrokerClientAddedData* data;
562       if (GetMessagePayload(payload, payload_size, &data)) {
563         if (handles.size() != 1) {
564           DLOG(ERROR) << "Dropping invalid BrokerClientAdded message.";
565           break;
566         }
567         delegate_->OnBrokerClientAdded(remote_node_name_, data->client_name,
568                                        std::move(handles[0]));
569         return;
570       }
571       break;
572     }
573 
574     case MessageType::ACCEPT_BROKER_CLIENT: {
575       const AcceptBrokerClientData* data;
576       if (GetMessagePayload(payload, payload_size, &data)) {
577         PlatformHandle broker_channel;
578         if (handles.size() > 1) {
579           DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message.";
580           break;
581         }
582         if (handles.size() == 1)
583           broker_channel = std::move(handles[0]);
584 
585         delegate_->OnAcceptBrokerClient(remote_node_name_, data->broker_name,
586                                         std::move(broker_channel));
587         return;
588       }
589       break;
590     }
591 
592     case MessageType::EVENT_MESSAGE: {
593       Channel::MessagePtr message(
594           new Channel::Message(payload_size, handles.size()));
595       message->SetHandles(std::move(handles));
596       memcpy(message->mutable_payload(), payload, payload_size);
597       delegate_->OnEventMessage(remote_node_name_, std::move(message));
598       return;
599     }
600 
601     case MessageType::REQUEST_PORT_MERGE: {
602       const RequestPortMergeData* data;
603       if (GetMessagePayload(payload, payload_size, &data)) {
604         // Don't accept an empty token.
605         size_t token_size = payload_size - sizeof(*data) - sizeof(Header);
606         if (token_size == 0)
607           break;
608         std::string token(reinterpret_cast<const char*>(data + 1), token_size);
609         delegate_->OnRequestPortMerge(remote_node_name_,
610                                       data->connector_port_name, token);
611         return;
612       }
613       break;
614     }
615 
616     case MessageType::REQUEST_INTRODUCTION: {
617       const IntroductionData* data;
618       if (GetMessagePayload(payload, payload_size, &data)) {
619         delegate_->OnRequestIntroduction(remote_node_name_, data->name);
620         return;
621       }
622       break;
623     }
624 
625     case MessageType::INTRODUCE: {
626       const IntroductionData* data;
627       if (GetMessagePayload(payload, payload_size, &data)) {
628         if (handles.size() > 1) {
629           DLOG(ERROR) << "Dropping invalid introduction message.";
630           break;
631         }
632         PlatformHandle channel_handle;
633         if (handles.size() == 1)
634           channel_handle = std::move(handles[0]);
635 
636         delegate_->OnIntroduce(remote_node_name_, data->name,
637                                std::move(channel_handle));
638         return;
639       }
640       break;
641     }
642 
643 #if defined(OS_WIN)
644     case MessageType::RELAY_EVENT_MESSAGE: {
645       base::ProcessHandle from_process;
646       {
647         base::AutoLock lock(remote_process_handle_lock_);
648         // NOTE: It's safe to retain a weak reference to this process handle
649         // through the extent of this call because |this| is kept alive and
650         // |remote_process_handle_| is never reset once set.
651         from_process = remote_process_handle_.get();
652       }
653       const RelayEventMessageData* data;
654       if (GetMessagePayload(payload, payload_size, &data)) {
655         // Don't try to relay an empty message.
656         if (payload_size <= sizeof(Header) + sizeof(RelayEventMessageData))
657           break;
658 
659         const void* message_start = data + 1;
660         Channel::MessagePtr message = Channel::Message::Deserialize(
661             message_start, payload_size - sizeof(Header) - sizeof(*data),
662             from_process);
663         if (!message) {
664           DLOG(ERROR) << "Dropping invalid relay message.";
665           break;
666         }
667         delegate_->OnRelayEventMessage(remote_node_name_, from_process,
668                                        data->destination, std::move(message));
669         return;
670       }
671       break;
672     }
673 #endif
674 
675     case MessageType::BROADCAST_EVENT: {
676       if (payload_size <= sizeof(Header))
677         break;
678       const void* data = static_cast<const void*>(
679           reinterpret_cast<const Header*>(payload) + 1);
680       Channel::MessagePtr message =
681           Channel::Message::Deserialize(data, payload_size - sizeof(Header));
682       if (!message || message->has_handles()) {
683         DLOG(ERROR) << "Dropping invalid broadcast message.";
684         break;
685       }
686       delegate_->OnBroadcast(remote_node_name_, std::move(message));
687       return;
688     }
689 
690 #if defined(OS_WIN)
691     case MessageType::EVENT_MESSAGE_FROM_RELAY: {
692       const EventMessageFromRelayData* data;
693       if (GetMessagePayload(payload, payload_size, &data)) {
694         size_t num_bytes = payload_size - sizeof(*data);
695         if (num_bytes < sizeof(Header))
696           break;
697         num_bytes -= sizeof(Header);
698 
699         Channel::MessagePtr message(
700             new Channel::Message(num_bytes, handles.size()));
701         message->SetHandles(std::move(handles));
702         if (num_bytes)
703           memcpy(message->mutable_payload(), data + 1, num_bytes);
704         delegate_->OnEventMessageFromRelay(remote_node_name_, data->source,
705                                            std::move(message));
706         return;
707       }
708       break;
709     }
710 #endif  // defined(OS_WIN)
711 
712     case MessageType::ACCEPT_PEER: {
713       const AcceptPeerData* data;
714       if (GetMessagePayload(payload, payload_size, &data)) {
715         delegate_->OnAcceptPeer(remote_node_name_, data->token, data->peer_name,
716                                 data->port_name);
717         return;
718       }
719       break;
720     }
721 
722     case MessageType::BIND_BROKER_HOST:
723       if (handles.size() == 1) {
724         CreateAndBindLocalBrokerHost(std::move(handles[0]));
725         return;
726       }
727       break;
728 
729     default:
730       // Ignore unrecognized message types, allowing for future extensibility.
731       return;
732   }
733 
734   DLOG(ERROR) << "Received invalid message. Closing channel.";
735   if (process_error_callback_)
736     process_error_callback_.Run("NodeChannel received a malformed message");
737   delegate_->OnChannelError(remote_node_name_, this);
738 }
739 
OnChannelError(Channel::Error error)740 void NodeChannel::OnChannelError(Channel::Error error) {
741   DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
742 
743   RequestContext request_context(RequestContext::Source::SYSTEM);
744 
745   ShutDown();
746 
747   if (process_error_callback_ &&
748       error == Channel::Error::kReceivedMalformedData) {
749     process_error_callback_.Run("Channel received a malformed message");
750   }
751 
752   // |OnChannelError()| may cause |this| to be destroyed, but still need access
753   // to the name after that destruction. So make a copy of
754   // |remote_node_name_| so it can be used if |this| becomes destroyed.
755   ports::NodeName node_name = remote_node_name_;
756   delegate_->OnChannelError(node_name, this);
757 }
758 
WriteChannelMessage(Channel::MessagePtr message)759 void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) {
760   base::AutoLock lock(channel_lock_);
761   if (!channel_)
762     DLOG(ERROR) << "Dropping message on closed channel.";
763   else
764     channel_->Write(std::move(message));
765 }
766 
767 }  // namespace core
768 }  // namespace mojo
769