1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "remoting/host/security_key/security_key_message_reader_impl.h"
6 
7 #include <cstdint>
8 #include <string>
9 #include <utility>
10 
11 #include "base/bind.h"
12 #include "base/files/file.h"
13 #include "base/logging.h"
14 #include "base/macros.h"
15 #include "base/message_loop/message_pump_type.h"
16 #include "base/single_thread_task_runner.h"
17 #include "base/stl_util.h"
18 #include "base/threading/thread_task_runner_handle.h"
19 #include "remoting/host/security_key/security_key_message.h"
20 
21 namespace remoting {
22 
SecurityKeyMessageReaderImpl(base::File input_file)23 SecurityKeyMessageReaderImpl::SecurityKeyMessageReaderImpl(
24     base::File input_file)
25     : read_stream_(std::move(input_file)),
26       reader_thread_("SecurityKeyMessageReaderImpl") {
27   base::Thread::Options options;
28   options.message_pump_type = base::MessagePumpType::IO;
29   reader_thread_.StartWithOptions(options);
30 
31   read_task_runner_ = reader_thread_.task_runner();
32   main_task_runner_ = base::ThreadTaskRunnerHandle::Get();
33 }
34 
~SecurityKeyMessageReaderImpl()35 SecurityKeyMessageReaderImpl::~SecurityKeyMessageReaderImpl() {
36   DCHECK(main_task_runner_->RunsTasksInCurrentSequence());
37 
38   // In order to ensure the reader thread is stopped cleanly, we want to stop
39   // the thread before the task runners and weak pointers are invalidated.
40   reader_thread_.Stop();
41 }
42 
Start(const SecurityKeyMessageCallback & message_callback,base::OnceClosure error_callback)43 void SecurityKeyMessageReaderImpl::Start(
44     const SecurityKeyMessageCallback& message_callback,
45     base::OnceClosure error_callback) {
46   DCHECK(main_task_runner_->RunsTasksInCurrentSequence());
47 
48   message_callback_ = message_callback;
49   error_callback_ = std::move(error_callback);
50 
51   // base::Unretained is safe since this class owns the thread running this task
52   // which will be destroyed before this instance is.
53   read_task_runner_->PostTask(
54       FROM_HERE, base::BindOnce(&SecurityKeyMessageReaderImpl::ReadMessage,
55                                 base::Unretained(this)));
56 }
57 
ReadMessage()58 void SecurityKeyMessageReaderImpl::ReadMessage() {
59   DCHECK(read_task_runner_->RunsTasksInCurrentSequence());
60 
61   while (true) {
62     if (!read_stream_.IsValid()) {
63       LOG(ERROR) << "Cannot read from invalid stream.";
64       NotifyError();
65       return;
66     }
67 
68     uint32_t message_length_bytes = 0;
69     if (!ReadFromStream(reinterpret_cast<char*>(&message_length_bytes), 4)) {
70       NotifyError();
71       return;
72     }
73 
74     if (!SecurityKeyMessage::IsValidMessageSize(message_length_bytes)) {
75       LOG(ERROR) << "Message size is invalid: " << message_length_bytes;
76       NotifyError();
77       return;
78     }
79 
80     std::string message_data(message_length_bytes, '\0');
81     if (!ReadFromStream(base::data(message_data), message_data.size())) {
82       NotifyError();
83       return;
84     }
85 
86     std::unique_ptr<SecurityKeyMessage> message(new SecurityKeyMessage());
87     if (!message->ParseMessage(message_data)) {
88       LOG(ERROR) << "Invalid message data received.";
89       NotifyError();
90       return;
91     }
92 
93     // Notify callback of the new message received.
94     main_task_runner_->PostTask(
95         FROM_HERE,
96         base::BindOnce(&SecurityKeyMessageReaderImpl::InvokeMessageCallback,
97                        weak_factory_.GetWeakPtr(), std::move(message)));
98   }
99 }
100 
ReadFromStream(char * buffer,size_t bytes_to_read)101 bool SecurityKeyMessageReaderImpl::ReadFromStream(char* buffer,
102                                                   size_t bytes_to_read) {
103   DCHECK(buffer);
104   DCHECK_GT(bytes_to_read, 0u);
105 
106   size_t bytes_read = 0;
107   do {
108     int read_result = read_stream_.ReadAtCurrentPosNoBestEffort(
109         buffer + bytes_read, bytes_to_read - bytes_read);
110     if (read_result < 1) {
111       // 0 means EOF which is normal and should not be logged as an error.
112       if (read_result != 0) {
113         LOG(ERROR) << "Failed to read from stream, ReadAtCurrentPos returned "
114                    << read_result;
115       }
116       return false;
117     }
118     bytes_read += read_result;
119   } while (bytes_read < bytes_to_read);
120   DCHECK_EQ(bytes_read, bytes_to_read);
121 
122   return true;
123 }
124 
NotifyError()125 void SecurityKeyMessageReaderImpl::NotifyError() {
126   DCHECK(read_task_runner_->RunsTasksInCurrentSequence());
127 
128   main_task_runner_->PostTask(
129       FROM_HERE,
130       base::BindOnce(&SecurityKeyMessageReaderImpl::InvokeErrorCallback,
131                      weak_factory_.GetWeakPtr()));
132 }
133 
InvokeMessageCallback(std::unique_ptr<SecurityKeyMessage> message)134 void SecurityKeyMessageReaderImpl::InvokeMessageCallback(
135     std::unique_ptr<SecurityKeyMessage> message) {
136   DCHECK(main_task_runner_->RunsTasksInCurrentSequence());
137   message_callback_.Run(std::move(message));
138 }
139 
InvokeErrorCallback()140 void SecurityKeyMessageReaderImpl::InvokeErrorCallback() {
141   DCHECK(main_task_runner_->RunsTasksInCurrentSequence());
142   std::move(error_callback_).Run();
143 }
144 
145 }  // namespace remoting
146