1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "chrome/browser/nearby_sharing/incoming_frames_reader.h"
6 
7 #include <type_traits>
8 
9 #include "base/threading/sequenced_task_runner_handle.h"
10 #include "chrome/browser/nearby_sharing/logging/logging.h"
11 #include "chrome/browser/nearby_sharing/nearby_connection.h"
12 #include "chrome/browser/nearby_sharing/nearby_process_manager.h"
13 #include "chrome/browser/profiles/profile.h"
14 #include "chromeos/services/nearby/public/mojom/nearby_decoder.mojom.h"
15 
16 namespace {
17 
operator <<(std::ostream & out,const sharing::mojom::V1Frame::Tag & obj)18 std::ostream& operator<<(std::ostream& out,
19                          const sharing::mojom::V1Frame::Tag& obj) {
20   out << static_cast<std::underlying_type<sharing::mojom::V1Frame::Tag>::type>(
21       obj);
22   return out;
23 }
24 
25 }  // namespace
26 
IncomingFramesReader(NearbyProcessManager * process_manager,Profile * profile,NearbyConnection * connection)27 IncomingFramesReader::IncomingFramesReader(
28     NearbyProcessManager* process_manager,
29     Profile* profile,
30     NearbyConnection* connection)
31     : process_manager_(process_manager),
32       profile_(profile),
33       connection_(connection) {
34   DCHECK(process_manager_);
35   DCHECK(profile_);
36   DCHECK(connection_);
37 
38   nearby_process_observer_.Add(process_manager);
39 }
40 
41 IncomingFramesReader::~IncomingFramesReader() = default;
42 
ReadFrame(base::OnceCallback<void (base::Optional<sharing::mojom::V1FramePtr>)> callback)43 void IncomingFramesReader::ReadFrame(
44     base::OnceCallback<void(base::Optional<sharing::mojom::V1FramePtr>)>
45         callback) {
46   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
47   DCHECK(!callback_);
48   DCHECK(!is_process_stopped_);
49 
50   callback_ = std::move(callback);
51   frame_type_ = base::nullopt;
52 
53   // Check in cache for frame.
54   base::Optional<sharing::mojom::V1FramePtr> cached_frame =
55       GetCachedFrame(frame_type_);
56   if (cached_frame) {
57     Done(std::move(cached_frame));
58     return;
59   }
60 
61   ReadNextFrame();
62 }
63 
ReadFrame(sharing::mojom::V1Frame::Tag frame_type,base::OnceCallback<void (base::Optional<sharing::mojom::V1FramePtr>)> callback,base::TimeDelta timeout)64 void IncomingFramesReader::ReadFrame(
65     sharing::mojom::V1Frame::Tag frame_type,
66     base::OnceCallback<void(base::Optional<sharing::mojom::V1FramePtr>)>
67         callback,
68     base::TimeDelta timeout) {
69   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
70   DCHECK(!callback_);
71   DCHECK(!is_process_stopped_);
72   if (!connection_) {
73     std::move(callback).Run(base::nullopt);
74     return;
75   }
76 
77   callback_ = std::move(callback);
78   frame_type_ = frame_type;
79 
80   timeout_callback_.Reset(base::BindOnce(&IncomingFramesReader::OnTimeout,
81                                          weak_ptr_factory_.GetWeakPtr()));
82   base::SequencedTaskRunnerHandle::Get()->PostDelayedTask(
83       FROM_HERE, base::BindOnce(timeout_callback_.callback()), timeout);
84 
85   // Check in cache for frame.
86   base::Optional<sharing::mojom::V1FramePtr> cached_frame =
87       GetCachedFrame(frame_type_);
88   if (cached_frame) {
89     Done(std::move(cached_frame));
90     return;
91   }
92 
93   ReadNextFrame();
94 }
95 
OnNearbyProfileChanged(Profile * profile)96 void IncomingFramesReader::OnNearbyProfileChanged(Profile* profile) {}
97 
OnNearbyProcessStarted()98 void IncomingFramesReader::OnNearbyProcessStarted() {}
99 
OnNearbyProcessStopped()100 void IncomingFramesReader::OnNearbyProcessStopped() {
101   is_process_stopped_ = true;
102   Done(base::nullopt);
103 }
104 
ReadNextFrame()105 void IncomingFramesReader::ReadNextFrame() {
106   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
107   connection_->Read(
108       base::BindOnce(&IncomingFramesReader::OnDataReadFromConnection,
109                      weak_ptr_factory_.GetWeakPtr()));
110 }
111 
OnTimeout()112 void IncomingFramesReader::OnTimeout() {
113   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
114 
115   if (!callback_)
116     return;
117 
118   NS_LOG(WARNING) << __func__ << ": Timed out reading from NearbyConnection.";
119   connection_->Close();
120 }
121 
OnDataReadFromConnection(base::Optional<std::vector<uint8_t>> bytes)122 void IncomingFramesReader::OnDataReadFromConnection(
123     base::Optional<std::vector<uint8_t>> bytes) {
124   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
125 
126   if (!callback_) {
127     return;
128   }
129 
130   if (!bytes) {
131     NS_LOG(WARNING) << __func__ << ": Failed to read frame";
132     Done(base::nullopt);
133     return;
134   }
135 
136   process_manager_->GetOrStartNearbySharingDecoder(profile_)->DecodeFrame(
137       *bytes, base::BindOnce(&IncomingFramesReader::OnFrameDecoded,
138                              weak_ptr_factory_.GetWeakPtr()));
139 }
140 
OnFrameDecoded(sharing::mojom::FramePtr frame)141 void IncomingFramesReader::OnFrameDecoded(sharing::mojom::FramePtr frame) {
142   if (!frame) {
143     ReadNextFrame();
144     return;
145   }
146 
147   if (!frame->is_v1()) {
148     NS_LOG(VERBOSE) << __func__ << ": Frame read does not have V1Frame";
149     ReadNextFrame();
150     return;
151   }
152 
153   sharing::mojom::V1FramePtr v1_frame(std::move(frame->get_v1()));
154   sharing::mojom::V1Frame::Tag v1_frame_type = v1_frame->which();
155   if (frame_type_ && *frame_type_ != v1_frame_type) {
156     NS_LOG(WARNING) << __func__ << ": Failed to read frame of type "
157                     << *frame_type_ << ", but got frame of type "
158                     << v1_frame_type << ". Cached for later.";
159     cached_frames_.insert({v1_frame_type, std::move(v1_frame)});
160     ReadNextFrame();
161     return;
162   }
163 
164   NS_LOG(VERBOSE) << __func__ << ": Successfully read frame of type "
165                   << v1_frame_type;
166   Done(std::move(v1_frame));
167 }
168 
Done(base::Optional<sharing::mojom::V1FramePtr> frame)169 void IncomingFramesReader::Done(
170     base::Optional<sharing::mojom::V1FramePtr> frame) {
171   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
172 
173   frame_type_ = base::nullopt;
174   timeout_callback_.Cancel();
175   if (callback_) {
176     std::move(callback_).Run(std::move(frame));
177   }
178 }
179 
GetCachedFrame(base::Optional<sharing::mojom::V1Frame::Tag> frame_type)180 base::Optional<sharing::mojom::V1FramePtr> IncomingFramesReader::GetCachedFrame(
181     base::Optional<sharing::mojom::V1Frame::Tag> frame_type) {
182   NS_LOG(VERBOSE) << __func__ << ": Fetching cached frame";
183   if (frame_type)
184     NS_LOG(VERBOSE) << __func__ << ": Requested frame type - " << *frame_type;
185 
186   auto iter =
187       frame_type ? cached_frames_.find(*frame_type) : cached_frames_.begin();
188 
189   if (iter == cached_frames_.end())
190     return base::nullopt;
191 
192   NS_LOG(VERBOSE) << __func__ << ": Successfully read cached frame";
193   sharing::mojom::V1FramePtr frame = std::move(iter->second);
194   cached_frames_.erase(iter);
195   return frame;
196 }
197