1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/core/node_channel.h"
6
7 #include <cstring>
8 #include <limits>
9 #include <sstream>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/memory/ptr_util.h"
15 #include "mojo/core/broker_host.h"
16 #include "mojo/core/channel.h"
17 #include "mojo/core/configuration.h"
18 #include "mojo/core/core.h"
19 #include "mojo/core/request_context.h"
20
21 namespace mojo {
22 namespace core {
23
24 namespace {
25
// Identifies the kind of a NodeChannel message. The value is stored in
// Header::type at the very front of each message payload.
//
// NOTE: Please ONLY append messages to the end of this enum.
//
// Enumerators are numbered implicitly. The two relay-related values exist only
// when OS_WIN is defined, so every enumerator declared after them has a
// different numeric value on Windows than on other platforms.
enum class MessageType : uint32_t {
  // Data: AcceptInviteeData.
  ACCEPT_INVITEE,
  // Data: AcceptInvitationData.
  ACCEPT_INVITATION,
  // Data: AddBrokerClientData.
  ADD_BROKER_CLIENT,
  // Data: BrokerClientAddedData.
  BROKER_CLIENT_ADDED,
  // Data: AcceptBrokerClientData.
  ACCEPT_BROKER_CLIENT,
  // Data: opaque event payload supplied by the caller.
  EVENT_MESSAGE,
  // Data: RequestPortMergeData followed by the token bytes.
  REQUEST_PORT_MERGE,
  // Data: IntroductionData.
  REQUEST_INTRODUCTION,
  // Data: IntroductionData.
  INTRODUCE,
#if defined(OS_WIN)
  // Data: RelayEventMessageData followed by the message being relayed.
  RELAY_EVENT_MESSAGE,
#endif
  // Data: the full data of the message being broadcast.
  BROADCAST_EVENT,
#if defined(OS_WIN)
  // Data: EventMessageFromRelayData followed by the relayed payload.
  EVENT_MESSAGE_FROM_RELAY,
#endif
  // Data: AcceptPeerData.
  ACCEPT_PEER,
  // Data: BindBrokerHostData.
  BIND_BROKER_HOST,
};
47
48 struct Header {
49 MessageType type;
50 uint32_t padding;
51 };
52
53 static_assert(IsAlignedForChannelMessage(sizeof(Header)),
54 "Invalid header size.");
55
56 struct AcceptInviteeData {
57 ports::NodeName inviter_name;
58 ports::NodeName token;
59 };
60
61 struct AcceptInvitationData {
62 ports::NodeName token;
63 ports::NodeName invitee_name;
64 };
65
66 struct AcceptPeerData {
67 ports::NodeName token;
68 ports::NodeName peer_name;
69 ports::PortName port_name;
70 };
71
72 // This message may include a process handle on plaforms that require it.
73 struct AddBrokerClientData {
74 ports::NodeName client_name;
75 #if !defined(OS_WIN)
76 uint32_t process_handle;
77 uint32_t padding;
78 #endif
79 };
80
81 #if !defined(OS_WIN)
82 static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t),
83 "Unexpected pid size");
84 static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0,
85 "Invalid AddBrokerClientData size.");
86 #endif
87
88 // This data is followed by a platform channel handle to the broker.
89 struct BrokerClientAddedData {
90 ports::NodeName client_name;
91 };
92
93 // This data may be followed by a platform channel handle to the broker. If not,
94 // then the inviter is the broker and its channel should be used as such.
95 struct AcceptBrokerClientData {
96 ports::NodeName broker_name;
97 };
98
99 // This is followed by arbitrary payload data which is interpreted as a token
100 // string for port location.
101 struct RequestPortMergeData {
102 ports::PortName connector_port_name;
103 };
104
105 // Used for both REQUEST_INTRODUCTION and INTRODUCE.
106 //
107 // For INTRODUCE the message also includes a valid platform handle for a channel
108 // the receiver may use to communicate with the named node directly, or an
109 // invalid platform handle if the node is unknown to the sender or otherwise
110 // cannot be introduced.
111 struct IntroductionData {
112 ports::NodeName name;
113 };
114
// Data carried by a BIND_BROKER_HOST message.
//
// This message is just a PlatformHandle. The data struct here has only a
// padding field to ensure an aligned, non-zero-length payload.
struct BindBrokerHostData {
  // Carries no information; NodeChannel::BindBrokerHost() writes zero.
  uint64_t padding;
};
120
121 #if defined(OS_WIN)
122 // This struct is followed by the full payload of a message to be relayed.
123 struct RelayEventMessageData {
124 ports::NodeName destination;
125 };
126
127 // This struct is followed by the full payload of a relayed message.
128 struct EventMessageFromRelayData {
129 ports::NodeName source;
130 };
131 #endif
132
133 template <typename DataType>
CreateMessage(MessageType type,size_t payload_size,size_t num_handles,DataType ** out_data,size_t capacity=0)134 Channel::MessagePtr CreateMessage(MessageType type,
135 size_t payload_size,
136 size_t num_handles,
137 DataType** out_data,
138 size_t capacity = 0) {
139 const size_t total_size = payload_size + sizeof(Header);
140 if (capacity == 0)
141 capacity = total_size;
142 else
143 capacity = std::max(total_size, capacity);
144 auto message =
145 std::make_unique<Channel::Message>(capacity, total_size, num_handles);
146 Header* header = reinterpret_cast<Header*>(message->mutable_payload());
147 header->type = type;
148 header->padding = 0;
149 *out_data = reinterpret_cast<DataType*>(&header[1]);
150 return message;
151 }
152
153 template <typename DataType>
GetMessagePayload(const void * bytes,size_t num_bytes,DataType ** out_data)154 bool GetMessagePayload(const void* bytes,
155 size_t num_bytes,
156 DataType** out_data) {
157 static_assert(sizeof(DataType) > 0, "DataType must have non-zero size.");
158 if (num_bytes < sizeof(Header) + sizeof(DataType))
159 return false;
160 *out_data = reinterpret_cast<const DataType*>(
161 static_cast<const char*>(bytes) + sizeof(Header));
162 return true;
163 }
164
165 } // namespace
166
167 // static
Create(Delegate * delegate,ConnectionParams connection_params,Channel::HandlePolicy channel_handle_policy,scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)168 scoped_refptr<NodeChannel> NodeChannel::Create(
169 Delegate* delegate,
170 ConnectionParams connection_params,
171 Channel::HandlePolicy channel_handle_policy,
172 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,
173 const ProcessErrorCallback& process_error_callback) {
174 #if defined(OS_NACL_SFI)
175 LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI";
176 return nullptr;
177 #else
178 return new NodeChannel(delegate, std::move(connection_params),
179 channel_handle_policy, io_task_runner,
180 process_error_callback);
181 #endif
182 }
183
184 // static
CreateEventMessage(size_t capacity,size_t payload_size,void ** payload,size_t num_handles)185 Channel::MessagePtr NodeChannel::CreateEventMessage(size_t capacity,
186 size_t payload_size,
187 void** payload,
188 size_t num_handles) {
189 return CreateMessage(MessageType::EVENT_MESSAGE, payload_size, num_handles,
190 payload, capacity);
191 }
192
193 // static
GetEventMessageData(Channel::Message * message,void ** data,size_t * num_data_bytes)194 void NodeChannel::GetEventMessageData(Channel::Message* message,
195 void** data,
196 size_t* num_data_bytes) {
197 // NOTE: OnChannelMessage guarantees that we never accept a Channel::Message
198 // with a payload of fewer than |sizeof(Header)| bytes.
199 *data = reinterpret_cast<Header*>(message->mutable_payload()) + 1;
200 *num_data_bytes = message->payload_size() - sizeof(Header);
201 }
202
Start()203 void NodeChannel::Start() {
204 base::AutoLock lock(channel_lock_);
205 // ShutDown() may have already been called, in which case |channel_| is null.
206 if (channel_)
207 channel_->Start();
208 }
209
ShutDown()210 void NodeChannel::ShutDown() {
211 base::AutoLock lock(channel_lock_);
212 if (channel_) {
213 channel_->ShutDown();
214 channel_ = nullptr;
215 }
216 }
217
LeakHandleOnShutdown()218 void NodeChannel::LeakHandleOnShutdown() {
219 base::AutoLock lock(channel_lock_);
220 if (channel_) {
221 channel_->LeakHandle();
222 }
223 }
224
NotifyBadMessage(const std::string & error)225 void NodeChannel::NotifyBadMessage(const std::string& error) {
226 DCHECK(HasBadMessageHandler());
227 process_error_callback_.Run("Received bad user message: " + error);
228 }
229
SetRemoteProcessHandle(ScopedProcessHandle process_handle)230 void NodeChannel::SetRemoteProcessHandle(ScopedProcessHandle process_handle) {
231 DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
232 {
233 base::AutoLock lock(channel_lock_);
234 if (channel_)
235 channel_->set_remote_process(process_handle.Clone());
236 }
237 base::AutoLock lock(remote_process_handle_lock_);
238 DCHECK(!remote_process_handle_.is_valid());
239 CHECK_NE(remote_process_handle_.get(), base::GetCurrentProcessHandle());
240 remote_process_handle_ = std::move(process_handle);
241 }
242
HasRemoteProcessHandle()243 bool NodeChannel::HasRemoteProcessHandle() {
244 base::AutoLock lock(remote_process_handle_lock_);
245 return remote_process_handle_.is_valid();
246 }
247
CloneRemoteProcessHandle()248 ScopedProcessHandle NodeChannel::CloneRemoteProcessHandle() {
249 base::AutoLock lock(remote_process_handle_lock_);
250 if (!remote_process_handle_.is_valid())
251 return ScopedProcessHandle();
252 return remote_process_handle_.Clone();
253 }
254
SetRemoteNodeName(const ports::NodeName & name)255 void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) {
256 DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
257 remote_node_name_ = name;
258 }
259
AcceptInvitee(const ports::NodeName & inviter_name,const ports::NodeName & token)260 void NodeChannel::AcceptInvitee(const ports::NodeName& inviter_name,
261 const ports::NodeName& token) {
262 AcceptInviteeData* data;
263 Channel::MessagePtr message = CreateMessage(
264 MessageType::ACCEPT_INVITEE, sizeof(AcceptInviteeData), 0, &data);
265 data->inviter_name = inviter_name;
266 data->token = token;
267 WriteChannelMessage(std::move(message));
268 }
269
AcceptInvitation(const ports::NodeName & token,const ports::NodeName & invitee_name)270 void NodeChannel::AcceptInvitation(const ports::NodeName& token,
271 const ports::NodeName& invitee_name) {
272 AcceptInvitationData* data;
273 Channel::MessagePtr message = CreateMessage(
274 MessageType::ACCEPT_INVITATION, sizeof(AcceptInvitationData), 0, &data);
275 data->token = token;
276 data->invitee_name = invitee_name;
277 WriteChannelMessage(std::move(message));
278 }
279
AcceptPeer(const ports::NodeName & sender_name,const ports::NodeName & token,const ports::PortName & port_name)280 void NodeChannel::AcceptPeer(const ports::NodeName& sender_name,
281 const ports::NodeName& token,
282 const ports::PortName& port_name) {
283 AcceptPeerData* data;
284 Channel::MessagePtr message =
285 CreateMessage(MessageType::ACCEPT_PEER, sizeof(AcceptPeerData), 0, &data);
286 data->token = token;
287 data->peer_name = sender_name;
288 data->port_name = port_name;
289 WriteChannelMessage(std::move(message));
290 }
291
AddBrokerClient(const ports::NodeName & client_name,ScopedProcessHandle process_handle)292 void NodeChannel::AddBrokerClient(const ports::NodeName& client_name,
293 ScopedProcessHandle process_handle) {
294 AddBrokerClientData* data;
295 std::vector<PlatformHandle> handles;
296 #if defined(OS_WIN)
297 handles.emplace_back(base::win::ScopedHandle(process_handle.release()));
298 #endif
299 Channel::MessagePtr message =
300 CreateMessage(MessageType::ADD_BROKER_CLIENT, sizeof(AddBrokerClientData),
301 handles.size(), &data);
302 message->SetHandles(std::move(handles));
303 data->client_name = client_name;
304 #if !defined(OS_WIN)
305 data->process_handle = process_handle.get();
306 data->padding = 0;
307 #endif
308 WriteChannelMessage(std::move(message));
309 }
310
BrokerClientAdded(const ports::NodeName & client_name,PlatformHandle broker_channel)311 void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name,
312 PlatformHandle broker_channel) {
313 BrokerClientAddedData* data;
314 std::vector<PlatformHandle> handles;
315 if (broker_channel.is_valid())
316 handles.emplace_back(std::move(broker_channel));
317 Channel::MessagePtr message =
318 CreateMessage(MessageType::BROKER_CLIENT_ADDED,
319 sizeof(BrokerClientAddedData), handles.size(), &data);
320 message->SetHandles(std::move(handles));
321 data->client_name = client_name;
322 WriteChannelMessage(std::move(message));
323 }
324
AcceptBrokerClient(const ports::NodeName & broker_name,PlatformHandle broker_channel)325 void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name,
326 PlatformHandle broker_channel) {
327 AcceptBrokerClientData* data;
328 std::vector<PlatformHandle> handles;
329 if (broker_channel.is_valid())
330 handles.emplace_back(std::move(broker_channel));
331 Channel::MessagePtr message =
332 CreateMessage(MessageType::ACCEPT_BROKER_CLIENT,
333 sizeof(AcceptBrokerClientData), handles.size(), &data);
334 message->SetHandles(std::move(handles));
335 data->broker_name = broker_name;
336 WriteChannelMessage(std::move(message));
337 }
338
RequestPortMerge(const ports::PortName & connector_port_name,const std::string & token)339 void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name,
340 const std::string& token) {
341 RequestPortMergeData* data;
342 Channel::MessagePtr message =
343 CreateMessage(MessageType::REQUEST_PORT_MERGE,
344 sizeof(RequestPortMergeData) + token.size(), 0, &data);
345 data->connector_port_name = connector_port_name;
346 memcpy(data + 1, token.data(), token.size());
347 WriteChannelMessage(std::move(message));
348 }
349
RequestIntroduction(const ports::NodeName & name)350 void NodeChannel::RequestIntroduction(const ports::NodeName& name) {
351 IntroductionData* data;
352 Channel::MessagePtr message = CreateMessage(
353 MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data);
354 data->name = name;
355 WriteChannelMessage(std::move(message));
356 }
357
Introduce(const ports::NodeName & name,PlatformHandle channel_handle)358 void NodeChannel::Introduce(const ports::NodeName& name,
359 PlatformHandle channel_handle) {
360 IntroductionData* data;
361 std::vector<PlatformHandle> handles;
362 if (channel_handle.is_valid())
363 handles.emplace_back(std::move(channel_handle));
364 Channel::MessagePtr message = CreateMessage(
365 MessageType::INTRODUCE, sizeof(IntroductionData), handles.size(), &data);
366 message->SetHandles(std::move(handles));
367 data->name = name;
368 WriteChannelMessage(std::move(message));
369 }
370
SendChannelMessage(Channel::MessagePtr message)371 void NodeChannel::SendChannelMessage(Channel::MessagePtr message) {
372 WriteChannelMessage(std::move(message));
373 }
374
Broadcast(Channel::MessagePtr message)375 void NodeChannel::Broadcast(Channel::MessagePtr message) {
376 DCHECK(!message->has_handles());
377 void* data;
378 Channel::MessagePtr broadcast_message = CreateMessage(
379 MessageType::BROADCAST_EVENT, message->data_num_bytes(), 0, &data);
380 memcpy(data, message->data(), message->data_num_bytes());
381 WriteChannelMessage(std::move(broadcast_message));
382 }
383
BindBrokerHost(PlatformHandle broker_host_handle)384 void NodeChannel::BindBrokerHost(PlatformHandle broker_host_handle) {
385 #if !defined(OS_MACOSX) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
386 DCHECK(broker_host_handle.is_valid());
387 BindBrokerHostData* data;
388 std::vector<PlatformHandle> handles;
389 handles.push_back(std::move(broker_host_handle));
390 Channel::MessagePtr message =
391 CreateMessage(MessageType::BIND_BROKER_HOST, sizeof(BindBrokerHostData),
392 handles.size(), &data);
393 data->padding = 0;
394 message->SetHandles(std::move(handles));
395 WriteChannelMessage(std::move(message));
396 #endif
397 }
398
399 #if defined(OS_WIN)
RelayEventMessage(const ports::NodeName & destination,Channel::MessagePtr message)400 void NodeChannel::RelayEventMessage(const ports::NodeName& destination,
401 Channel::MessagePtr message) {
402 #if defined(OS_WIN)
403 DCHECK(message->has_handles());
404
405 // Note that this is only used on Windows, and on Windows all platform
406 // handles are included in the message data. We blindly copy all the data
407 // here and the relay node (the broker) will duplicate handles as needed.
408 size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
409 RelayEventMessageData* data;
410 Channel::MessagePtr relay_message =
411 CreateMessage(MessageType::RELAY_EVENT_MESSAGE, num_bytes, 0, &data);
412 data->destination = destination;
413 memcpy(data + 1, message->data(), message->data_num_bytes());
414
415 // When the handles are duplicated in the broker, the source handles will
416 // be closed. If the broker never receives this message then these handles
417 // will leak, but that means something else has probably broken and the
418 // sending process won't likely be around much longer.
419 //
420 // TODO(https://crbug.com/813112): We would like to be able to violate the
421 // above stated assumption. We should not leak handles in cases where we
422 // outlive the broker, as we may continue existing and eventually accept a new
423 // broker invitation.
424 std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
425 for (auto& handle : handles)
426 handle.TakeHandle().release();
427
428 #else
429 DCHECK(message->has_mach_ports());
430
431 // On OSX, the handles are extracted from the relayed message and attached to
432 // the wrapper. The broker then takes the handles attached to the wrapper and
433 // moves them back to the relayed message. This is necessary because the
434 // message may contain fds which need to be attached to the outer message so
435 // that they can be transferred to the broker.
436 std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
437 size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
438 RelayEventMessageData* data;
439 Channel::MessagePtr relay_message = CreateMessage(
440 MessageType::RELAY_EVENT_MESSAGE, num_bytes, handles.size(), &data);
441 data->destination = destination;
442 memcpy(data + 1, message->data(), message->data_num_bytes());
443 relay_message->SetHandles(std::move(handles));
444 #endif // defined(OS_WIN)
445
446 WriteChannelMessage(std::move(relay_message));
447 }
448
EventMessageFromRelay(const ports::NodeName & source,Channel::MessagePtr message)449 void NodeChannel::EventMessageFromRelay(const ports::NodeName& source,
450 Channel::MessagePtr message) {
451 size_t num_bytes =
452 sizeof(EventMessageFromRelayData) + message->payload_size();
453 EventMessageFromRelayData* data;
454 Channel::MessagePtr relayed_message =
455 CreateMessage(MessageType::EVENT_MESSAGE_FROM_RELAY, num_bytes,
456 message->num_handles(), &data);
457 data->source = source;
458 if (message->payload_size())
459 memcpy(data + 1, message->payload(), message->payload_size());
460 relayed_message->SetHandles(message->TakeHandles());
461 WriteChannelMessage(std::move(relayed_message));
462 }
463 #endif // defined(OS_WIN)
464
NodeChannel(Delegate * delegate,ConnectionParams connection_params,Channel::HandlePolicy channel_handle_policy,scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)465 NodeChannel::NodeChannel(
466 Delegate* delegate,
467 ConnectionParams connection_params,
468 Channel::HandlePolicy channel_handle_policy,
469 scoped_refptr<base::SingleThreadTaskRunner> io_task_runner,
470 const ProcessErrorCallback& process_error_callback)
471 : base::RefCountedDeleteOnSequence<NodeChannel>(io_task_runner),
472 delegate_(delegate),
473 process_error_callback_(process_error_callback)
474 #if !defined(OS_NACL_SFI)
475 ,
476 channel_(Channel::Create(this,
477 std::move(connection_params),
478 channel_handle_policy,
479 std::move(io_task_runner)))
480 #endif
481 {
482 }
483
~NodeChannel()484 NodeChannel::~NodeChannel() {
485 ShutDown();
486 }
487
CreateAndBindLocalBrokerHost(PlatformHandle broker_host_handle)488 void NodeChannel::CreateAndBindLocalBrokerHost(
489 PlatformHandle broker_host_handle) {
490 #if !defined(OS_MACOSX) && !defined(OS_NACL) && !defined(OS_FUCHSIA)
491 // Self-owned.
492 ConnectionParams connection_params(
493 PlatformChannelEndpoint(std::move(broker_host_handle)));
494 new BrokerHost(remote_process_handle_.get(), std::move(connection_params),
495 process_error_callback_);
496 #endif
497 }
498
OnChannelMessage(const void * payload,size_t payload_size,std::vector<PlatformHandle> handles)499 void NodeChannel::OnChannelMessage(const void* payload,
500 size_t payload_size,
501 std::vector<PlatformHandle> handles) {
502 DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
503
504 RequestContext request_context(RequestContext::Source::SYSTEM);
505
506 if (payload_size <= sizeof(Header)) {
507 delegate_->OnChannelError(remote_node_name_, this);
508 return;
509 }
510
511 const Header* header = static_cast<const Header*>(payload);
512 switch (header->type) {
513 case MessageType::ACCEPT_INVITEE: {
514 const AcceptInviteeData* data;
515 if (GetMessagePayload(payload, payload_size, &data)) {
516 delegate_->OnAcceptInvitee(remote_node_name_, data->inviter_name,
517 data->token);
518 return;
519 }
520 break;
521 }
522
523 case MessageType::ACCEPT_INVITATION: {
524 const AcceptInvitationData* data;
525 if (GetMessagePayload(payload, payload_size, &data)) {
526 delegate_->OnAcceptInvitation(remote_node_name_, data->token,
527 data->invitee_name);
528 return;
529 }
530 break;
531 }
532
533 case MessageType::ADD_BROKER_CLIENT: {
534 const AddBrokerClientData* data;
535 if (GetMessagePayload(payload, payload_size, &data)) {
536 #if defined(OS_WIN)
537 if (handles.size() != 1) {
538 DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
539 break;
540 }
541 delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
542 handles[0].ReleaseHandle());
543 #else
544 if (!handles.empty()) {
545 DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
546 break;
547 }
548 delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
549 data->process_handle);
550 #endif
551 return;
552 }
553 break;
554 }
555
556 case MessageType::BROKER_CLIENT_ADDED: {
557 const BrokerClientAddedData* data;
558 if (GetMessagePayload(payload, payload_size, &data)) {
559 if (handles.size() != 1) {
560 DLOG(ERROR) << "Dropping invalid BrokerClientAdded message.";
561 break;
562 }
563 delegate_->OnBrokerClientAdded(remote_node_name_, data->client_name,
564 std::move(handles[0]));
565 return;
566 }
567 break;
568 }
569
570 case MessageType::ACCEPT_BROKER_CLIENT: {
571 const AcceptBrokerClientData* data;
572 if (GetMessagePayload(payload, payload_size, &data)) {
573 PlatformHandle broker_channel;
574 if (handles.size() > 1) {
575 DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message.";
576 break;
577 }
578 if (handles.size() == 1)
579 broker_channel = std::move(handles[0]);
580
581 delegate_->OnAcceptBrokerClient(remote_node_name_, data->broker_name,
582 std::move(broker_channel));
583 return;
584 }
585 break;
586 }
587
588 case MessageType::EVENT_MESSAGE: {
589 Channel::MessagePtr message(
590 new Channel::Message(payload_size, handles.size()));
591 message->SetHandles(std::move(handles));
592 memcpy(message->mutable_payload(), payload, payload_size);
593 delegate_->OnEventMessage(remote_node_name_, std::move(message));
594 return;
595 }
596
597 case MessageType::REQUEST_PORT_MERGE: {
598 const RequestPortMergeData* data;
599 if (GetMessagePayload(payload, payload_size, &data)) {
600 // Don't accept an empty token.
601 size_t token_size = payload_size - sizeof(*data) - sizeof(Header);
602 if (token_size == 0)
603 break;
604 std::string token(reinterpret_cast<const char*>(data + 1), token_size);
605 delegate_->OnRequestPortMerge(remote_node_name_,
606 data->connector_port_name, token);
607 return;
608 }
609 break;
610 }
611
612 case MessageType::REQUEST_INTRODUCTION: {
613 const IntroductionData* data;
614 if (GetMessagePayload(payload, payload_size, &data)) {
615 delegate_->OnRequestIntroduction(remote_node_name_, data->name);
616 return;
617 }
618 break;
619 }
620
621 case MessageType::INTRODUCE: {
622 const IntroductionData* data;
623 if (GetMessagePayload(payload, payload_size, &data)) {
624 if (handles.size() > 1) {
625 DLOG(ERROR) << "Dropping invalid introduction message.";
626 break;
627 }
628 PlatformHandle channel_handle;
629 if (handles.size() == 1)
630 channel_handle = std::move(handles[0]);
631
632 delegate_->OnIntroduce(remote_node_name_, data->name,
633 std::move(channel_handle));
634 return;
635 }
636 break;
637 }
638
639 #if defined(OS_WIN)
640 case MessageType::RELAY_EVENT_MESSAGE: {
641 base::ProcessHandle from_process;
642 {
643 base::AutoLock lock(remote_process_handle_lock_);
644 // NOTE: It's safe to retain a weak reference to this process handle
645 // through the extent of this call because |this| is kept alive and
646 // |remote_process_handle_| is never reset once set.
647 from_process = remote_process_handle_.get();
648 }
649 const RelayEventMessageData* data;
650 if (GetMessagePayload(payload, payload_size, &data)) {
651 // Don't try to relay an empty message.
652 if (payload_size <= sizeof(Header) + sizeof(RelayEventMessageData))
653 break;
654
655 const void* message_start = data + 1;
656 Channel::MessagePtr message = Channel::Message::Deserialize(
657 message_start, payload_size - sizeof(Header) - sizeof(*data),
658 from_process);
659 if (!message) {
660 DLOG(ERROR) << "Dropping invalid relay message.";
661 break;
662 }
663 delegate_->OnRelayEventMessage(remote_node_name_, from_process,
664 data->destination, std::move(message));
665 return;
666 }
667 break;
668 }
669 #endif
670
671 case MessageType::BROADCAST_EVENT: {
672 if (payload_size <= sizeof(Header))
673 break;
674 const void* data = static_cast<const void*>(
675 reinterpret_cast<const Header*>(payload) + 1);
676 Channel::MessagePtr message =
677 Channel::Message::Deserialize(data, payload_size - sizeof(Header));
678 if (!message || message->has_handles()) {
679 DLOG(ERROR) << "Dropping invalid broadcast message.";
680 break;
681 }
682 delegate_->OnBroadcast(remote_node_name_, std::move(message));
683 return;
684 }
685
686 #if defined(OS_WIN)
687 case MessageType::EVENT_MESSAGE_FROM_RELAY: {
688 const EventMessageFromRelayData* data;
689 if (GetMessagePayload(payload, payload_size, &data)) {
690 size_t num_bytes = payload_size - sizeof(*data);
691 if (num_bytes < sizeof(Header))
692 break;
693 num_bytes -= sizeof(Header);
694
695 Channel::MessagePtr message(
696 new Channel::Message(num_bytes, handles.size()));
697 message->SetHandles(std::move(handles));
698 if (num_bytes)
699 memcpy(message->mutable_payload(), data + 1, num_bytes);
700 delegate_->OnEventMessageFromRelay(remote_node_name_, data->source,
701 std::move(message));
702 return;
703 }
704 break;
705 }
706 #endif // defined(OS_WIN)
707
708 case MessageType::ACCEPT_PEER: {
709 const AcceptPeerData* data;
710 if (GetMessagePayload(payload, payload_size, &data)) {
711 delegate_->OnAcceptPeer(remote_node_name_, data->token, data->peer_name,
712 data->port_name);
713 return;
714 }
715 break;
716 }
717
718 case MessageType::BIND_BROKER_HOST:
719 if (handles.size() == 1) {
720 CreateAndBindLocalBrokerHost(std::move(handles[0]));
721 return;
722 }
723 break;
724
725 default:
726 // Ignore unrecognized message types, allowing for future extensibility.
727 return;
728 }
729
730 DLOG(ERROR) << "Received invalid message. Closing channel.";
731 if (process_error_callback_)
732 process_error_callback_.Run("NodeChannel received a malformed message");
733 delegate_->OnChannelError(remote_node_name_, this);
734 }
735
OnChannelError(Channel::Error error)736 void NodeChannel::OnChannelError(Channel::Error error) {
737 DCHECK(owning_task_runner()->RunsTasksInCurrentSequence());
738
739 RequestContext request_context(RequestContext::Source::SYSTEM);
740
741 ShutDown();
742
743 if (process_error_callback_ &&
744 error == Channel::Error::kReceivedMalformedData) {
745 process_error_callback_.Run("Channel received a malformed message");
746 }
747
748 // |OnChannelError()| may cause |this| to be destroyed, but still need access
749 // to the name after that destruction. So make a copy of
750 // |remote_node_name_| so it can be used if |this| becomes destroyed.
751 ports::NodeName node_name = remote_node_name_;
752 delegate_->OnChannelError(node_name, this);
753 }
754
WriteChannelMessage(Channel::MessagePtr message)755 void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) {
756 // Force a crash if this process attempts to send a message larger than the
757 // maximum allowed size. This is more useful than killing a Channel when we
758 // *receive* an oversized message, as we should consider oversized message
759 // transmission to be a bug and this helps easily identify offending code.
760 CHECK_LT(message->data_num_bytes(), GetConfiguration().max_message_num_bytes);
761
762 base::AutoLock lock(channel_lock_);
763 if (!channel_)
764 DLOG(ERROR) << "Dropping message on closed channel.";
765 else
766 channel_->Write(std::move(message));
767 }
768
769 } // namespace core
770 } // namespace mojo
771