1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/core/node_channel.h"
6
7 #include <cstring>
8 #include <limits>
9 #include <sstream>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/memory/ptr_util.h"
15 #include "mojo/core/broker_host.h"
16 #include "mojo/core/channel.h"
17 #include "mojo/core/configuration.h"
18 #include "mojo/core/core.h"
19 #include "mojo/core/request_context.h"
20
21 namespace mojo {
22 namespace core {
23
24 namespace {
25
// Discriminator stored in Header::type for every NodeChannel message.
//
// NOTE: Please ONLY append messages to the end of this enum. The enumerators
// carry no explicit values, so each numeric value is determined purely by its
// position in this list. The OS_WIN-only entries shift the values of every
// enumerator that follows them when OS_WIN is defined.
enum class MessageType : uint32_t {
  ACCEPT_INVITEE,
  ACCEPT_INVITATION,
  ADD_BROKER_CLIENT,
  BROKER_CLIENT_ADDED,
  ACCEPT_BROKER_CLIENT,
  EVENT_MESSAGE,
  REQUEST_PORT_MERGE,
  REQUEST_INTRODUCTION,
  INTRODUCE,
#if defined(OS_WIN)
  RELAY_EVENT_MESSAGE,
#endif
  BROADCAST_EVENT,
#if defined(OS_WIN)
  EVENT_MESSAGE_FROM_RELAY,
#endif
  ACCEPT_PEER,
  BIND_BROKER_HOST,
};
47
48 #pragma pack(push, 1)
49
// Fixed-size prefix placed at the start of every NodeChannel message payload.
// CreateMessage() writes it and OnChannelMessage() reads it back.
struct Header {
  // Selects how the bytes following the header are interpreted.
  MessageType type;
  // Always written as zero by CreateMessage().
  uint32_t padding;
};

// The type-specific data starts immediately after the header, so the header
// size itself must satisfy the channel's message alignment.
static_assert(IsAlignedForChannelMessage(sizeof(Header)),
              "Invalid header size.");
57
// Payload of an ACCEPT_INVITEE message; filled in by
// NodeChannel::AcceptInvitee() and forwarded to Delegate::OnAcceptInvitee().
struct AcceptInviteeData {
  ports::NodeName inviter_name;
  ports::NodeName token;
};
62
// Payload of an ACCEPT_INVITATION message; filled in by
// NodeChannel::AcceptInvitation() and forwarded to
// Delegate::OnAcceptInvitation().
struct AcceptInvitationData {
  ports::NodeName token;
  ports::NodeName invitee_name;
};
67
// Payload of an ACCEPT_PEER message; filled in by NodeChannel::AcceptPeer()
// and forwarded to Delegate::OnAcceptPeer().
struct AcceptPeerData {
  ports::NodeName token;
  // Set from the |sender_name| argument of NodeChannel::AcceptPeer().
  ports::NodeName peer_name;
  ports::PortName port_name;
};
73
// Payload of an ADD_BROKER_CLIENT message.
//
// This message may include a process handle on platforms that require it. On
// non-Windows builds the handle travels inline in |process_handle|; on Windows
// the struct has no such field and NodeChannel::AddBrokerClient() attaches the
// process handle to the message as a platform handle instead.
struct AddBrokerClientData {
  ports::NodeName client_name;
#if !defined(OS_WIN)
  uint32_t process_handle;
  // Always written as zero by NodeChannel::AddBrokerClient().
  uint32_t padding;
#endif
};

#if !defined(OS_WIN)
// |process_handle| above is a uint32_t, so base::ProcessHandle must have the
// same size for the inline representation to hold it.
static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t),
              "Unexpected pid size");
static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0,
              "Invalid AddBrokerClientData size.");
