1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/nearby_sharing/incoming_frames_reader.h"
6
7 #include <type_traits>
8
9 #include "base/threading/sequenced_task_runner_handle.h"
10 #include "chrome/browser/nearby_sharing/logging/logging.h"
11 #include "chrome/browser/nearby_sharing/nearby_connection.h"
12 #include "chrome/browser/nearby_sharing/nearby_process_manager.h"
13 #include "chrome/browser/profiles/profile.h"
14 #include "chromeos/services/nearby/public/mojom/nearby_decoder.mojom.h"
15
16 namespace {
17
operator <<(std::ostream & out,const sharing::mojom::V1Frame::Tag & obj)18 std::ostream& operator<<(std::ostream& out,
19 const sharing::mojom::V1Frame::Tag& obj) {
20 out << static_cast<std::underlying_type<sharing::mojom::V1Frame::Tag>::type>(
21 obj);
22 return out;
23 }
24
25 } // namespace
26
IncomingFramesReader(NearbyProcessManager * process_manager,Profile * profile,NearbyConnection * connection)27 IncomingFramesReader::IncomingFramesReader(
28 NearbyProcessManager* process_manager,
29 Profile* profile,
30 NearbyConnection* connection)
31 : process_manager_(process_manager),
32 profile_(profile),
33 connection_(connection) {
34 DCHECK(process_manager_);
35 DCHECK(profile_);
36 DCHECK(connection_);
37
38 nearby_process_observer_.Add(process_manager);
39 }
40
41 IncomingFramesReader::~IncomingFramesReader() = default;
42
ReadFrame(base::OnceCallback<void (base::Optional<sharing::mojom::V1FramePtr>)> callback)43 void IncomingFramesReader::ReadFrame(
44 base::OnceCallback<void(base::Optional<sharing::mojom::V1FramePtr>)>
45 callback) {
46 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
47 DCHECK(!callback_);
48 DCHECK(!is_process_stopped_);
49
50 callback_ = std::move(callback);
51 frame_type_ = base::nullopt;
52
53 // Check in cache for frame.
54 base::Optional<sharing::mojom::V1FramePtr> cached_frame =
55 GetCachedFrame(frame_type_);
56 if (cached_frame) {
57 Done(std::move(cached_frame));
58 return;
59 }
60
61 ReadNextFrame();
62 }
63
ReadFrame(sharing::mojom::V1Frame::Tag frame_type,base::OnceCallback<void (base::Optional<sharing::mojom::V1FramePtr>)> callback,base::TimeDelta timeout)64 void IncomingFramesReader::ReadFrame(
65 sharing::mojom::V1Frame::Tag frame_type,
66 base::OnceCallback<void(base::Optional<sharing::mojom::V1FramePtr>)>
67 callback,
68 base::TimeDelta timeout) {
69 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
70 DCHECK(!callback_);
71 DCHECK(!is_process_stopped_);
72 if (!connection_) {
73 std::move(callback).Run(base::nullopt);
74 return;
75 }
76
77 callback_ = std::move(callback);
78 frame_type_ = frame_type;
79
80 timeout_callback_.Reset(base::BindOnce(&IncomingFramesReader::OnTimeout,
81 weak_ptr_factory_.GetWeakPtr()));
82 base::SequencedTaskRunnerHandle::Get()->PostDelayedTask(
83 FROM_HERE, base::BindOnce(timeout_callback_.callback()), timeout);
84
85 // Check in cache for frame.
86 base::Optional<sharing::mojom::V1FramePtr> cached_frame =
87 GetCachedFrame(frame_type_);
88 if (cached_frame) {
89 Done(std::move(cached_frame));
90 return;
91 }
92
93 ReadNextFrame();
94 }
95
OnNearbyProfileChanged(Profile * profile)96 void IncomingFramesReader::OnNearbyProfileChanged(Profile* profile) {}
97
OnNearbyProcessStarted()98 void IncomingFramesReader::OnNearbyProcessStarted() {}
99
OnNearbyProcessStopped()100 void IncomingFramesReader::OnNearbyProcessStopped() {
101 is_process_stopped_ = true;
102 Done(base::nullopt);
103 }
104
ReadNextFrame()105 void IncomingFramesReader::ReadNextFrame() {
106 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
107 connection_->Read(
108 base::BindOnce(&IncomingFramesReader::OnDataReadFromConnection,
109 weak_ptr_factory_.GetWeakPtr()));
110 }
111
OnTimeout()112 void IncomingFramesReader::OnTimeout() {
113 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
114
115 if (!callback_)
116 return;
117
118 NS_LOG(WARNING) << __func__ << ": Timed out reading from NearbyConnection.";
119 connection_->Close();
120 }
121
OnDataReadFromConnection(base::Optional<std::vector<uint8_t>> bytes)122 void IncomingFramesReader::OnDataReadFromConnection(
123 base::Optional<std::vector<uint8_t>> bytes) {
124 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
125
126 if (!callback_) {
127 return;
128 }
129
130 if (!bytes) {
131 NS_LOG(WARNING) << __func__ << ": Failed to read frame";
132 Done(base::nullopt);
133 return;
134 }
135
136 process_manager_->GetOrStartNearbySharingDecoder(profile_)->DecodeFrame(
137 *bytes, base::BindOnce(&IncomingFramesReader::OnFrameDecoded,
138 weak_ptr_factory_.GetWeakPtr()));
139 }
140
OnFrameDecoded(sharing::mojom::FramePtr frame)141 void IncomingFramesReader::OnFrameDecoded(sharing::mojom::FramePtr frame) {
142 if (!frame) {
143 ReadNextFrame();
144 return;
145 }
146
147 if (!frame->is_v1()) {
148 NS_LOG(VERBOSE) << __func__ << ": Frame read does not have V1Frame";
149 ReadNextFrame();
150 return;
151 }
152
153 sharing::mojom::V1FramePtr v1_frame(std::move(frame->get_v1()));
154 sharing::mojom::V1Frame::Tag v1_frame_type = v1_frame->which();
155 if (frame_type_ && *frame_type_ != v1_frame_type) {
156 NS_LOG(WARNING) << __func__ << ": Failed to read frame of type "
157 << *frame_type_ << ", but got frame of type "
158 << v1_frame_type << ". Cached for later.";
159 cached_frames_.insert({v1_frame_type, std::move(v1_frame)});
160 ReadNextFrame();
161 return;
162 }
163
164 NS_LOG(VERBOSE) << __func__ << ": Successfully read frame of type "
165 << v1_frame_type;
166 Done(std::move(v1_frame));
167 }
168
Done(base::Optional<sharing::mojom::V1FramePtr> frame)169 void IncomingFramesReader::Done(
170 base::Optional<sharing::mojom::V1FramePtr> frame) {
171 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
172
173 frame_type_ = base::nullopt;
174 timeout_callback_.Cancel();
175 if (callback_) {
176 std::move(callback_).Run(std::move(frame));
177 }
178 }
179
GetCachedFrame(base::Optional<sharing::mojom::V1Frame::Tag> frame_type)180 base::Optional<sharing::mojom::V1FramePtr> IncomingFramesReader::GetCachedFrame(
181 base::Optional<sharing::mojom::V1Frame::Tag> frame_type) {
182 NS_LOG(VERBOSE) << __func__ << ": Fetching cached frame";
183 if (frame_type)
184 NS_LOG(VERBOSE) << __func__ << ": Requested frame type - " << *frame_type;
185
186 auto iter =
187 frame_type ? cached_frames_.find(*frame_type) : cached_frames_.begin();
188
189 if (iter == cached_frames_.end())
190 return base::nullopt;
191
192 NS_LOG(VERBOSE) << __func__ << ": Successfully read cached frame";
193 sharing::mojom::V1FramePtr frame = std::move(iter->second);
194 cached_frames_.erase(iter);
195 return frame;
196 }
197