#endif
89
// Payload of a BROKER_CLIENT_ADDED message.
//
// This data is followed by a platform channel handle to the broker. The
// receiving side (OnChannelMessage) rejects the message unless exactly one
// handle is attached.
struct BrokerClientAddedData {
  ports::NodeName client_name;
};
94
// Payload of an ACCEPT_BROKER_CLIENT message.
//
// This data may be followed by a platform channel handle to the broker. If not,
// then the inviter is the broker and its channel should be used as such. The
// receiving side (OnChannelMessage) accepts zero or one attached handle.
struct AcceptBrokerClientData {
  ports::NodeName broker_name;
};
100
// Payload of a REQUEST_PORT_MERGE message.
//
// This is followed by arbitrary payload data which is interpreted as a token
// string for port location. The receiving side (OnChannelMessage) rejects the
// message when that trailing token is empty.
struct RequestPortMergeData {
  ports::PortName connector_port_name;
};
106
// Used for both REQUEST_INTRODUCTION and INTRODUCE.
//
// For INTRODUCE the message also includes a valid platform handle for a channel
// the receiver may use to communicate with the named node directly, or an
// invalid platform handle if the node is unknown to the sender or otherwise
// cannot be introduced. On the wire (see NodeChannel::Introduce()) an invalid
// handle is represented by attaching no handle at all.
struct IntroductionData {
  ports::NodeName name;
};
116
// Payload of a BIND_BROKER_HOST message.
//
// This message is just a PlatformHandle. The data struct here has only a
// padding field to ensure an aligned, non-zero-length payload.
struct BindBrokerHostData {
  // Always written as zero by NodeChannel::BindBrokerHost().
  uint64_t padding;
};
122
123 #if defined(OS_WIN)
// Payload prefix of a RELAY_EVENT_MESSAGE message (OS_WIN builds only).
//
// This struct is followed by the full payload of a message to be relayed.
struct RelayEventMessageData {
  ports::NodeName destination;
};
128
// Payload prefix of an EVENT_MESSAGE_FROM_RELAY message (OS_WIN builds only).
//
// This struct is followed by the full payload of a relayed message.
struct EventMessageFromRelayData {
  ports::NodeName source;
};
133 #endif
134
135 #pragma pack(pop)
136
137 template <typename DataType>
CreateMessage(MessageType type,size_t payload_size,size_t num_handles,DataType ** out_data,size_t capacity=0)138 Channel::MessagePtr CreateMessage(MessageType type,
139 size_t payload_size,
140 size_t num_handles,
141 DataType** out_data,
142 size_t capacity = 0) {
143 const size_t total_size = payload_size + sizeof(Header);
144 if (capacity == 0)
145 capacity = total_size;
146 else
147 capacity = std::max(total_size, capacity);
148 auto message =
149 std::make_unique<Channel::Message>(capacity, total_size, num_handles);
150 Header* header = reinterpret_cast<Header*>(message->mutable_payload());
151 header->type = type;
152 header->padding = 0;
153 *out_data = reinterpret_cast<DataType*>(&header[1]);
154 return message;
155 }
156
157 template <typename DataType>
GetMessagePayload(const void * bytes,size_t num_bytes,DataType ** out_data)158 bool GetMessagePayload(const void* bytes,
159 size_t num_bytes,
160 DataType** out_data) {
161 static_assert(sizeof(DataType) > 0, "DataType must have non-zero size.");
162 if (num_bytes < sizeof(Header) + sizeof(DataType))
163 return false;
164 *out_data = reinterpret_cast<const DataType*>(
165 static_cast<const char*>(bytes) + sizeof(Header));
166 return true;
167 }
168
169 } // namespace
170
171 // static
Create(Delegate * delegate,ConnectionParams connection_params,Channel::HandlePolicy channel_handle_policy,scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)172 scoped_refptr<NodeChannel> NodeChannel::Create(
173 Delegate* delegate,
174 ConnectionParams connection_params,
175 Channel::HandlePolicy channel_handle_policy,
176 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,
177 const ProcessErrorCallback& process_error_callback) {
178 #if defined(OS_NACL_SFI)
179 LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI";
180 return nullptr;
181 #else
182 return new NodeChannel(delegate, std::move(connection_params),
183 channel_handle_policy, io_task_runner,
184 process_error_callback);
185 #endif
186 }
187
188 // static
CreateEventMessage(size_t capacity,size_t payload_size,void ** payload,size_t num_handles)189 Channel::MessagePtr NodeChannel::CreateEventMessage(size_t capacity,
190 size_t payload_size,
191 void** payload,
192 size_t num_handles) {
193 return CreateMessage(MessageType::EVENT_MESSAGE, payload_size, num_handles,
194 payload, capacity);
195 }
196
197 // static
GetEventMessageData(Channel::Message * message,void ** data,size_t * num_data_bytes)198 void NodeChannel::GetEventMessageData(Channel::Message* message,
199 void** data,
200 size_t* num_data_bytes) {
201 // NOTE: OnChannelMessage guarantees that we never accept a Channel::Message
202 // with a payload of fewer than |sizeof(Header)| bytes.
203 *data = reinterpret_cast<Header*>(message->mutable_payload()) + 1;
204 *num_data_bytes = message->payload_size() - sizeof(Header);
205 }
206
Start()207 void NodeChannel::Start() {
208 base::AutoLock lock(channel_lock_);
209 // ShutDown() may have already been called, in which case |channel_| is null.
210 if (channel_)
211 channel_->Start();
212 }
213
ShutDown()214 void NodeChannel::ShutDown() {
215 base::AutoLock lock(channel_lock_);
216 if (channel_) {
217 channel_->ShutDown();
218 channel_ = nullptr;
219 }
220 }
221
LeakHandleOnShutdown()222 void NodeChannel::LeakHandleOnShutdown() {
223 base::AutoLock lock(channel_lock_);
224 if (channel_) {
225 channel_->LeakHandle();
226 }
227 }
228
NotifyBadMessage(const std::string & error)229 void NodeChannel::NotifyBadMessage(const std::string& error) {
230 DCHECK(HasBadMessageHandler());
231 process_error_callback_.Run("Received bad user message: " + error);
232 }
233
SetRemoteProcessHandle(ScopedProcessHandle process_handle)234 void NodeChannel::SetRemoteProcessHandle(ScopedProcessHandle process_handle) {
235 DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
236 {
237 base::AutoLock lock(channel_lock_);
238 if (channel_)
239 channel_->set_remote_process(process_handle.Clone());
240 }
241 base::AutoLock lock(remote_process_handle_lock_);
242 DCHECK(!remote_process_handle_.is_valid());
243 CHECK_NE(remote_process_handle_.get(), base::GetCurrentProcessHandle());
244 remote_process_handle_ = std::move(process_handle);
245 }
246
HasRemoteProcessHandle()247 bool NodeChannel::HasRemoteProcessHandle() {
248 base::AutoLock lock(remote_process_handle_lock_);
249 return remote_process_handle_.is_valid();
250 }
251
CloneRemoteProcessHandle()252 ScopedProcessHandle NodeChannel::CloneRemoteProcessHandle() {
253 base::AutoLock lock(remote_process_handle_lock_);
254 if (!remote_process_handle_.is_valid())
255 return ScopedProcessHandle();
256 return remote_process_handle_.Clone();
257 }
258
SetRemoteNodeName(const ports::NodeName & name)259 void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) {
260 DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
261 remote_node_name_ = name;
262 }
263
AcceptInvitee(const ports::NodeName & inviter_name,const ports::NodeName & token)264 void NodeChannel::AcceptInvitee(const ports::NodeName& inviter_name,
265 const ports::NodeName& token) {
266 AcceptInviteeData* data;
267 Channel::MessagePtr message = CreateMessage(
268 MessageType::ACCEPT_INVITEE, sizeof(AcceptInviteeData), 0, &data);
269 data->inviter_name = inviter_name;
270 data->token = token;
271 WriteChannelMessage(std::move(message));
272 }
273
AcceptInvitation(const ports::NodeName & token,const ports::NodeName & invitee_name)274 void NodeChannel::AcceptInvitation(const ports::NodeName& token,
275 const ports::NodeName& invitee_name) {
276 AcceptInvitationData* data;
277 Channel::MessagePtr message = CreateMessage(
278 MessageType::ACCEPT_INVITATION, sizeof(AcceptInvitationData), 0, &data);
279 data->token = token;
280 data->invitee_name = invitee_name;
281 WriteChannelMessage(std::move(message));
282 }
283
AcceptPeer(const ports::NodeName & sender_name,const ports::NodeName & token,const ports::PortName & port_name)284 void NodeChannel::AcceptPeer(const ports::NodeName& sender_name,
285 const ports::NodeName& token,
286 const ports::PortName& port_name) {
287 AcceptPeerData* data;
288 Channel::MessagePtr message =
289 CreateMessage(MessageType::ACCEPT_PEER, sizeof(AcceptPeerData), 0, &data);
290 data->token = token;
291 data->peer_name = sender_name;
292 data->port_name = port_name;
293 WriteChannelMessage(std::move(message));
294 }
295
AddBrokerClient(const ports::NodeName & client_name,ScopedProcessHandle process_handle)296 void NodeChannel::AddBrokerClient(const ports::NodeName& client_name,
297 ScopedProcessHandle process_handle) {
298 AddBrokerClientData* data;
299 std::vector<PlatformHandle> handles;
300 #if defined(OS_WIN)
301 handles.emplace_back(base::win::ScopedHandle(process_handle.release()));
302 #endif
303 Channel::MessagePtr message =
304 CreateMessage(MessageType::ADD_BROKER_CLIENT, sizeof(AddBrokerClientData),
305 handles.size(), &data);
306 message->SetHandles(std::move(handles));
307 data->client_name = client_name;
308 #if !defined(OS_WIN)
309 data->process_handle = process_handle.get();
310 data->padding = 0;
311 #endif
312 WriteChannelMessage(std::move(message));
313 }
314
BrokerClientAdded(const ports::NodeName & client_name,PlatformHandle broker_channel)315 void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name,
316 PlatformHandle broker_channel) {
317 BrokerClientAddedData* data;
318 std::vector<PlatformHandle> handles;
319 if (broker_channel.is_valid())
320 handles.emplace_back(std::move(broker_channel));
321 Channel::MessagePtr message =
322 CreateMessage(MessageType::BROKER_CLIENT_ADDED,
323 sizeof(BrokerClientAddedData), handles.size(), &data);
324 message->SetHandles(std::move(handles));
325 data->client_name = client_name;
326 WriteChannelMessage(std::move(message));
327 }
328
AcceptBrokerClient(const ports::NodeName & broker_name,PlatformHandle broker_channel)329 void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name,
330 PlatformHandle broker_channel) {
331 AcceptBrokerClientData* data;
332 std::vector<PlatformHandle> handles;
333 if (broker_channel.is_valid())
334 handles.emplace_back(std::move(broker_channel));
335 Channel::MessagePtr message =
336 CreateMessage(MessageType::ACCEPT_BROKER_CLIENT,
337 sizeof(AcceptBrokerClientData), handles.size(), &data);
338 message->SetHandles(std::move(handles));
339 data->broker_name = broker_name;
340 WriteChannelMessage(std::move(message));
341 }
342
RequestPortMerge(const ports::PortName & connector_port_name,const std::string & token)343 void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name,
344 const std::string& token) {
345 RequestPortMergeData* data;
346 Channel::MessagePtr message =
347 CreateMessage(MessageType::REQUEST_PORT_MERGE,
348 sizeof(RequestPortMergeData) + token.size(), 0, &data);
349 data->connector_port_name = connector_port_name;
350 memcpy(data + 1, token.data(), token.size());
351 WriteChannelMessage(std::move(message));
352 }
353
RequestIntroduction(const ports::NodeName & name)354 void NodeChannel::RequestIntroduction(const ports::NodeName& name) {
355 IntroductionData* data;
356 Channel::MessagePtr message = CreateMessage(
357 MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data);
358 data->name = name;
359 WriteChannelMessage(std::move(message));
360 }
361
Introduce(const ports::NodeName & name,PlatformHandle channel_handle)362 void NodeChannel::Introduce(const ports::NodeName& name,
363 PlatformHandle channel_handle) {
364 IntroductionData* data;
365 std::vector<PlatformHandle> handles;
366 if (channel_handle.is_valid())
367 handles.emplace_back(std::move(channel_handle));
368 Channel::MessagePtr message = CreateMessage(
369 MessageType::INTRODUCE, sizeof(IntroductionData), handles.size(), &data);
370 message->SetHandles(std::move(handles));
371 data->name = name;
372 WriteChannelMessage(std::move(message));
373 }
374
SendChannelMessage(Channel::MessagePtr message)375 void NodeChannel::SendChannelMessage(Channel::MessagePtr message) {
376 WriteChannelMessage(std::move(message));
377 }
378
Broadcast(Channel::MessagePtr message)379 void NodeChannel::Broadcast(Channel::MessagePtr message) {
380 DCHECK(!message->has_handles());
381 void* data;
382 Channel::MessagePtr broadcast_message = CreateMessage(
383 MessageType::BROADCAST_EVENT, message->data_num_bytes(), 0, &data);
384 memcpy(data, message->data(), message->data_num_bytes());
385 WriteChannelMessage(std::move(broadcast_message));
386 }
387
BindBrokerHost(PlatformHandle broker_host_handle)388 void NodeChannel::BindBrokerHost(PlatformHandle broker_host_handle) {
389 #if !defined(OS_APPLE) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
390 DCHECK(broker_host_handle.is_valid());
391 BindBrokerHostData* data;
392 std::vector<PlatformHandle> handles;
393 handles.push_back(std::move(broker_host_handle));
394 Channel::MessagePtr message =
395 CreateMessage(MessageType::BIND_BROKER_HOST, sizeof(BindBrokerHostData),
396 handles.size(), &data);
397 data->padding = 0;
398 message->SetHandles(std::move(handles));
399 WriteChannelMessage(std::move(message));
400 #endif
401 }
402
403 #if defined(OS_WIN)
RelayEventMessage(const ports::NodeName & destination,Channel::MessagePtr message)404 void NodeChannel::RelayEventMessage(const ports::NodeName& destination,
405 Channel::MessagePtr message) {
406 #if defined(OS_WIN)
407 DCHECK(message->has_handles());
408
409 // Note that this is only used on Windows, and on Windows all platform
410 // handles are included in the message data. We blindly copy all the data
411 // here and the relay node (the broker) will duplicate handles as needed.
412 size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
413 RelayEventMessageData* data;
414 Channel::MessagePtr relay_message =
415 CreateMessage(MessageType::RELAY_EVENT_MESSAGE, num_bytes, 0, &data);
416 data->destination = destination;
417 memcpy(data + 1, message->data(), message->data_num_bytes());
418
419 // When the handles are duplicated in the broker, the source handles will
420 // be closed. If the broker never receives this message then these handles
421 // will leak, but that means something else has probably broken and the
422 // sending process won't likely be around much longer.
423 //
424 // TODO(https://crbug.com/813112): We would like to be able to violate the
425 // above stated assumption. We should not leak handles in cases where we
426 // outlive the broker, as we may continue existing and eventually accept a new
427 // broker invitation.
428 std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
429 for (auto& handle : handles)
430 handle.TakeHandle().release();
431
432 #else
433 DCHECK(message->has_mach_ports());
434
435 // On OSX, the handles are extracted from the relayed message and attached to
436 // the wrapper. The broker then takes the handles attached to the wrapper and
437 // moves them back to the relayed message. This is necessary because the
438 // message may contain fds which need to be attached to the outer message so
439 // that they can be transferred to the broker.
440 std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
441 size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
442 RelayEventMessageData* data;
443 Channel::MessagePtr relay_message = CreateMessage(
444 MessageType::RELAY_EVENT_MESSAGE, num_bytes, handles.size(), &data);
445 data->destination = destination;
446 memcpy(data + 1, message->data(), message->data_num_bytes());
447 relay_message->SetHandles(std::move(handles));
448 #endif // defined(OS_WIN)
449
450 WriteChannelMessage(std::move(relay_message));
451 }
452
EventMessageFromRelay(const ports::NodeName & source,Channel::MessagePtr message)453 void NodeChannel::EventMessageFromRelay(const ports::NodeName& source,
454 Channel::MessagePtr message) {
455 size_t num_bytes =
456 sizeof(EventMessageFromRelayData) + message->payload_size();
457 EventMessageFromRelayData* data;
458 Channel::MessagePtr relayed_message =
459 CreateMessage(MessageType::EVENT_MESSAGE_FROM_RELAY, num_bytes,
460 message->num_handles(), &data);
461 data->source = source;
462 if (message->payload_size())
463 memcpy(data + 1, message->payload(), message->payload_size());
464 relayed_message->SetHandles(message->TakeHandles());
465 WriteChannelMessage(std::move(relayed_message));
466 }
467 #endif // defined(OS_WIN)
468
NodeChannel(Delegate * delegate,ConnectionParams connection_params,Channel::HandlePolicy channel_handle_policy,scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)469 NodeChannel::NodeChannel(
470 Delegate* delegate,
471 ConnectionParams connection_params,
472 Channel::HandlePolicy channel_handle_policy,
473 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,
474 const ProcessErrorCallback& process_error_callback)
475 : base::RefCountedDeleteOnSequence<NodeChannel>(io_task_runner),
476 delegate_(delegate),
477 process_error_callback_(process_error_callback)
478 #if !defined(OS_NACL_SFI)
479 ,
480 channel_(Channel::Create(this,
481 std::move(connection_params),
482 channel_handle_policy,
483 std::move(io_task_runner)))
484 #endif
485 {
486 }
487
~NodeChannel()488 NodeChannel::~NodeChannel() {
489 ShutDown();
490 }
491
CreateAndBindLocalBrokerHost(PlatformHandle broker_host_handle)492 void NodeChannel::CreateAndBindLocalBrokerHost(
493 PlatformHandle broker_host_handle) {
494 #if !defined(OS_APPLE) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
495 // Self-owned.
496 ConnectionParams connection_params(
497 PlatformChannelEndpoint(std::move(broker_host_handle)));
498 new BrokerHost(remote_process_handle_.get(), std::move(connection_params),
499 process_error_callback_);
500 #endif
501 }
502
OnChannelMessage(const void * payload,size_t payload_size,std::vector<PlatformHandle> handles)503 void NodeChannel::OnChannelMessage(const void* payload,
504 size_t payload_size,
505 std::vector<PlatformHandle> handles) {
506 DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
507
508 RequestContext request_context(RequestContext::Source::SYSTEM);
509
510 if (payload_size <= sizeof(Header)) {
511 delegate_->OnChannelError(remote_node_name_, this);
512 return;
513 }
514
515 const Header* header = static_cast<const Header*>(payload);
516 switch (header->type) {
517 case MessageType::ACCEPT_INVITEE: {
518 const AcceptInviteeData* data;
519 if (GetMessagePayload(payload, payload_size, &data)) {
520 delegate_->OnAcceptInvitee(remote_node_name_, data->inviter_name,
521 data->token);
522 return;
523 }
524 break;
525 }
526
527 case MessageType::ACCEPT_INVITATION: {
528 const AcceptInvitationData* data;
529 if (GetMessagePayload(payload, payload_size, &data)) {
530 delegate_->OnAcceptInvitation(remote_node_name_, data->token,
531 data->invitee_name);
532 return;
533 }
534 break;
535 }
536
537 case MessageType::ADD_BROKER_CLIENT: {
538 const AddBrokerClientData* data;
539 if (GetMessagePayload(payload, payload_size, &data)) {
540 #if defined(OS_WIN)
541 if (handles.size() != 1) {
542 DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
543 break;
544 }
545 delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
546 handles[0].ReleaseHandle());
547 #else
548 if (!handles.empty()) {
549 DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
550 break;
551 }
552 delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
553 data->process_handle);
554 #endif
555 return;
556 }
557 break;
558 }
559
560 case MessageType::BROKER_CLIENT_ADDED: {
561 const BrokerClientAddedData* data;
562 if (GetMessagePayload(payload, payload_size, &data)) {
563 if (handles.size() != 1) {
564 DLOG(ERROR) << "Dropping invalid BrokerClientAdded message.";
565 break;
566 }
567 delegate_->OnBrokerClientAdded(remote_node_name_, data->client_name,
568 std::move(handles[0]));
569 return;
570 }
571 break;
572 }
573
574 case MessageType::ACCEPT_BROKER_CLIENT: {
575 const AcceptBrokerClientData* data;
576 if (GetMessagePayload(payload, payload_size, &data)) {
577 PlatformHandle broker_channel;
578 if (handles.size() > 1) {
579 DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message.";
580 break;
581 }
582 if (handles.size() == 1)
583 broker_channel = std::move(handles[0]);
584
585 delegate_->OnAcceptBrokerClient(remote_node_name_, data->broker_name,
586 std::move(broker_channel));
587 return;
588 }
589 break;
590 }
591
592 case MessageType::EVENT_MESSAGE: {
593 Channel::MessagePtr message(
594 new Channel::Message(payload_size, handles.size()));
595 message->SetHandles(std::move(handles));
596 memcpy(message->mutable_payload(), payload, payload_size);
597 delegate_->OnEventMessage(remote_node_name_, std::move(message));
598 return;
599 }
600
601 case MessageType::REQUEST_PORT_MERGE: {
602 const RequestPortMergeData* data;
603 if (GetMessagePayload(payload, payload_size, &data)) {
604 // Don't accept an empty token.
605 size_t token_size = payload_size - sizeof(*data) - sizeof(Header);
606 if (token_size == 0)
607 break;
608 std::string token(reinterpret_cast<const char*>(data + 1), token_size);
609 delegate_->OnRequestPortMerge(remote_node_name_,
610 data->connector_port_name, token);
611 return;
612 }
613 break;
614 }
615
616 case MessageType::REQUEST_INTRODUCTION: {
617 const IntroductionData* data;
618 if (GetMessagePayload(payload, payload_size, &data)) {
619 delegate_->OnRequestIntroduction(remote_node_name_, data->name);
620 return;
621 }
622 break;
623 }
624
625 case MessageType::INTRODUCE: {
626 const IntroductionData* data;
627 if (GetMessagePayload(payload, payload_size, &data)) {
628 if (handles.size() > 1) {
629 DLOG(ERROR) << "Dropping invalid introduction message.";
630 break;
631 }
632 PlatformHandle channel_handle;
633 if (handles.size() == 1)
634 channel_handle = std::move(handles[0]);
635
636 delegate_->OnIntroduce(remote_node_name_, data->name,
637 std::move(channel_handle));
638 return;
639 }
640 break;
641 }
642
643 #if defined(OS_WIN)
644 case MessageType::RELAY_EVENT_MESSAGE: {
645 base::ProcessHandle from_process;
646 {
647 base::AutoLock lock(remote_process_handle_lock_);
648 // NOTE: It's safe to retain a weak reference to this process handle
649 // through the extent of this call because |this| is kept alive and
650 // |remote_process_handle_| is never reset once set.
651 from_process = remote_process_handle_.get();
652 }
653 const RelayEventMessageData* data;
654 if (GetMessagePayload(payload, payload_size, &data)) {
655 // Don't try to relay an empty message.
656 if (payload_size <= sizeof(Header) + sizeof(RelayEventMessageData))
657 break;
658
659 const void* message_start = data + 1;
660 Channel::MessagePtr message = Channel::Message::Deserialize(
661 message_start, payload_size - sizeof(Header) - sizeof(*data),
662 from_process);
663 if (!message) {
664 DLOG(ERROR) << "Dropping invalid relay message.";
665 break;
666 }
667 delegate_->OnRelayEventMessage(remote_node_name_, from_process,
668 data->destination, std::move(message));
669 return;
670 }
671 break;
672 }
673 #endif
674
675 case MessageType::BROADCAST_EVENT: {
676 if (payload_size <= sizeof(Header))
677 break;
678 const void* data = static_cast<const void*>(
679 reinterpret_cast<const Header*>(payload) + 1);
680 Channel::MessagePtr message =
681 Channel::Message::Deserialize(data, payload_size - sizeof(Header));
682 if (!message || message->has_handles()) {
683 DLOG(ERROR) << "Dropping invalid broadcast message.";
684 break;
685 }
686 delegate_->OnBroadcast(remote_node_name_, std::move(message));
687 return;
688 }
689
690 #if defined(OS_WIN)
691 case MessageType::EVENT_MESSAGE_FROM_RELAY: {
692 const EventMessageFromRelayData* data;
693 if (GetMessagePayload(payload, payload_size, &data)) {
694 size_t num_bytes = payload_size - sizeof(*data);
695 if (num_bytes < sizeof(Header))
696 break;
697 num_bytes -= sizeof(Header);
698
699 Channel::MessagePtr message(
700 new Channel::Message(num_bytes, handles.size()));
701 message->SetHandles(std::move(handles));
702 if (num_bytes)
703 memcpy(message->mutable_payload(), data + 1, num_bytes);
704 delegate_->OnEventMessageFromRelay(remote_node_name_, data->source,
705 std::move(message));
706 return;
707 }
708 break;
709 }
710 #endif // defined(OS_WIN)
711
712 case MessageType::ACCEPT_PEER: {
713 const AcceptPeerData* data;
714 if (GetMessagePayload(payload, payload_size, &data)) {
715 delegate_->OnAcceptPeer(remote_node_name_, data->token, data->peer_name,
716 data->port_name);
717 return;
718 }
719 break;
720 }
721
722 case MessageType::BIND_BROKER_HOST:
723 if (handles.size() == 1) {
724 CreateAndBindLocalBrokerHost(std::move(handles[0]));
725 return;
726 }
727 break;
728
729 default:
730 // Ignore unrecognized message types, allowing for future extensibility.
731 return;
732 }
733
734 DLOG(ERROR) << "Received invalid message. Closing channel.";
735 if (process_error_callback_)
736 process_error_callback_.Run("NodeChannel received a malformed message");
737 delegate_->OnChannelError(remote_node_name_, this);
738 }
739
OnChannelError(Channel::Error error)740 void NodeChannel::OnChannelError(Channel::Error error) {
741 DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
742
743 RequestContext request_context(RequestContext::Source::SYSTEM);
744
745 ShutDown();
746
747 if (process_error_callback_ &&
748 error == Channel::Error::kReceivedMalformedData) {
749 process_error_callback_.Run("Channel received a malformed message");
750 }
751
752 // |OnChannelError()| may cause |this| to be destroyed, but still need access
753 // to the name after that destruction. So make a copy of
754 // |remote_node_name_| so it can be used if |this| becomes destroyed.
755 ports::NodeName node_name = remote_node_name_;
756 delegate_->OnChannelError(node_name, this);
757 }
758
WriteChannelMessage(Channel::MessagePtr message)759 void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) {
760 base::AutoLock lock(channel_lock_);
761 if (!channel_)
762 DLOG(ERROR) << "Dropping message on closed channel.";
763 else
764 channel_->Write(std::move(message));
765 }
766
767 } // namespace core
768 } // namespace mojo
769